This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 045f8ebc8a3 IGNITE-14341 Reduced contention in the PendingEntriesTree 
when clearing expired entries (#9992)
045f8ebc8a3 is described below

commit 045f8ebc8a3650f6b5a5138740acf92e0911a21f
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue May 31 20:24:21 2022 +0300

    IGNITE-14341 Reduced contention in the PendingEntriesTree when clearing 
expired entries (#9992)
---
 .../jmh/cache/JmhCacheExpireBenchmark.java         | 130 +++++++
 .../cache/IgniteCacheOffheapManagerImpl.java       |  39 +--
 .../cache/persistence/tree/BPlusTree.java          | 333 ++++++++++++++++--
 .../processors/database/BPlusTreeSelfTest.java     | 379 ++++++++++++++++++++-
 4 files changed, 801 insertions(+), 80 deletions(-)

diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
new file mode 100644
index 00000000000..4c25bbc12b4
--- /dev/null
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.cache;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Compare put with expiry policy and without expiry policy.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(16)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 3)
+@Measurement(iterations = 3, time = 10)
+public class JmhCacheExpireBenchmark {
+    /** Items count. */
+    private static final int CNT = 100000;
+
+    /** Ignite. */
+    private Ignite ignite;
+
+    /** Cache without expire policy. */
+    private IgniteCache<Integer, Integer> cacheReg;
+
+    /** Cache with expire policy. */
+    private IgniteCache<Integer, Integer> cacheExp;
+
+    /** */
+    @Benchmark
+    public void putWithExpire() {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        cacheExp.put(key, key);
+    }
+
+    /** */
+    @Benchmark
+    public void putWithoutExpire() {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        cacheReg.put(key, key);
+    }
+
+    /**
+     * Initiate Ignite and caches.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        ignite = Ignition.start(new 
IgniteConfiguration().setIgniteInstanceName("test"));
+
+        cacheReg = ignite.getOrCreateCache(new 
CacheConfiguration<>("CACHE_REG"));
+
+        cacheExp = ignite.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>("CACHE_EXP")
+                .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.MILLISECONDS, 1)))
+        );
+    }
+
+    /**
+     * Clear caches.
+     */
+    @Setup(Level.Iteration)
+    public void setupIteration() {
+        cacheReg.clear();
+        cacheExp.clear();
+    }
+
+    /**
+     * Stop Ignite instance.
+     */
+    @TearDown
+    public void tearDown() {
+        ignite.close();
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Args.
+     * @throws Exception Exception.
+     */
+    public static void main(String[] args) throws Exception {
+        final Options options = new OptionsBuilder()
+            .include(JmhCacheExpireBenchmark.class.getSimpleName())
+            .build();
+
+        new Runner(options).run();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index afe0d02b3c5..33f18528b21 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1363,55 +1363,36 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
         int amount
     ) throws IgniteCheckedException {
-        long now = U.currentTimeMillis();
-
         GridCacheVersion obsoleteVer = null;
 
-        GridCursor<PendingRow> cur;
-
         cctx.shared().database().checkpointReadLock();
 
         try {
-            if (grp.sharedGroup())
-                cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new 
PendingRow(cctx.cacheId(), now, 0));
-            else
-                cur = pendingEntries.find(null, new 
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
-
-            if (!cur.next())
-                return 0;
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
 
             if (!busyLock.enterBusy())
                 return 0;
 
             try {
-                int cleared = 0;
-
-                do {
-                    if (amount != -1 && cleared > amount)
-                        return cleared;
-
-                    PendingRow row = cur.get();
+                List<PendingRow> rows = pendingEntries.remove(
+                    new PendingRow(cacheId, Long.MIN_VALUE, 0), new 
PendingRow(cacheId, U.currentTimeMillis(), 0), amount);
 
+                for (PendingRow row : rows) {
                     if (row.key.partition() == -1)
                         row.key.partition(cctx.affinity().partition(row.key));
 
                     assert row.key != null && row.link != 0 && row.expireTime 
!= 0 : row;
 
-                    if (pendingEntries.removex(row)) {
-                        if (obsoleteVer == null)
-                            obsoleteVer = cctx.cache().nextVersion();
-
-                        GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+                    if (obsoleteVer == null)
+                        obsoleteVer = cctx.cache().nextVersion();
 
-                        if (entry != null)
-                            c.apply(entry, obsoleteVer);
-                    }
+                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
 
-                    cleared++;
+                    if (entry != null)
+                        c.apply(entry, obsoleteVer);
                 }
-                while (cur.next());
 
-                return cleared;
+                return rows.size();
             }
             finally {
                 busyLock.leaveBusy();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 5a7cc63c99e..69ae6a17171 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -532,9 +532,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /**
      *
      */
-    private class RemoveFromLeaf extends GetPageHandler<Remove> {
+    private class RemoveFromLeaf<R extends Remove> extends GetPageHandler<R> {
         /** {@inheritDoc} */
-        @Override public Result run0(long leafId, long leafPage, long 
leafAddr, BPlusIO<L> io, Remove r, int lvl)
+        @Override public Result run0(long leafId, long leafPage, long 
leafAddr, BPlusIO<L> io, R r, int lvl)
             throws IgniteCheckedException {
             assert lvl == 0 : lvl; // Leaf.
 
@@ -551,18 +551,42 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             if (idx < 0)
                 return RETRY; // We've found exact match on search but now 
it's gone.
 
+            return doRemoveOrLockTail(idx, cnt, 1, leafId, leafPage, leafAddr, 
io, r);
+        }
+
+        /**
+         * @param idx Insertion index.
+         * @param cnt Row count.
+         * @param rmvCnt Number of rows to remove.
+         * @param leafId Leaf page ID.
+         * @param leafPage Leaf page pointer.
+         * @param leafAddr Leaf page address.
+         * @param io IO.
+         * @param r Remove operation.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Result doRemoveOrLockTail(
+            int idx,
+            int cnt,
+            int rmvCnt,
+            long leafId,
+            long leafPage,
+            long leafAddr,
+            BPlusIO<L> io,
+            R r
+        ) throws IgniteCheckedException {
             assert idx >= 0 && idx < cnt : idx;
 
             // Need to do inner replace when we remove the rightmost element 
and the leaf have no forward page,
             // i.e. it is not the rightmost leaf of the tree.
-            boolean needReplaceInner = canGetRowFromInner && idx == cnt - 1 && 
io.getForward(leafAddr) != 0;
+            boolean needReplaceInner = canGetRowFromInner && idx == cnt - 
rmvCnt && io.getForward(leafAddr) != 0;
 
             // !!! Before modifying state we have to make sure that we will 
not go for retry.
 
             // We may need to replace inner key or want to merge this leaf 
with sibling after the remove -> keep lock.
             if (needReplaceInner ||
                 // We need to make sure that we have back or forward to be 
able to merge.
-                ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - 1, 
io.getMaxCount(leafAddr, pageSize())))) {
+                ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - rmvCnt, 
io.getMaxCount(leafAddr, pageSize())))) {
                 // If we have backId then we've already locked back page, 
nothing to do here.
                 if (r.fwdId != 0 && r.backId == 0) {
                     Result res = r.lockForward(0);
@@ -580,7 +604,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                 assert r.needReplaceInner == FALSE : "needReplaceInner";
                 assert r.needMergeEmptyBranch == FALSE : 
"needMergeEmptyBranch";
 
-                if (cnt == 1) // It was the last element on the leaf.
+                if (cnt == rmvCnt) // It was the last element on the leaf.
                     r.needMergeEmptyBranch = TRUE;
 
                 if (needReplaceInner)
@@ -601,6 +625,58 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         }
     }
 
+    /** */
+    private final PageHandler<Remove, Result> rmvRangeFromLeaf;
+
+    /**
+     *
+     */
+    private class RemoveRangeFromLeaf extends RemoveFromLeaf<RemoveRange> {
+        /** {@inheritDoc} */
+        @Override public Result run0(long leafId, long leafPage, long 
leafAddr, BPlusIO<L> io, RemoveRange r, int lvl)
+            throws IgniteCheckedException {
+            assert lvl == 0 : lvl; // Leaf.
+
+            // Check the triangle invariant.
+            if (io.getForward(leafAddr) != r.fwdId)
+                return RETRY;
+
+            final int cnt = io.getCount(leafAddr);
+
+            assert cnt <= Short.MAX_VALUE : cnt;
+
+            int idx = findInsertionPoint(lvl, io, leafAddr, 0, cnt, r.lower, 
0);
+
+            if (idx < 0) {
+                idx = fix(idx);
+
+                // Before the page was locked, its state could have changed, 
so you need to make sure that
+                // it has elements from the range, otherwise repeat the search.
+                if (idx == cnt || compare(io, leafAddr, idx, r.upper) > 0)
+                    return RETRY;
+            }
+
+            r.highIdx = findInsertionPoint(lvl, io, leafAddr, idx, cnt, 
r.upper, 0);
+
+            int highIdx = r.highIdx >= 0 ? r.highIdx : fix(r.highIdx) - 1;
+
+            if (r.remaining != -1 && highIdx - idx + 1 >= r.remaining)
+                highIdx = idx + r.remaining - 1;
+
+            assert highIdx >= idx : "low=" + idx + ", high=" + highIdx;
+
+            r.highIdx = r.highIdx >= 0 ? highIdx : -highIdx - 1;
+
+            Result res = doRemoveOrLockTail(idx, cnt, highIdx - idx + 1, 
leafId, leafPage, leafAddr, io, r);
+
+            // Search row should point to the rightmost element, otherwise we 
won't find it on the inner node.
+            if (res == FOUND && r.needReplaceInner == TRUE)
+                r.row = getRow(io, leafAddr, highIdx);
+
+            return res;
+        }
+    }
+
     /** */
     private final PageHandler<Remove, Result> lockBackAndRmvFromLeaf;
 
@@ -915,6 +991,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         rmvFromLeaf = (PageHandler<Remove, Result>)wrap(this, new 
RemoveFromLeaf());
         insert = (PageHandler<Put, Result>)wrap(this, new Insert());
         replace = (PageHandler<Put, Result>)wrap(this, new Replace());
+        rmvRangeFromLeaf = (PageHandler<Remove, Result>)wrap(this, new 
RemoveRangeFromLeaf());
     }
 
     /**
@@ -1989,7 +2066,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @throws IgniteCheckedException If failed.
      */
     @Override public final T remove(L row) throws IgniteCheckedException {
-        return doRemove(row, true);
+        return doRemove(new Remove(row, true));
     }
 
     /**
@@ -1998,11 +2075,33 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @return {@code True} if removed row.
      */
     public final boolean removex(L row) throws IgniteCheckedException {
-        Boolean res = (Boolean)doRemove(row, false);
+        Boolean res = (Boolean)doRemove(new Remove(row, false));
 
         return res != null ? res : false;
     }
 
+    /**
+     * @param lower Lower bound (inclusive).
+     * @param upper Upper bound (inclusive).
+     * @param limit Limit of processed entries by single call, {@code 0} for 
no limit.
+     * @return Removed rows.
+     * @throws IgniteCheckedException If failed.
+     */
+    public List<L> remove(L lower, L upper, int limit) throws 
IgniteCheckedException {
+        // We may not find the lower bound if the inner node
+        // contain a key that is not present on the leaf page.
+        assert canGetRowFromInner : "Not supported";
+        assert limit >= 0 : limit;
+
+        RemoveRange rmvOp = new RemoveRange(lower, upper, true, limit);
+
+        doRemove(rmvOp);
+
+        assert rmvOp.isDone();
+
+        return Collections.unmodifiableList(rmvOp.removedRows);
+    }
+
     /** {@inheritDoc} */
     @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws 
IgniteCheckedException {
         checkDestroyed();
@@ -2155,17 +2254,15 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     }
 
     /**
-     * @param row Lookup row.
-     * @param needOld {@code True} if need return removed row.
-     * @return Removed row.
+     * @return r Remove operation.
      * @throws IgniteCheckedException If failed.
      */
-    private T doRemove(L row, boolean needOld) throws IgniteCheckedException {
+    private T doRemove(Remove r) throws IgniteCheckedException {
         assert !sequentialWriteOptsEnabled;
 
-        checkDestroyed();
+        L row = r.row;
 
-        Remove r = new Remove(row, needOld);
+        checkDestroyed();
 
         try {
             for (;;) {
@@ -2282,9 +2379,10 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
                         // We are at the bottom.
                         assert lvl == 0 : lvl;
 
-                        r.finish();
+                        if (!r.ceil())
+                            return r.finish(res);
 
-                        return res;
+                        // Intentional fallthrough to remove something from 
this page.
 
                     case FOUND:
                         return r.tryRemoveFromLeaf(pageId, page, backId, 
fwdId, lvl);
@@ -4250,9 +4348,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             if (isRemove()) {
                 assert lvl == 0;
 
-                ((Remove)op).finish();
-
-                return NOT_FOUND;
+                return ((Remove)op).finish(NOT_FOUND);
             }
 
             return ((Put)op).tryInsert(pageId, page, fwdId, lvl);
@@ -4534,7 +4630,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /**
      * Remove operation.
      */
-    public final class Remove extends Update implements ReuseBag {
+    public class Remove extends Update implements ReuseBag {
         /** */
         Bool needReplaceInner = FALSE;
 
@@ -4553,14 +4649,28 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         /** */
         final boolean needOld;
 
+        /** */
+        final PageHandler<Remove, Result> rmvFromLeafHnd;
+
         /**
          * @param row Row.
          * @param needOld {@code True} If need return old value.
          */
         private Remove(L row, boolean needOld) {
+            this(row, needOld, rmvFromLeaf);
+        }
+
+        /**
+         * @param row Row.
+         * @param needOld {@code True} If need return old value.
+         * @param rmvFromLeaf Remove from leaf page handler.
+         */
+        private Remove(L row, boolean needOld, PageHandler<Remove, Result> 
rmvFromLeaf) {
             super(row);
 
             this.needOld = needOld;
+
+            rmvFromLeafHnd = rmvFromLeaf;
         }
 
         /** {@inheritDoc} */
@@ -4618,7 +4728,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         }
 
         /** {@inheritDoc} */
-        @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) {
+        @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) throws IgniteCheckedException {
             if (lvl == 0) {
                 assert tail == null;
 
@@ -4628,13 +4738,23 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             return false;
         }
 
+        /**
+         * @return Flag indicating that values are removed using an interval
+         * (i.e. {@link #row} specifies the start of the interval, not an 
exact match).
+         */
+        protected boolean ceil() {
+            return false;
+        }
+
         /**
          * Finish the operation.
          */
-        private void finish() {
+        protected Result finish(Result res) {
             assert tail == null;
 
             row = null;
+
+            return res;
         }
 
         /**
@@ -4716,7 +4836,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         /**
          * @return {@code true} If already removed from leaf.
          */
-        private boolean isRemoved() {
+        protected boolean isRemoved() {
             return rmvd != null;
         }
 
@@ -4724,7 +4844,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param t Tail to release.
          * @return {@code true} If we need to retry or {@code false} to exit.
          */
-        private boolean releaseForRetry(Tail<L> t) {
+        protected boolean releaseForRetry(Tail<L> t) {
             // Try to simply release all first.
             if (t.lvl <= 1) {
                 // We've just locked leaf and did not do the remove, can 
safely release all and retry.
@@ -4859,9 +4979,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             assert isRemoved();
 
             releaseTail();
-            finish();
 
-            return FOUND;
+            return finish(FOUND);
         }
 
         /**
@@ -4911,10 +5030,10 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @return Result code.
          * @throws IgniteCheckedException If failed.
          */
-        private Result doRemoveFromLeaf() throws IgniteCheckedException {
+        protected Result doRemoveFromLeaf() throws IgniteCheckedException {
             assert page != 0L;
 
-            return write(pageId, page, rmvFromLeaf, this, 0, RETRY, 
statisticsHolder());
+            return write(pageId, page, rmvFromLeafHnd, this, 0, RETRY, 
statisticsHolder());
         }
 
         /**
@@ -4966,7 +5085,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @return Result code.
          * @throws IgniteCheckedException If failed.
          */
-        private Result lockForward(int lvl) throws IgniteCheckedException {
+        protected Result lockForward(int lvl) throws IgniteCheckedException {
             assert fwdId != 0 : fwdId;
             assert backId == 0 : backId;
 
@@ -4993,9 +5112,15 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param idx Index to remove.
          * @throws IgniteCheckedException If failed.
          */
-        private void removeDataRowFromLeaf(long pageId, long page, long 
pageAddr, Boolean walPlc, BPlusIO<L> io, int cnt,
-            int idx)
-            throws IgniteCheckedException {
+        protected void removeDataRowFromLeaf(
+            long pageId,
+            long page,
+            long pageAddr,
+            Boolean walPlc,
+            BPlusIO<L> io,
+            int cnt,
+            int idx
+        ) throws IgniteCheckedException {
             assert idx >= 0 && idx < cnt : idx;
             assert io.isLeaf() : "inner";
             assert !isRemoved() : "already removed";
@@ -5020,7 +5145,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param idx Index to remove.
          * @throws IgniteCheckedException If failed.
          */
-        private void doRemove(long pageId, long page, long pageAddr, Boolean 
walPlc, BPlusIO<L> io, int cnt,
+        protected void doRemove(long pageId, long page, long pageAddr, Boolean 
walPlc, BPlusIO<L> io, int cnt,
             int idx)
             throws IgniteCheckedException {
             assert cnt > 0 : cnt;
@@ -5342,7 +5467,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             Result res = removeFromLeaf(pageId, page, backId, fwdId);
 
             if (res == FOUND && tail == null) // Finish if we don't need to do 
any merges.
-                finish();
+                return finish(res);
 
             return res;
         }
@@ -6347,4 +6472,146 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             getClass().getSimpleName() + " [grpName=" + grpName + ", 
treeName=" + name() + ", metaPageId=" +
             U.hexLong(metaPageId) + "].";
     }
+
+    /**
+     * The operation of deleting a range of values.
+     * <p>
+     * Performs the removal of several elements from the leaf at once.
+     */
+    protected class RemoveRange extends Remove {
+        /** Upper bound. */
+        private final L upper;
+
+        /** Lower bound. */
+        private final L lower;
+
+        /** List of removed rows. */
+        private final List<L> removedRows;
+
+        /** The number of remaining rows to remove ({@code -1}, if the limit 
hasn't been specified). */
+        private int remaining;
+
+        /** Flag indicating that no more rows were found from the specified 
range. */
+        private boolean completed;
+
+        /** The index of the highest row found on the page from the specified 
range. */
+        private int highIdx;
+
+        /**
+         * @param lower Lower bound (inclusive).
+         * @param upper Upper bound (inclusive).
+         * @param needOld {@code True} If need return old value.
+         */
+        protected RemoveRange(L lower, L upper, boolean needOld, int limit) {
+            super(lower, needOld, rmvRangeFromLeaf);
+
+            this.lower = lower;
+            this.upper = upper;
+
+            remaining = limit <= 0 ? -1 : limit;
+            removedRows = needOld ? new ArrayList<>() : null;
+        }
+
+        /**
+         * @return {@code True} if operation is completed.
+         */
+        private boolean isDone() {
+            return completed || remaining == 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int 
lvl) throws IgniteCheckedException {
+            if (lvl != 0)
+                return false;
+
+            assert !completed;
+            assert tail == null;
+
+            // If the lower bound is higher than the rightmost item, or if 
this item is outside the given range,
+            // then the search is completed - there are no items from the 
given range.
+            if (idx == io.getCount(pageAddr) || compare(io, pageAddr, idx, 
upper) > 0)
+                completed = true;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean ceil() {
+            return !completed;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void removeDataRowFromLeaf(
+            long pageId,
+            long page,
+            long pageAddr,
+            Boolean walPlc,
+            BPlusIO<L> io,
+            int cnt,
+            int idx
+        ) throws IgniteCheckedException {
+            assert io.isLeaf() : "inner";
+            assert !isRemoved() : "already removed";
+            assert remaining > 0 || remaining == -1 : remaining;
+
+            // It's just a marker that we finished with this leaf-page.
+            rmvd = (T)Boolean.TRUE;
+
+            // We had an exact match of the upper bound on this page or the 
upper bound is lower than the last item.
+            if (highIdx >= 0 || (highIdx = fix(highIdx)) < cnt - 1)
+                completed = true;
+
+            assert idx >= 0 && idx <= highIdx && highIdx < cnt : "low=" + idx 
+ ", high=" + highIdx + ", cnt=" + cnt;
+
+            // Delete from right to left to reduce the number of items moved 
during the delete operation.
+            for (int i = highIdx; i >= idx; i--) {
+                if (needOld)
+                    removedRows.add(getRow(io, pageAddr, i));
+
+                doRemove(pageId, page, pageAddr, walPlc, io, cnt - highIdx + 
i, i);
+
+                if (remaining != -1)
+                    --remaining;
+            }
+
+            if (needOld) {
+                // Reverse the order of added elements.
+                int len = highIdx - idx + 1;
+                int off = removedRows.size() - len;
+
+                for (int i = off, j = removedRows.size() - 1; i < j; i++, j--)
+                    removedRows.set(i, removedRows.set(j, removedRows.get(i)));
+            }
+
+            assert isRemoved();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean releaseForRetry(Tail<L> t) {
+            // Reset search row to lower bound.
+            if (t.lvl <= 1 && needReplaceInner != FALSE)
+                row = lower;
+
+            return super.releaseForRetry(t);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Result finish(Result res) {
+            if (isDone())
+                return super.finish(res);
+
+            assert tail == null;
+
+            // Continue operation - restart from the root.
+            row = lower;
+            needReplaceInner = FALSE;
+            needMergeEmptyBranch = FALSE;
+            rmvd = null;
+
+            // Reset retries counter.
+            lockRetriesCnt = getLockRetries();
+
+            return RETRY;
+        }
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 391d6b402a4..0c4a3adb32b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -383,6 +383,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_1_20_mm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 20;
+        PUT_INC = -1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -409,6 +422,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_1_20_pm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 20;
+        PUT_INC = 1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -448,6 +474,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(false);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_1_20_pp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 20;
+        PUT_INC = 1;
+        RMV_INC = 2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -461,6 +500,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_1_20_mp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 20;
+        PUT_INC = -1;
+        RMV_INC = 2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -488,6 +540,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_2_40_mm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 2;
+        CNT = 40;
+        PUT_INC = -1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -514,6 +579,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_2_40_pm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 2;
+        CNT = 40;
+        PUT_INC = 1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -540,6 +618,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_2_40_pp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 2;
+        CNT = 40;
+        PUT_INC = 1;
+        RMV_INC = 1;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -566,6 +657,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_2_40_mp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 2;
+        CNT = 40;
+        PUT_INC = -1;
+        RMV_INC = 1;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -593,6 +697,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_3_60_mm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 3;
+        CNT = 60;
+        PUT_INC = -1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -619,6 +736,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_3_60_pm2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 3;
+        CNT = 60;
+        PUT_INC = 1;
+        RMV_INC = -2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -645,6 +775,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_3_60_pp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 3;
+        CNT = 60;
+        PUT_INC = 1;
+        RMV_INC = 2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -671,6 +814,19 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestPutRemove(true);
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    @Test
+    public void testPutRemove_3_60_mp2_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 3;
+        CNT = 60;
+        PUT_INC = -1;
+        RMV_INC = 2;
+
+        doTestPutRemove(true);
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -726,10 +882,22 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         assertNull(tree.findOne(cnt));
         checkIterate(tree, cnt, cnt, null, false);
 
+        boolean rmvRange = U.safeAbs(RMV_INC) > 1;
+
         for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += 
RMV_INC) {
             X.println(" -- " + x);
 
-            assertEquals(Long.valueOf(x), tree.remove(x));
+            long x2 = rmvRange ? x + (RMV_INC / 2) : x;
+
+            if (rmvRange) {
+                List<Long> res = tree.remove(Math.min(x, x2), Math.max(x, x2), 
2);
+
+                assertEquals(2, res.size());
+                assertTrue(res.contains(x));
+                assertTrue(res.contains(x2));
+            }
+            else
+                assertEquals(Long.valueOf(x), tree.remove(x));
 
             assertNoLocks();
 
@@ -738,7 +906,8 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
             assertNoLocks();
 
             assertNull(tree.findOne(x));
-            checkIterate(tree, x, x, null, false);
+            assertNull(tree.findOne(x2));
+            checkIterate(tree, x, x2, null, false);
 
             assertNoLocks();
 
@@ -747,7 +916,7 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
             assertNoLocks();
         }
 
-        assertFalse(tree.find(null, null).next());
+        assertFalse(tree.printTree(), tree.find(null, null).next());
         assertEquals(0, tree.size());
         assertEquals(0, tree.rootLevel());
 
@@ -953,6 +1122,16 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestMassiveRemove(true);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMassiveRemove3_true_range() throws Exception {
+        MAX_PER_PAGE = 3;
+
+        doTestMassiveRemove(true, 5);
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -973,6 +1152,16 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestMassiveRemove(true);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMassiveRemove2_true_range() throws Exception {
+        MAX_PER_PAGE = 2;
+
+        doTestMassiveRemove(true, 5);
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -993,22 +1182,45 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         doTestMassiveRemove(true);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMassiveRemove1_true_range() throws Exception {
+        MAX_PER_PAGE = 1;
+
+        doTestMassiveRemove(true, 5);
+    }
+
     /**
      * @param canGetRow Can get row in inner page.
      * @throws Exception If failed.
      */
     private void doTestMassiveRemove(final boolean canGetRow) throws Exception 
{
+        doTestMassiveRemove(canGetRow, 1);
+    }
+
+    /**
+     * @param canGetRow Can get row in inner page.
+     * @param batchSize Batch size.
+     * @throws Exception If failed.
+     */
+    private void doTestMassiveRemove(final boolean canGetRow, int batchSize) 
throws Exception {
         final int threads = 64;
         final int keys = 3000;
 
-        final AtomicLongArray rmvd = new AtomicLongArray(keys);
+        final AtomicLongArray rmvd = new AtomicLongArray(keys / batchSize);
 
         final TestTree tree = createTestTree(canGetRow);
 
+        final List<Long> expKeys = batchSize == 1 ? null : new 
ArrayList<>(keys);
+
         // Put keys in reverse order to have a better balance in the tree 
(lower height).
         for (long i = keys - 1; i >= 0; i--) {
             tree.put(i);
-//            X.println(tree.printTree());
+
+            if (expKeys != null)
+                expKeys.add(keys - i - 1);
         }
 
         assertEquals(keys, tree.size());
@@ -1025,9 +1237,10 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
                     for (;;) {
                         int idx = 0;
                         boolean found = false;
+                        int batchesCnt = keys / batchSize;
 
-                        for (int i = 0, shift = rnd.nextInt(keys); i < keys; 
i++) {
-                            idx = (i + shift) % keys;
+                        for (int i = 0, shift = rnd.nextInt(batchesCnt); i < 
batchesCnt; i++) {
+                            idx = (i + shift) % batchesCnt;
 
                             if (rmvd.get(idx) == 0 && rmvd.compareAndSet(idx, 
0, 1)) {
                                 found = true;
@@ -1039,10 +1252,24 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
                         if (!found)
                             break;
 
-                        assertEquals(Long.valueOf(idx), 
tree.remove((long)idx));
+                        Collection<Long> rmvdIds0;
+
+                        if (batchSize == 1) {
+                            Long rmvId = tree.remove((long)idx);
+                            rmvdIds0 = Collections.singleton(rmvId);
+                            assertEquals(Long.valueOf(idx), rmvId);
+                        }
+                        else {
+                            long startIdx = (long)idx * batchSize;
+
+                            // Ensure that we remove from left to right.
+                            rmvdIds0 = tree.remove(startIdx, Long.MAX_VALUE, 
batchSize);
+
+                            assertEquals(expKeys.subList((int)startIdx, 
(int)startIdx + batchSize), rmvdIds0);
+                        }
 
                         if (canGetRow)
-                            rmvdIds.add((long)idx);
+                            rmvdIds.addAll(rmvdIds0);
                     }
 
                     return null;
@@ -1174,6 +1401,75 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
         assertNoLocks();
     }
 
+    /** */
+    @Test
+    public void testBasicBatchRemove() throws IgniteCheckedException {
+        BPlusTree<Long, Long> tree = createTestTree(true);
+
+        assertEquals(0, tree.size());
+
+        int batchSize = 5;
+        int keysCnt = 10_000;
+        int startKey = 1_000;
+        int endKey = startKey + keysCnt;
+        List<Long> expList = new ArrayList<>();
+
+        for (long i = startKey; i < endKey; i++) {
+            tree.put(i);
+            expList.add(i);
+        }
+
+        for (long i = 0; i < (endKey / batchSize); i++) {
+            long startIdx = i * batchSize;
+            long endIdx = startIdx + batchSize - 1;
+            int op = (int)(i % 4);
+
+            List<Long> res;
+
+            switch (op) {
+                case 0:
+                    res = tree.remove(startIdx, endIdx, 0);
+
+                    break;
+                case 1:
+                    res = tree.remove(startIdx, endIdx, batchSize);
+
+                    break;
+                case 2:
+                    if (startIdx >= startKey) {
+                        res = tree.remove(startIdx, Long.MAX_VALUE, batchSize);
+
+                        break;
+                    }
+                case 3:
+                    res = tree.remove(Long.MIN_VALUE, endIdx, batchSize);
+
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown operation: " + 
op);
+            }
+
+            List<Long> expSubList = startIdx < startKey ?
+                Collections.emptyList() : expList.subList((int)startIdx - 
startKey, (int)endIdx - startKey + 1);
+
+            assertEquals("idx=" + startIdx + ", op=" + op, expSubList, res);
+        }
+
+        assertNoLocks();
+        assertEquals(0, tree.size());
+
+        // Check an empty range between the bounds.
+        tree.put(0L);
+        tree.put(10L);
+
+        assertTrue(tree.remove(1L, 9L, 0).isEmpty());
+        assertEquals(Collections.singletonList(0L), 
tree.remove(Long.MIN_VALUE, Long.MAX_VALUE, 1));
+        assertEquals(Collections.singletonList(10L), 
tree.remove(Long.MIN_VALUE, Long.MAX_VALUE, 1));
+
+        assertNoLocks();
+        assertEquals(0, tree.size());
+    }
+
     /**
      * @param canGetRow Can get row from inner page.
      * @throws IgniteCheckedException If failed.
@@ -2273,7 +2569,7 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testIterateConcurrentPutRemove() throws Exception {
-        iterateConcurrentPutRemove();
+        iterateConcurrentPutRemove(false);
     }
 
     /**
@@ -2283,7 +2579,7 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
     public void testIterateConcurrentPutRemove_1() throws Exception {
         MAX_PER_PAGE = 1;
 
-        iterateConcurrentPutRemove();
+        iterateConcurrentPutRemove(false);
     }
 
     /**
@@ -2293,23 +2589,62 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
     public void testIterateConcurrentPutRemove_2() throws Exception {
         MAX_PER_PAGE = 2;
 
-        iterateConcurrentPutRemove();
+        iterateConcurrentPutRemove(false);
     }
 
     /**
      * @throws Exception If failed.
      */
     @Test
-    public void testIteratePutRemove_10() throws Exception {
+    public void testIterateConcurrentPutRemove_10() throws Exception {
         MAX_PER_PAGE = 10;
 
-        iterateConcurrentPutRemove();
+        iterateConcurrentPutRemove(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void iterateConcurrentPutRemove() throws Exception {
+    @Test
+    public void testIterateConcurrentPutRemoveRange() throws Exception {
+        iterateConcurrentPutRemove(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIterateConcurrentPutRemoveRange_1() throws Exception {
+        MAX_PER_PAGE = 1;
+
+        iterateConcurrentPutRemove(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIterateConcurrentPutRemoveRange_2() throws Exception {
+        MAX_PER_PAGE = 2;
+
+        iterateConcurrentPutRemove(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIterateConcurrentPutRemoveRange_10() throws Exception {
+        MAX_PER_PAGE = 10;
+
+        iterateConcurrentPutRemove(true);
+    }
+
+    /**
+     * @param rmvRange Remove multiple values with the {@link 
BPlusTree#remove} operation.
+     * @throws Exception If failed.
+     */
+    private void iterateConcurrentPutRemove(boolean rmvRange) throws Exception 
{
         final TestTree tree = createTestTree(true);
 
         // Single key per page is a degenerate case: it is very hard to merge 
pages in a tree because
@@ -2423,17 +2758,25 @@ public class BPlusTreeSelfTest extends 
GridCommonAbstractTest {
                 U.sleep(100);
 
                 for (int j = 0; j < 20; j++) {
-                    for (long idx = 0L; idx < KEYS / 2; ++idx) {
+                    int keysPerOp = rmvRange ? 2 : 1;
+
+                    for (long idx = 0L; idx < KEYS / (2 * keysPerOp); ++idx) {
                         long toRmv = rnd.nextLong(KEYS);
 
-                        if (toRmv != findKey)
+                        if (toRmv == findKey)
+                            continue;
+
+                        if (rmvRange && toRmv + 1 != findKey)
+                            assertTrue(tree.remove(toRmv, toRmv + 1, 
keysPerOp).size() <= keysPerOp);
+                        else
                             tree.remove(toRmv);
                     }
 
                     for (long idx = 0L; idx < KEYS / 2; ++idx) {
                         long put = rnd.nextLong(KEYS);
 
-                        tree.put(put);
+                        if (put != findKey)
+                            tree.put(put);
                     }
                 }
             }

Reply via email to