This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 045f8ebc8a3 IGNITE-14341 Reduced contention in the PendingEntriesTree
when clearing expired entries (#9992)
045f8ebc8a3 is described below
commit 045f8ebc8a3650f6b5a5138740acf92e0911a21f
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue May 31 20:24:21 2022 +0300
IGNITE-14341 Reduced contention in the PendingEntriesTree when clearing
expired entries (#9992)
---
.../jmh/cache/JmhCacheExpireBenchmark.java | 130 +++++++
.../cache/IgniteCacheOffheapManagerImpl.java | 39 +--
.../cache/persistence/tree/BPlusTree.java | 333 ++++++++++++++++--
.../processors/database/BPlusTreeSelfTest.java | 379 ++++++++++++++++++++-
4 files changed, 801 insertions(+), 80 deletions(-)
diff --git
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
new file mode 100644
index 00000000000..4c25bbc12b4
--- /dev/null
+++
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.cache;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Compares the throughput of cache puts with and without an expiry policy.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(16)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 3)
+@Measurement(iterations = 3, time = 10)
+public class JmhCacheExpireBenchmark {
+ /** Upper bound (exclusive) of the random keys used by the benchmarks. */
+ private static final int CNT = 100000;
+
+ /** Ignite node started in {@link #setup()} and closed in {@link #tearDown()}. */
+ private Ignite ignite;
+
+ /** Cache without expire policy. */
+ private IgniteCache<Integer, Integer> cacheReg;
+
+ /** Cache with expire policy. */
+ private IgniteCache<Integer, Integer> cacheExp;
+
+ /** Puts a random key from {@code [0, CNT)} into the cache that has an expiry policy. */
+ @Benchmark
+ public void putWithExpire() {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ cacheExp.put(key, key);
+ }
+
+ /** Puts a random key from {@code [0, CNT)} into the cache that has no expiry policy. */
+ @Benchmark
+ public void putWithoutExpire() {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ cacheReg.put(key, key);
+ }
+
+ /**
+ * Starts the Ignite node and creates both caches; the expiring cache uses a
+ * created-expiry policy with a duration of 1 millisecond.
+ */
+ @Setup(Level.Trial)
+ public void setup() {
+ ignite = Ignition.start(new
IgniteConfiguration().setIgniteInstanceName("test"));
+
+ cacheReg = ignite.getOrCreateCache(new
CacheConfiguration<>("CACHE_REG"));
+
+ cacheExp = ignite.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>("CACHE_EXP")
+ .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new
Duration(TimeUnit.MILLISECONDS, 1)))
+ );
+ }
+
+ /**
+ * Clears both caches at iteration level so that iterations do not see each other's entries.
+ */
+ @Setup(Level.Iteration)
+ public void setupIteration() {
+ cacheReg.clear();
+ cacheExp.clear();
+ }
+
+ /**
+ * Stops the Ignite node started in {@link #setup()}.
+ */
+ @TearDown
+ public void tearDown() {
+ ignite.close();
+ }
+
+ /**
+ * Runs the benchmarks of this class through the JMH runner.
+ *
+ * @param args Command line arguments (not used).
+ * @throws Exception If the JMH run fails.
+ */
+ public static void main(String[] args) throws Exception {
+ final Options options = new OptionsBuilder()
+ .include(JmhCacheExpireBenchmark.class.getSimpleName())
+ .build();
+
+ new Runner(options).run();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index afe0d02b3c5..33f18528b21 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1363,55 +1363,36 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
- long now = U.currentTimeMillis();
-
GridCacheVersion obsoleteVer = null;
- GridCursor<PendingRow> cur;
-
cctx.shared().database().checkpointReadLock();
try {
- if (grp.sharedGroup())
- cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new
PendingRow(cctx.cacheId(), now, 0));
- else
- cur = pendingEntries.find(null, new
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
-
- if (!cur.next())
- return 0;
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() :
CU.UNDEFINED_CACHE_ID;
if (!busyLock.enterBusy())
return 0;
try {
- int cleared = 0;
-
- do {
- if (amount != -1 && cleared > amount)
- return cleared;
-
- PendingRow row = cur.get();
+ List<PendingRow> rows = pendingEntries.remove(
+ new PendingRow(cacheId, Long.MIN_VALUE, 0), new
PendingRow(cacheId, U.currentTimeMillis(), 0), amount);
+ for (PendingRow row : rows) {
if (row.key.partition() == -1)
row.key.partition(cctx.affinity().partition(row.key));
assert row.key != null && row.link != 0 && row.expireTime
!= 0 : row;
- if (pendingEntries.removex(row)) {
- if (obsoleteVer == null)
- obsoleteVer = cctx.cache().nextVersion();
-
- GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.cache().nextVersion();
- if (entry != null)
- c.apply(entry, obsoleteVer);
- }
+ GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
- cleared++;
+ if (entry != null)
+ c.apply(entry, obsoleteVer);
}
- while (cur.next());
- return cleared;
+ return rows.size();
}
finally {
busyLock.leaveBusy();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 5a7cc63c99e..69ae6a17171 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -532,9 +532,9 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
/**
*
*/
- private class RemoveFromLeaf extends GetPageHandler<Remove> {
+ private class RemoveFromLeaf<R extends Remove> extends GetPageHandler<R> {
/** {@inheritDoc} */
- @Override public Result run0(long leafId, long leafPage, long
leafAddr, BPlusIO<L> io, Remove r, int lvl)
+ @Override public Result run0(long leafId, long leafPage, long
leafAddr, BPlusIO<L> io, R r, int lvl)
throws IgniteCheckedException {
assert lvl == 0 : lvl; // Leaf.
@@ -551,18 +551,42 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
if (idx < 0)
return RETRY; // We've found exact match on search but now
it's gone.
+ return doRemoveOrLockTail(idx, cnt, 1, leafId, leafPage, leafAddr,
io, r);
+ }
+
+ /**
+ * @param idx Insertion index.
+ * @param cnt Row count.
+ * @param rmvCnt Number of rows to remove.
+ * @param leafId Leaf page ID.
+ * @param leafPage Leaf page pointer.
+ * @param leafAddr Leaf page address.
+ * @param io IO.
+ * @param r Remove operation.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Result doRemoveOrLockTail(
+ int idx,
+ int cnt,
+ int rmvCnt,
+ long leafId,
+ long leafPage,
+ long leafAddr,
+ BPlusIO<L> io,
+ R r
+ ) throws IgniteCheckedException {
assert idx >= 0 && idx < cnt : idx;
// Need to do inner replace when we remove the rightmost element
and the leaf have no forward page,
// i.e. it is not the rightmost leaf of the tree.
- boolean needReplaceInner = canGetRowFromInner && idx == cnt - 1 &&
io.getForward(leafAddr) != 0;
+ boolean needReplaceInner = canGetRowFromInner && idx == cnt -
rmvCnt && io.getForward(leafAddr) != 0;
// !!! Before modifying state we have to make sure that we will
not go for retry.
// We may need to replace inner key or want to merge this leaf
with sibling after the remove -> keep lock.
if (needReplaceInner ||
// We need to make sure that we have back or forward to be
able to merge.
- ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - 1,
io.getMaxCount(leafAddr, pageSize())))) {
+ ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - rmvCnt,
io.getMaxCount(leafAddr, pageSize())))) {
// If we have backId then we've already locked back page,
nothing to do here.
if (r.fwdId != 0 && r.backId == 0) {
Result res = r.lockForward(0);
@@ -580,7 +604,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
assert r.needReplaceInner == FALSE : "needReplaceInner";
assert r.needMergeEmptyBranch == FALSE :
"needMergeEmptyBranch";
- if (cnt == 1) // It was the last element on the leaf.
+ if (cnt == rmvCnt) // It was the last element on the leaf.
r.needMergeEmptyBranch = TRUE;
if (needReplaceInner)
@@ -601,6 +625,58 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
}
}
+ /** */
+ private final PageHandler<Remove, Result> rmvRangeFromLeaf;
+
+ /**
+ * Leaf page handler of the {@link RemoveRange} operation.
+ * <p>
+ * On the locked leaf it locates the rows lying between the operation's lower and upper bounds,
+ * caps their number by the remaining limit (if any) and passes that count to
+ * {@code doRemoveOrLockTail}. It answers {@code RETRY} when the forward page no longer matches
+ * the expected one or when the leaf holds no rows from the range.
+ */
+ private class RemoveRangeFromLeaf extends RemoveFromLeaf<RemoveRange> {
+ /** {@inheritDoc} */
+ @Override public Result run0(long leafId, long leafPage, long
leafAddr, BPlusIO<L> io, RemoveRange r, int lvl)
+ throws IgniteCheckedException {
+ assert lvl == 0 : lvl; // Leaf.
+
+ // Check the triangle invariant.
+ if (io.getForward(leafAddr) != r.fwdId)
+ return RETRY;
+
+ final int cnt = io.getCount(leafAddr);
+
+ assert cnt <= Short.MAX_VALUE : cnt;
+
+ int idx = findInsertionPoint(lvl, io, leafAddr, 0, cnt, r.lower,
0);
+
+ if (idx < 0) {
+ idx = fix(idx);
+
+ // Before the page was locked, its state could have changed,
so you need to make sure that
+ // it has elements from the range, otherwise repeat the search.
+ if (idx == cnt || compare(io, leafAddr, idx, r.upper) > 0)
+ return RETRY;
+ }
+
+ r.highIdx = findInsertionPoint(lvl, io, leafAddr, idx, cnt,
r.upper, 0);
+
+ int highIdx = r.highIdx >= 0 ? r.highIdx : fix(r.highIdx) - 1;
+
+ if (r.remaining != -1 && highIdx - idx + 1 >= r.remaining)
+ highIdx = idx + r.remaining - 1;
+
+ assert highIdx >= idx : "low=" + idx + ", high=" + highIdx;
+
+ r.highIdx = r.highIdx >= 0 ? highIdx : -highIdx - 1;
+
+ Result res = doRemoveOrLockTail(idx, cnt, highIdx - idx + 1,
leafId, leafPage, leafAddr, io, r);
+
+ // Search row should point to the rightmost element, otherwise we
won't find it on the inner node.
+ if (res == FOUND && r.needReplaceInner == TRUE)
+ r.row = getRow(io, leafAddr, highIdx);
+
+ return res;
+ }
+ }
+
/** */
private final PageHandler<Remove, Result> lockBackAndRmvFromLeaf;
@@ -915,6 +991,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
rmvFromLeaf = (PageHandler<Remove, Result>)wrap(this, new
RemoveFromLeaf());
insert = (PageHandler<Put, Result>)wrap(this, new Insert());
replace = (PageHandler<Put, Result>)wrap(this, new Replace());
+ rmvRangeFromLeaf = (PageHandler<Remove, Result>)wrap(this, new
RemoveRangeFromLeaf());
}
/**
@@ -1989,7 +2066,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @throws IgniteCheckedException If failed.
*/
@Override public final T remove(L row) throws IgniteCheckedException {
- return doRemove(row, true);
+ return doRemove(new Remove(row, true));
}
/**
@@ -1998,11 +2075,33 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @return {@code True} if removed row.
*/
public final boolean removex(L row) throws IgniteCheckedException {
- Boolean res = (Boolean)doRemove(row, false);
+ Boolean res = (Boolean)doRemove(new Remove(row, false));
return res != null ? res : false;
}
+ /**
+ * Removes rows whose keys fall into the given range, taking at most {@code limit} rows.
+ * <p>
+ * Only supported by trees that can read rows from inner pages (see the first assertion).
+ *
+ * @param lower Lower bound (inclusive).
+ * @param upper Upper bound (inclusive).
+ * @param limit Maximum number of rows to remove by a single call, {@code 0} for no limit;
+ * must not be negative.
+ * @return Unmodifiable list of the removed rows.
+ * @throws IgniteCheckedException If failed.
+ */
+ public List<L> remove(L lower, L upper, int limit) throws
IgniteCheckedException {
+ // We may not find the lower bound if an inner node
+ // contains a key that is not present on the leaf page.
+ assert canGetRowFromInner : "Not supported";
+ assert limit >= 0 : limit;
+
+ RemoveRange rmvOp = new RemoveRange(lower, upper, true, limit);
+
+ doRemove(rmvOp);
+
+ assert rmvOp.isDone();
+
+ return Collections.unmodifiableList(rmvOp.removedRows);
+ }
+
/** {@inheritDoc} */
@Override public void invoke(L row, Object z, InvokeClosure<T> c) throws
IgniteCheckedException {
checkDestroyed();
@@ -2155,17 +2254,15 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
}
/**
- * @param row Lookup row.
- * @param needOld {@code True} if need return removed row.
- * @return Removed row.
+ * @param r Remove operation.
* @throws IgniteCheckedException If failed.
*/
- private T doRemove(L row, boolean needOld) throws IgniteCheckedException {
+ private T doRemove(Remove r) throws IgniteCheckedException {
assert !sequentialWriteOptsEnabled;
- checkDestroyed();
+ L row = r.row;
- Remove r = new Remove(row, needOld);
+ checkDestroyed();
try {
for (;;) {
@@ -2282,9 +2379,10 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
// We are at the bottom.
assert lvl == 0 : lvl;
- r.finish();
+ if (!r.ceil())
+ return r.finish(res);
- return res;
+ // Intentional fallthrough to remove something from
this page.
case FOUND:
return r.tryRemoveFromLeaf(pageId, page, backId,
fwdId, lvl);
@@ -4250,9 +4348,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
if (isRemove()) {
assert lvl == 0;
- ((Remove)op).finish();
-
- return NOT_FOUND;
+ return ((Remove)op).finish(NOT_FOUND);
}
return ((Put)op).tryInsert(pageId, page, fwdId, lvl);
@@ -4534,7 +4630,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
/**
* Remove operation.
*/
- public final class Remove extends Update implements ReuseBag {
+ public class Remove extends Update implements ReuseBag {
/** */
Bool needReplaceInner = FALSE;
@@ -4553,14 +4649,28 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
/** */
final boolean needOld;
+ /** */
+ final PageHandler<Remove, Result> rmvFromLeafHnd;
+
/**
* @param row Row.
* @param needOld {@code True} If need return old value.
*/
private Remove(L row, boolean needOld) {
+ this(row, needOld, rmvFromLeaf);
+ }
+
+ /**
+ * @param row Row.
+ * @param needOld {@code True} If need return old value.
+ * @param rmvFromLeaf Remove from leaf page handler.
+ */
+ private Remove(L row, boolean needOld, PageHandler<Remove, Result>
rmvFromLeaf) {
super(row);
this.needOld = needOld;
+
+ rmvFromLeafHnd = rmvFromLeaf;
}
/** {@inheritDoc} */
@@ -4618,7 +4728,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
}
/** {@inheritDoc} */
- @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int
lvl) {
+ @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int
lvl) throws IgniteCheckedException {
if (lvl == 0) {
assert tail == null;
@@ -4628,13 +4738,23 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
return false;
}
+ /**
+ * Tells whether this operation removes an interval of rows rather than a single exact key.
+ * This base implementation always answers {@code false}.
+ *
+ * @return Flag indicating that values are removed using an interval
+ * (i.e. {@link #row} specifies the start of the interval, not an exact match).
+ */
+ protected boolean ceil() {
+ return false;
+ }
+
/**
* Finish the operation.
*/
- private void finish() {
+ protected Result finish(Result res) {
assert tail == null;
row = null;
+
+ return res;
}
/**
@@ -4716,7 +4836,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
/**
* @return {@code true} If already removed from leaf.
*/
- private boolean isRemoved() {
+ protected boolean isRemoved() {
return rmvd != null;
}
@@ -4724,7 +4844,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @param t Tail to release.
* @return {@code true} If we need to retry or {@code false} to exit.
*/
- private boolean releaseForRetry(Tail<L> t) {
+ protected boolean releaseForRetry(Tail<L> t) {
// Try to simply release all first.
if (t.lvl <= 1) {
// We've just locked leaf and did not do the remove, can
safely release all and retry.
@@ -4859,9 +4979,8 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
assert isRemoved();
releaseTail();
- finish();
- return FOUND;
+ return finish(FOUND);
}
/**
@@ -4911,10 +5030,10 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @return Result code.
* @throws IgniteCheckedException If failed.
*/
- private Result doRemoveFromLeaf() throws IgniteCheckedException {
+ protected Result doRemoveFromLeaf() throws IgniteCheckedException {
assert page != 0L;
- return write(pageId, page, rmvFromLeaf, this, 0, RETRY,
statisticsHolder());
+ return write(pageId, page, rmvFromLeafHnd, this, 0, RETRY,
statisticsHolder());
}
/**
@@ -4966,7 +5085,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @return Result code.
* @throws IgniteCheckedException If failed.
*/
- private Result lockForward(int lvl) throws IgniteCheckedException {
+ protected Result lockForward(int lvl) throws IgniteCheckedException {
assert fwdId != 0 : fwdId;
assert backId == 0 : backId;
@@ -4993,9 +5112,15 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @param idx Index to remove.
* @throws IgniteCheckedException If failed.
*/
- private void removeDataRowFromLeaf(long pageId, long page, long
pageAddr, Boolean walPlc, BPlusIO<L> io, int cnt,
- int idx)
- throws IgniteCheckedException {
+ protected void removeDataRowFromLeaf(
+ long pageId,
+ long page,
+ long pageAddr,
+ Boolean walPlc,
+ BPlusIO<L> io,
+ int cnt,
+ int idx
+ ) throws IgniteCheckedException {
assert idx >= 0 && idx < cnt : idx;
assert io.isLeaf() : "inner";
assert !isRemoved() : "already removed";
@@ -5020,7 +5145,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
* @param idx Index to remove.
* @throws IgniteCheckedException If failed.
*/
- private void doRemove(long pageId, long page, long pageAddr, Boolean
walPlc, BPlusIO<L> io, int cnt,
+ protected void doRemove(long pageId, long page, long pageAddr, Boolean
walPlc, BPlusIO<L> io, int cnt,
int idx)
throws IgniteCheckedException {
assert cnt > 0 : cnt;
@@ -5342,7 +5467,7 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
Result res = removeFromLeaf(pageId, page, backId, fwdId);
if (res == FOUND && tail == null) // Finish if we don't need to do
any merges.
- finish();
+ return finish(res);
return res;
}
@@ -6347,4 +6472,146 @@ public abstract class BPlusTree<L, T extends L> extends
DataStructure implements
getClass().getSimpleName() + " [grpName=" + grpName + ",
treeName=" + name() + ", metaPageId=" +
U.hexLong(metaPageId) + "].";
}
+
+ /**
+ * The operation of deleting a range of values.
+ * <p>
+ * Performs the removal of several elements from the leaf at once. After a leaf has been
+ * processed and the operation is not done yet, the search row is reset to the lower bound
+ * and the operation is retried (see {@link #finish(Result)}).
+ */
+ protected class RemoveRange extends Remove {
+ /** Upper bound (inclusive). */
+ private final L upper;
+
+ /** Lower bound (inclusive); the search row is reset to it on every restart. */
+ private final L lower;
+
+ /** List of removed rows ({@code null} if old values were not requested). */
+ private final List<L> removedRows;
+
+ /** The number of remaining rows to remove ({@code -1}, if the limit
hasn't been specified). */
+ private int remaining;
+
+ /** Flag indicating that no more rows were found from the specified
range. */
+ private boolean completed;
+
+ /**
+ * The index of the highest row found on the page from the specified range.
+ * A non-negative value denotes an exact match of the upper bound on the page;
+ * a negative value is an encoded index that is decoded with {@code fix()} before use.
+ */
+ private int highIdx;
+
+ /**
+ * @param lower Lower bound (inclusive).
+ * @param upper Upper bound (inclusive).
+ * @param needOld {@code True} If need return old value.
+ * @param limit Maximum number of rows to remove; a non-positive value means no limit.
+ */
+ protected RemoveRange(L lower, L upper, boolean needOld, int limit) {
+ super(lower, needOld, rmvRangeFromLeaf);
+
+ this.lower = lower;
+ this.upper = upper;
+
+ remaining = limit <= 0 ? -1 : limit;
+ removedRows = needOld ? new ArrayList<>() : null;
+ }
+
+ /**
+ * @return {@code True} if operation is completed, i.e. the range is exhausted
+ * or the limit of removed rows has been reached.
+ */
+ private boolean isDone() {
+ return completed || remaining == 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On a leaf page marks the operation as completed when the page has no row from the range
+ * at the given index; inner pages are not handled here.
+ */
+ @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int
lvl) throws IgniteCheckedException {
+ if (lvl != 0)
+ return false;
+
+ assert !completed;
+ assert tail == null;
+
+ // If the lower bound is higher than the rightmost item, or if
this item is outside the given range,
+ // then the search is completed - there are no items from the
given range.
+ if (idx == io.getCount(pageAddr) || compare(io, pageAddr, idx,
upper) > 0)
+ completed = true;
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Answers {@code true} until the range has been found to be exhausted.
+ */
+ @Override protected boolean ceil() {
+ return !completed;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Removes every row between {@code idx} and the decoded {@link #highIdx} in one pass and,
+ * when old values are requested, appends them to {@link #removedRows} in ascending index order.
+ */
+ @Override protected void removeDataRowFromLeaf(
+ long pageId,
+ long page,
+ long pageAddr,
+ Boolean walPlc,
+ BPlusIO<L> io,
+ int cnt,
+ int idx
+ ) throws IgniteCheckedException {
+ assert io.isLeaf() : "inner";
+ assert !isRemoved() : "already removed";
+ assert remaining > 0 || remaining == -1 : remaining;
+
+ // It's just a marker that we finished with this leaf-page.
+ rmvd = (T)Boolean.TRUE;
+
+ // We had an exact match of the upper bound on this page or the
upper bound is lower than the last item.
+ if (highIdx >= 0 || (highIdx = fix(highIdx)) < cnt - 1)
+ completed = true;
+
+ assert idx >= 0 && idx <= highIdx && highIdx < cnt : "low=" + idx
+ ", high=" + highIdx + ", cnt=" + cnt;
+
+ // Delete from right to left to reduce the number of items moved
during the delete operation.
+ for (int i = highIdx; i >= idx; i--) {
+ if (needOld)
+ removedRows.add(getRow(io, pageAddr, i));
+
+ doRemove(pageId, page, pageAddr, walPlc, io, cnt - highIdx +
i, i);
+
+ if (remaining != -1)
+ --remaining;
+ }
+
+ if (needOld) {
+ // Reverse the order of added elements.
+ int len = highIdx - idx + 1;
+ int off = removedRows.size() - len;
+
+ for (int i = off, j = removedRows.size() - 1; i < j; i++, j--)
+ removedRows.set(i, removedRows.set(j, removedRows.get(i)));
+ }
+
+ assert isRemoved();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally points the search row back to the lower bound when the tail is at level 0 or 1
+ * and an inner replace was requested.
+ */
+ @Override protected boolean releaseForRetry(Tail<L> t) {
+ // Reset search row to lower bound.
+ if (t.lvl <= 1 && needReplaceInner != FALSE)
+ row = lower;
+
+ return super.releaseForRetry(t);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * While the operation is not done, resets the per-leaf state and the lock retries counter
+ * and answers {@code RETRY} instead of {@code res}, so that the search starts over from
+ * the lower bound.
+ */
+ @Override protected Result finish(Result res) {
+ if (isDone())
+ return super.finish(res);
+
+ assert tail == null;
+
+ // Continue operation - restart from the root.
+ row = lower;
+ needReplaceInner = FALSE;
+ needMergeEmptyBranch = FALSE;
+ rmvd = null;
+
+ // Reset retries counter.
+ lockRetriesCnt = getLockRetries();
+
+ return RETRY;
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 391d6b402a4..0c4a3adb32b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -383,6 +383,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_1_20_mm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 20;
+ PUT_INC = -1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -409,6 +422,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_1_20_pm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 20;
+ PUT_INC = 1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -448,6 +474,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(false);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up ({@code RMV_INC = 2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_1_20_pp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 20;
+ PUT_INC = 1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -461,6 +500,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up ({@code RMV_INC = 2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_1_20_mp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 1;
+ CNT = 20;
+ PUT_INC = -1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -488,6 +540,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_2_40_mm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 2;
+ CNT = 40;
+ PUT_INC = -1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -514,6 +579,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_2_40_pm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 2;
+ CNT = 40;
+ PUT_INC = 1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -540,6 +618,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up. The removal step must be {@code 2}: the range-removal
+ * path of {@code doTestPutRemove} is only taken when the absolute step is greater than one.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_2_40_pp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 2;
+ CNT = 40;
+ PUT_INC = 1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -566,6 +657,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up. The removal step must be {@code 2}: the range-removal
+ * path of {@code doTestPutRemove} is only taken when the absolute step is greater than one.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_2_40_mp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 2;
+ CNT = 40;
+ PUT_INC = -1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -593,6 +697,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_3_60_mm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 3;
+ CNT = 60;
+ PUT_INC = -1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -619,6 +736,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the highest key down ({@code RMV_INC = -2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_3_60_pm2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 3;
+ CNT = 60;
+ PUT_INC = 1;
+ RMV_INC = -2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -645,6 +775,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up ({@code RMV_INC = 2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_3_60_pp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 3;
+ CNT = 60;
+ PUT_INC = 1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -671,6 +814,19 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestPutRemove(true);
}
+ /**
+ * Put/remove scenario in which keys are deleted in pairs through range removal,
+ * walking from the lowest key up ({@code RMV_INC = 2}).
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Test
+ public void testPutRemove_3_60_mp2_1() throws IgniteCheckedException {
+ MAX_PER_PAGE = 3;
+ CNT = 60;
+ PUT_INC = -1;
+ RMV_INC = 2;
+
+ doTestPutRemove(true);
+ }
+
/**
* @throws IgniteCheckedException If failed.
*/
@@ -726,10 +882,22 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
assertNull(tree.findOne(cnt));
checkIterate(tree, cnt, cnt, null, false);
+ boolean rmvRange = U.safeAbs(RMV_INC) > 1;
+
for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x +=
RMV_INC) {
X.println(" -- " + x);
- assertEquals(Long.valueOf(x), tree.remove(x));
+ long x2 = rmvRange ? x + (RMV_INC / 2) : x;
+
+ if (rmvRange) {
+ List<Long> res = tree.remove(Math.min(x, x2), Math.max(x, x2),
2);
+
+ assertEquals(2, res.size());
+ assertTrue(res.contains(x));
+ assertTrue(res.contains(x2));
+ }
+ else
+ assertEquals(Long.valueOf(x), tree.remove(x));
assertNoLocks();
@@ -738,7 +906,8 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
assertNoLocks();
assertNull(tree.findOne(x));
- checkIterate(tree, x, x, null, false);
+ assertNull(tree.findOne(x2));
+ checkIterate(tree, x, x2, null, false);
assertNoLocks();
@@ -747,7 +916,7 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
assertNoLocks();
}
- assertFalse(tree.find(null, null).next());
+ assertFalse(tree.printTree(), tree.find(null, null).next());
assertEquals(0, tree.size());
assertEquals(0, tree.rootLevel());
@@ -953,6 +1122,16 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestMassiveRemove(true);
}
+ /**
+ * Massive-remove scenario with {@code MAX_PER_PAGE = 3} in which keys are removed
+ * in batches of 5 through the range-removal API.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMassiveRemove3_true_range() throws Exception {
+ MAX_PER_PAGE = 3;
+
+ doTestMassiveRemove(true, 5);
+ }
+
/**
* @throws Exception If failed.
*/
@@ -973,6 +1152,16 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestMassiveRemove(true);
}
+ /**
+ * Massive-remove scenario with {@code MAX_PER_PAGE = 2} in which keys are removed
+ * in batches of 5 through the range-removal API.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMassiveRemove2_true_range() throws Exception {
+ MAX_PER_PAGE = 2;
+
+ doTestMassiveRemove(true, 5);
+ }
+
/**
* @throws Exception If failed.
*/
@@ -993,22 +1182,45 @@ public class BPlusTreeSelfTest extends
GridCommonAbstractTest {
doTestMassiveRemove(true);
}
+ /**
+ * Massive-remove scenario with {@code MAX_PER_PAGE = 1} in which keys are removed
+ * in batches of 5 through the range-removal API.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMassiveRemove1_true_range() throws Exception {
+ MAX_PER_PAGE = 1;
+
+ doTestMassiveRemove(true, 5);
+ }
+
/**
* @param canGetRow Can get row in inner page.
* @throws Exception If failed.
*/
private void doTestMassiveRemove(final boolean canGetRow) throws Exception {
+ doTestMassiveRemove(canGetRow, 1);
+ }
+
+ /**
+ * @param canGetRow Can get row in inner page.
+ * @param batchSize Batch size.
+ * @throws Exception If failed.
+ */
+ private void doTestMassiveRemove(final boolean canGetRow, int batchSize) throws Exception {
final int threads = 64;
final int keys = 3000;
- final AtomicLongArray rmvd = new AtomicLongArray(keys);
+ final AtomicLongArray rmvd = new AtomicLongArray(keys / batchSize);
final TestTree tree = createTestTree(canGetRow);
+ final List<Long> expKeys = batchSize == 1 ? null : new ArrayList<>(keys);
+
// Put keys in reverse order to have a better balance in the tree (lower height).
for (long i = keys - 1; i >= 0; i--) {
tree.put(i);
-// X.println(tree.printTree());
+
+ if (expKeys != null)
+ expKeys.add(keys - i - 1);
}
assertEquals(keys, tree.size());
@@ -1025,9 +1237,10 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
for (;;) {
int idx = 0;
boolean found = false;
+ int batchesCnt = keys / batchSize;
- for (int i = 0, shift = rnd.nextInt(keys); i < keys; i++) {
- idx = (i + shift) % keys;
+ for (int i = 0, shift = rnd.nextInt(batchesCnt); i < batchesCnt; i++) {
+ idx = (i + shift) % batchesCnt;
if (rmvd.get(idx) == 0 && rmvd.compareAndSet(idx, 0, 1)) {
found = true;
@@ -1039,10 +1252,24 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
if (!found)
break;
- assertEquals(Long.valueOf(idx), tree.remove((long)idx));
+ Collection<Long> rmvdIds0;
+
+ if (batchSize == 1) {
+ Long rmvId = tree.remove((long)idx);
+ rmvdIds0 = Collections.singleton(rmvId);
+ assertEquals(Long.valueOf(idx), rmvId);
+ }
+ else {
+ long startIdx = (long)idx * batchSize;
+
+ // Ensure that we remove from left to right.
+ rmvdIds0 = tree.remove(startIdx, Long.MAX_VALUE, batchSize);
+
+ assertEquals(expKeys.subList((int)startIdx, (int)startIdx + batchSize), rmvdIds0);
+ }
if (canGetRow)
- rmvdIds.add((long)idx);
+ rmvdIds.addAll(rmvdIds0);
}
return null;
@@ -1174,6 +1401,75 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
assertNoLocks();
}
+ /** */
+ @Test
+ public void testBasicBatchRemove() throws IgniteCheckedException {
+ BPlusTree<Long, Long> tree = createTestTree(true);
+
+ assertEquals(0, tree.size());
+
+ int batchSize = 5;
+ int keysCnt = 10_000;
+ int startKey = 1_000;
+ int endKey = startKey + keysCnt;
+ List<Long> expList = new ArrayList<>();
+
+ for (long i = startKey; i < endKey; i++) {
+ tree.put(i);
+ expList.add(i);
+ }
+
+ for (long i = 0; i < (endKey / batchSize); i++) {
+ long startIdx = i * batchSize;
+ long endIdx = startIdx + batchSize - 1;
+ int op = (int)(i % 4);
+
+ List<Long> res;
+
+ switch (op) {
+ case 0:
+ res = tree.remove(startIdx, endIdx, 0);
+
+ break;
+ case 1:
+ res = tree.remove(startIdx, endIdx, batchSize);
+
+ break;
+ case 2:
+ if (startIdx >= startKey) {
+ res = tree.remove(startIdx, Long.MAX_VALUE, batchSize);
+
+ break;
+ }
+ case 3:
+ res = tree.remove(Long.MIN_VALUE, endIdx, batchSize);
+
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown operation: " + op);
+ }
+
+ List<Long> expSubList = startIdx < startKey ?
+ Collections.emptyList() : expList.subList((int)startIdx - startKey, (int)endIdx - startKey + 1);
+
+ assertEquals("idx=" + startIdx + ", op=" + op, expSubList, res);
+ }
+
+ assertNoLocks();
+ assertEquals(0, tree.size());
+
+ // Check an empty range between the bounds.
+ tree.put(0L);
+ tree.put(10L);
+
+ assertTrue(tree.remove(1L, 9L, 0).isEmpty());
+ assertEquals(Collections.singletonList(0L), tree.remove(Long.MIN_VALUE, Long.MAX_VALUE, 1));
+ assertEquals(Collections.singletonList(10L), tree.remove(Long.MIN_VALUE, Long.MAX_VALUE, 1));
+
+ assertNoLocks();
+ assertEquals(0, tree.size());
+ }
+
/**
* @param canGetRow Can get row from inner page.
* @throws IgniteCheckedException If failed.
@@ -2273,7 +2569,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
*/
@Test
public void testIterateConcurrentPutRemove() throws Exception {
- iterateConcurrentPutRemove();
+ iterateConcurrentPutRemove(false);
}
/**
@@ -2283,7 +2579,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
public void testIterateConcurrentPutRemove_1() throws Exception {
MAX_PER_PAGE = 1;
- iterateConcurrentPutRemove();
+ iterateConcurrentPutRemove(false);
}
/**
@@ -2293,23 +2589,62 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
public void testIterateConcurrentPutRemove_2() throws Exception {
MAX_PER_PAGE = 2;
- iterateConcurrentPutRemove();
+ iterateConcurrentPutRemove(false);
}
/**
* @throws Exception If failed.
*/
@Test
- public void testIteratePutRemove_10() throws Exception {
+ public void testIterateConcurrentPutRemove_10() throws Exception {
MAX_PER_PAGE = 10;
- iterateConcurrentPutRemove();
+ iterateConcurrentPutRemove(false);
}
/**
* @throws Exception If failed.
*/
- private void iterateConcurrentPutRemove() throws Exception {
+ @Test
+ public void testIterateConcurrentPutRemoveRange() throws Exception {
+ iterateConcurrentPutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIterateConcurrentPutRemoveRange_1() throws Exception {
+ MAX_PER_PAGE = 1;
+
+ iterateConcurrentPutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIterateConcurrentPutRemoveRange_2() throws Exception {
+ MAX_PER_PAGE = 2;
+
+ iterateConcurrentPutRemove(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIterateConcurrentPutRemoveRange_10() throws Exception {
+ MAX_PER_PAGE = 10;
+
+ iterateConcurrentPutRemove(true);
+ }
+
+ /**
+ * @param rmvRange Remove multiple values with the {@link BPlusTree#remove} operation.
+ * @throws Exception If failed.
+ */
+ private void iterateConcurrentPutRemove(boolean rmvRange) throws Exception {
final TestTree tree = createTestTree(true);
// Single key per page is a degenerate case: it is very hard to merge pages in a tree because
@@ -2423,17 +2758,25 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
U.sleep(100);
for (int j = 0; j < 20; j++) {
- for (long idx = 0L; idx < KEYS / 2; ++idx) {
+ int keysPerOp = rmvRange ? 2 : 1;
+
+ for (long idx = 0L; idx < KEYS / (2 * keysPerOp); ++idx) {
long toRmv = rnd.nextLong(KEYS);
- if (toRmv != findKey)
+ if (toRmv == findKey)
+ continue;
+
+ if (rmvRange && toRmv + 1 != findKey)
+ assertTrue(tree.remove(toRmv, toRmv + 1, keysPerOp).size() <= keysPerOp);
+ else
tree.remove(toRmv);
}
for (long idx = 0L; idx < KEYS / 2; ++idx) {
long put = rnd.nextLong(KEYS);
- tree.put(put);
+ if (put != findKey)
+ tree.put(put);
}
}
}