Repository: ignite Updated Branches: refs/heads/ignite-5937 f8be46d80 -> 6e25b649f
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e25b649 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e25b649 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e25b649 Branch: refs/heads/ignite-5937 Commit: 6e25b649f758c3aa308118354b08c6899dd50654 Parents: f8be46d Author: sboikov <[email protected]> Authored: Wed Oct 4 16:57:59 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 5 17:54:17 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManager.java | 11 +- .../cache/IgniteCacheOffheapManagerImpl.java | 179 +++++++++++++------ .../cache/mvcc/CacheCoordinatorsProcessor.java | 12 +- .../cache/persistence/tree/BPlusTree.java | 106 +++++------ .../processors/cache/tree/CacheDataTree.java | 2 +- .../cache/tree/MvccKeyMaxVersionBound.java | 77 ++++++++ .../cache/tree/MvccKeyMinVersionBound.java | 49 +++++ .../processors/cache/tree/MvccUpdateRow.java | 177 ++++++++++++++++++ .../cache/tree/MvccVersionBasedSearchRow.java | 36 +++- .../cache/mvcc/CacheMvccTransactionsTest.java | 40 +++++ .../processors/database/BPlusTreeSelfTest.java | 123 +++++++++---- 11 files changed, 662 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 9d03e4a..8967ce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager { long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; - GridLongList mvccUpdate( + /** + * @param cctx Cache context. + * @param key Key. + * @param val Value. + * @param ver Version. + * @param mvccVer Mvcc version. + * @return List of transactions to wait for. + * @throws IgniteCheckedException If failed. + */ + @Nullable GridLongList mvccUpdate( GridCacheContext cctx, KeyCacheObject key, CacheObject val, http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index eef645d..25f36b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -55,7 +55,10 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.DataRow; import org.apache.ignite.internal.processors.cache.tree.MvccDataRow; +import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound; +import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow; +import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow; import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; @@ -1361,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - MvccDataRow dataRow = new MvccDataRow( - key, - val, - ver, - partId, - cacheId, - mvccVer.coordinatorVersion(), - mvccVer.counter()); - CacheObjectContext coCtx = cctx.cacheObjectContext(); // Make sure value bytes initialized. key.valueBytes(coCtx); val.valueBytes(coCtx); - rowStore.addRow(dataRow); + if (true) { + MvccUpdateRow updateRow = new MvccUpdateRow( + key, + val, + ver, + mvccVer, + partId, + cacheId); - assert dataRow.link() != 0 : dataRow; + rowStore.addRow(updateRow); - if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) - dataRow.cacheId(cctx.cacheId()); + assert updateRow.link() != 0 : updateRow; - boolean old = dataTree.putx(dataRow); + if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID) + updateRow.cacheId(cctx.cacheId()); - assert !old; + GridLongList waitTxs = null; - GridLongList waitTxs = null; + if (mvccVer.initialLoad()) { + boolean old = dataTree.putx(updateRow); - if (!mvccVer.initialLoad()) { - MvccLongList activeTxs = mvccVer.activeTransactions(); + assert !old; - // TODO IGNITE-3484: need special method. - GridCursor<CacheDataRow> cur = dataTree.find( - new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), - new MvccSearchRow(cacheId, key, 1, 1)); + incrementSize(cctx.cacheId()); + } + else { + dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow); - boolean first = true; + boolean old = dataTree.putx(updateRow); - boolean activeTx = false; + assert !old; - while (cur.next()) { - CacheDataRow oldVal = cur.get(); + if (!updateRow.previousNotNull()) + incrementSize(cctx.cacheId()); + + waitTxs = updateRow.activeTransactions(); + + List<CacheSearchRow> cleanupRows = updateRow.cleanupRows(); - assert oldVal.link() != 0 : oldVal; + if (cleanupRows != null) { + for (int i = 0; i < cleanupRows.size(); i++) { + CacheSearchRow oldRow = cleanupRows.get(i); - if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && - activeTxs.contains(oldVal.mvccCounter())) { - if (waitTxs == null) - waitTxs = new GridLongList(); + assert oldRow.link() != 0L : oldRow; - assert oldVal.mvccCounter() != mvccVer.counter(); + boolean rmvd = dataTree.removex(oldRow); - waitTxs.add(oldVal.mvccCounter()); + assert rmvd; - activeTx = true; + rowStore.removeRow(oldRow.link()); + } } + } - if (!activeTx) { - // Should not delete oldest version which is less than cleanup version. - int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + return waitTxs; + } + else { + MvccDataRow dataRow = new MvccDataRow( + key, + val, + ver, + partId, + cacheId, + mvccVer.coordinatorVersion(), + mvccVer.counter()); - if (cmp <= 0) { - if (first) - first = false; - else { - boolean rmvd = dataTree.removex(oldVal); + rowStore.addRow(dataRow); + + assert dataRow.link() != 0 : dataRow; + + if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) + dataRow.cacheId(cctx.cacheId()); - assert rmvd; + boolean old = dataTree.putx(dataRow); - rowStore.removeRow(oldVal.link()); + assert !old; + + GridLongList waitTxs = null; + + if (!mvccVer.initialLoad()) { + MvccLongList activeTxs = mvccVer.activeTransactions(); + + // TODO IGNITE-3484: need special method. + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), + new MvccSearchRow(cacheId, key, 1, 1)); + + boolean first = true; + + boolean activeTx = false; + + while (cur.next()) { + CacheDataRow oldVal = cur.get(); + + assert oldVal.link() != 0 : oldVal; + + if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && + activeTxs.contains(oldVal.mvccCounter())) { + if (waitTxs == null) + waitTxs = new GridLongList(); + + assert oldVal.mvccCounter() != mvccVer.counter(); + + waitTxs.add(oldVal.mvccCounter()); + + activeTx = true; + } + + if (!activeTx) { + // Should not delete oldest version which is less than cleanup version. + int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + + if (cmp <= 0) { + if (first) + first = false; + else { + boolean rmvd = dataTree.removex(oldVal); + + assert rmvd; + + rowStore.removeRow(oldVal.link()); + } } } } } - } - return waitTxs; + return waitTxs; + } } finally { busyLock.leaveBusy(); @@ -1649,11 +1710,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (grp.mvccEnabled()) { if (true) { - row = dataTree.findOneBounded( - new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), - new MvccSearchRow(cacheId, key, 1L, 1L), - null, - CacheDataRowAdapter.RowData.NO_KEY); + MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key); + + dataTree.iterate( + searchRow, + new MvccKeyMinVersionBound(cacheId, key), + searchRow // Use the same instance as closure to do not create extra object. + ); + + row = searchRow.row(); } else { GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), @@ -1681,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { assert grp.mvccEnabled(); + // Note: this method is intended for testing only. + key.valueBytes(cctx.cacheObjectContext()); int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; @@ -1717,11 +1784,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (true) { MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver); - CacheDataRow row = dataTree.findOneBounded( + dataTree.iterate( lower, - new MvccSearchRow(cacheId, key, 1L, 1L), - lower, // Use the same instance as predicate to do not create extra object. - CacheDataRowAdapter.RowData.NO_KEY); + new MvccKeyMinVersionBound(cacheId, key), + lower // Use the same instance as closure to do not create extra object. + ); + + CacheDataRow row = lower.row(); afterRowFound(row, key); http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 5080c83..b9b8ea1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.keySet()) + long minActive = Long.MAX_VALUE; + + for (Long txVer : activeTxs.keySet()) { + if (txVer < minActive) + minActive = txVer; + res.addTx(txVer); + } Object old = activeTxs.put(nextCtr, txId); @@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { long cleanupVer; if (prevCrdQueries.previousQueriesDone()) { - cleanupVer = committedCntr.get() - 1; + cleanupVer = Math.min(minActive, committedCntr.get()); + + cleanupVer--; Long qryVer = activeQueries.minimalQueryCounter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index b6c5c96..9752b17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -908,7 +908,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed. try { - cursor.init(pageAddr, io(pageAddr), 0); + cursor.init(pageAddr, io(pageAddr), -1); } finally { readUnlock(firstPageId, firstPage, pageAddr); @@ -976,17 +976,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param lower Lower bound inclusive. * @param upper Upper bound inclusive. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. - * @return First found item which meets bounds and pass predicate. + * @param c Closure applied for all found items, iteration is stopped if closure returns {@code false}. * @throws IgniteCheckedException If failed. */ - public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException { + public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException { checkDestroyed(); try { - FindOneCursor cursor = new FindOneCursor(lower, upper, p, x); + ClosureCursor cursor = new ClosureCursor(lower, upper, c); - return cursor.findOne(); + cursor.iterate(); } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e); @@ -4431,18 +4430,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ final L upperBound; - /** */ - final Object x; - /** * @param lowerBound Lower bound. * @param upperBound Upper bound. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. */ - AbstractForwardCursor(L lowerBound, L upperBound, Object x) { + AbstractForwardCursor(L lowerBound, L upperBound) { this.lowerBound = lowerBound; this.upperBound = upperBound; - this.x = x; } /** @@ -4559,7 +4553,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements throws IgniteCheckedException { assert io.isLeaf() : io; assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init). - assert startIdx >= 0 : startIdx; + assert startIdx >= 0 || startIdx == -1: startIdx; assert cnt >= startIdx; checkDestroyed(); @@ -4596,7 +4590,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return {@code true} If we have rows to return after reading the next page. * @throws IgniteCheckedException If failed. */ - final boolean nextPage(T lastRow) throws IgniteCheckedException { + final boolean nextPage(L lastRow) throws IgniteCheckedException { updateLowerBound(lastRow); for (;;) { @@ -4618,7 +4612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements try { BPlusIO<L> io = io(pageAddr); - if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr))) + if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr))) return true; // Continue fetching forward. @@ -4639,7 +4633,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param lower New exact lower bound. */ - private void updateLowerBound(T lower) { + private void updateLowerBound(L lower) { if (lower != null) { lowerShift = 1; // Now we have the full row an need to avoid duplicates. lowerBound = lower; // Move the lower bound forward for further concurrent merge retries. @@ -4648,30 +4642,27 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** - * Forward cursor. + * Closure cursor. */ @SuppressWarnings("unchecked") - private final class FindOneCursor extends AbstractForwardCursor { - /** */ - private Object resRow; - + private final class ClosureCursor extends AbstractForwardCursor { /** */ - private T lastRow; + private final TreeRowClosure<L, T> p; /** */ - private final RowPredicate<L, T> p; + private L lastRow; /** * @param lowerBound Lower bound. * @param upperBound Upper bound. * @param p Row predicate. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. */ - FindOneCursor(L lowerBound, L upperBound, @Nullable RowPredicate<L, T> p, Object x) { - super(lowerBound, upperBound, x); + ClosureCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> p) { + super(lowerBound, upperBound); assert lowerBound != null; assert upperBound != null; + assert p != null; this.p = p; } @@ -4682,8 +4673,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** {@inheritDoc} */ - @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException { - if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item? + @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) + throws IgniteCheckedException { + if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item? startIdx = findLowerBound(pageAddr, io, cnt); if (cnt == startIdx) @@ -4698,15 +4690,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return false; } - if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) { - resRow = getRow(io, pageAddr, i, x); + boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i); + + if (stop) { + nextPageId = 0; // The End. return true; } } if (nextPageId != 0) - lastRow = getRow(io, pageAddr, cnt - 1, null); // Need save last row. + lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Need save last row. return true; } @@ -4718,36 +4712,28 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** {@inheritDoc} */ @Override void onNotFound(boolean readDone) { - resRow = EMPTY; + nextPageId = 0; } /** * @throws IgniteCheckedException If failed. - * @return Found row. */ - private T findOne() throws IgniteCheckedException { + private void iterate() throws IgniteCheckedException { find(); - if (resRow != null) { - if (resRow == EMPTY) - return null; - - return (T)resRow; + if (nextPageId == 0) { + return; } for (;;) { - T lastRow0 = lastRow; + L lastRow0 = lastRow; lastRow = null; nextPage(lastRow0); - if (resRow != null) { - if (resRow == EMPTY) - return null; - - return (T)resRow; - } + if (nextPageId == 0) + return; } } } @@ -4758,6 +4744,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements @SuppressWarnings("unchecked") private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> { /** */ + final Object x; + + /** */ private T[] rows = (T[])EMPTY; /** */ @@ -4769,13 +4758,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. */ ForwardCursor(L lowerBound, L upperBound, Object x) { - super(lowerBound, upperBound, x); + super(lowerBound, upperBound); + + this.x = x; } /** {@inheritDoc} */ @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException { - if (lowerBound != null && startIdx == 0) - startIdx = findLowerBound(pageAddr, io, cnt); + if (startIdx == -1) { + if (lowerBound != null) + startIdx = findLowerBound(pageAddr, io, cnt); + else + startIdx = 0; + } if (upperBound != null && cnt != startIdx) cnt = findUpperBound(pageAddr, io, startIdx, cnt); @@ -5003,7 +4998,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * */ - public interface RowPredicate<L, T extends L> { - public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException; + public interface TreeRowClosure<L, T extends L> { + /** + * @param tree Tree. + * @param io Tree IO. + * @param pageAddr Page address. + * @param idx Item index. + * @return {@code True} if item pass predicate. TODO IGNITE-3478 + * @throws IgniteCheckedException If failed. + */ + public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) + throws IgniteCheckedException; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index a1bfc9b..eaeefee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { /** {@inheritDoc} */ @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { - assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class; + assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row; RowLinkIO io = (RowLinkIO)iox; http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java new file mode 100644 index 0000000..aa9422d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> { + /** */ + private CacheDataRow resRow; + + /** + * @param cacheId Cache ID. + * @param key Key. + */ + public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) { + super(cacheId, key); + } + + /** + * @return Found row. + */ + @Nullable public CacheDataRow row() { + return resRow; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io, + long pageAddr, + int idx) + throws IgniteCheckedException + { + resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); + + return false; // Stop search. + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccKeyMaxVersionBound.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java new file mode 100644 index 0000000..f2ac308 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class MvccKeyMinVersionBound extends SearchRow { + /** + * @param cacheId Cache ID. + * @param key Key. + */ + public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) { + super(cacheId, key); + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return 1L; + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return 1L; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccKeyMinVersionBound.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java new file mode 100644 index 0000000..79544e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> { + /** */ + private Boolean hasPrev; + + /** */ + private boolean canCleanup; + + /** */ + private GridLongList activeTxs; + + /** */ + private List<CacheSearchRow> cleanupRows; + + /** */ + private final MvccCoordinatorVersion mvccVer; + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + * @param mvccVer Mvcc version. + * @param part Partition. + * @param cacheId Cache ID. + */ + public MvccUpdateRow( + KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + MvccCoordinatorVersion mvccVer, + int part, + int cacheId) { + super(key, val, ver, part, 0L, cacheId); + + this.mvccVer = mvccVer; + } + + /** + * @return {@code True} if previous value was non-null. + */ + public boolean previousNotNull() { + return hasPrev != null && hasPrev; + } + + /** + * @return Active transactions to wait for. + */ + @Nullable public GridLongList activeTransactions() { + return activeTxs; + } + + /** + * @return Rows which are safe to cleanup. + */ + public List<CacheSearchRow> cleanupRows() { + return cleanupRows; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, + BPlusIO<CacheSearchRow> io, + long pageAddr, + int idx) + throws IgniteCheckedException + { + RowLinkIO rowIo = (RowLinkIO)io; + + // All previous version should be less then new one. + assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx); + assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx); + + boolean checkActive = mvccVer.activeTransactions().size() > 0; + + boolean txActive = false; + + // Suppose transactions on previous coordinator versions are done. + if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) { + long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx); + + if (mvccVer.activeTransactions().contains(rowMvccCntr)) { + txActive = true; + + if (activeTxs == null) + activeTxs = new GridLongList(); + + activeTxs.add(rowMvccCntr); + } + } + + if (hasPrev == null) + hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes. + + if (!txActive) { + assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0; + + int cmp; + + if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) + cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx)); + else + cmp = 1; + + if (cmp >= 0) { + // Do not cleanup oldest version. + if (canCleanup) { + CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx); + + assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row; + + // Should not be possible to cleanup active tx. + assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion() + || !mvccVer.activeTransactions().contains(row.mvccCounter()); + + if (cleanupRows == null) + cleanupRows = new ArrayList<>(); + + cleanupRows.add(row); + } + else + canCleanup = true; + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return mvccVer.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return mvccVer.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccUpdateRow.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java index 6af2c4c..c829afb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java @@ -11,7 +11,7 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + See the License for the specific language governing permissions and * limitations under the License. */ @@ -21,18 +21,23 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * */ -public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow, CacheDataRow> { +public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> { /** */ private final MvccCoordinatorVersion ver; + /** */ + private CacheDataRow resRow; + /** * @param cacheId Cache ID. * @param key Key. @@ -46,21 +51,36 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Ro this.ver = ver; } + /** + * @return Found row. + */ + @Nullable public CacheDataRow row() { + return resRow; + } + /** {@inheritDoc} */ @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io, long pageAddr, int idx) throws IgniteCheckedException { - if (ver.activeTransactions() == null) - return true; + boolean visible = true; + + if (ver.activeTransactions().size() > 0) { + RowLinkIO rowIo = (RowLinkIO)io; + + // TODO IGNITE-3478 sort active transactions? + if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion()) + visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); + } - RowLinkIO rowIo = (RowLinkIO)io; + if (visible) { + resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); - if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) != ver.coordinatorVersion()) - return true; + return false; // Stop search. + } - return !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); // TODO IGNITE-3478 sort active transactions? + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 89b3df2..115e8a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -2539,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @param restartCrd If {@code true} dedicated coordinator node is restarted during test. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -2685,6 +2686,45 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { stop.set(true); } } + /** + * @throws IgniteCheckedException If failed. + */ + public void testSize() throws Exception { + Ignite node = startGrid(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1)); + + assertEquals(cache.size(), 0); + + final int KEYS = 10; + + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + + assertEquals(i + 1, cache.size()); + } + + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + + assertEquals(KEYS, cache.size()); + } + + // TODO IGNITE-3478: test removes. + } + /** * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java index e7ab34f..e2f6b2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java @@ -571,7 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertEquals(x, tree.findOne(x).longValue()); - assertEquals(x, tree.findOneBounded(x, x, null, null).longValue()); + checkIterate(tree, x, x, x, true); assertNoLocks(); @@ -588,13 +588,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { for (long x = 0; x < cnt; x++) { assertEquals(x, tree.findOne(x).longValue()); - assertEquals(x, tree.findOneBounded(x, x, null, null).longValue()); + checkIterate(tree, x, x, x, true); } assertNoLocks(); assertNull(tree.findOne(cnt)); - assertNull(tree.findOneBounded(cnt, cnt, null, null)); + checkIterate(tree, cnt, cnt, null, false); for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) { X.println(" -- " + x); @@ -608,7 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertNull(tree.findOne(x)); - assertNull(tree.findOneBounded(x, x, null, null)); + checkIterate(tree, x, x, null, false); assertNoLocks(); @@ -625,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** + * @param tree + * @param lower + * @param upper + * @param exp + * @param expFound + * @throws IgniteCheckedException + */ + private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound) + throws IgniteCheckedException { + TestTreeRowClosure c = new TestTreeRowClosure(exp); + + tree.iterate(lower, upper, c); + + assertEquals(expFound, c.found); + } + + private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound) + throws IgniteCheckedException { + c.found = false; + + tree.iterate(lower, upper, c); + + assertEquals(expFound, c.found); + } + + /** * @throws IgniteCheckedException If failed. */ public void testRandomInvoke_1_30_1() throws IgniteCheckedException { @@ -1250,44 +1276,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testFindOneBounded() throws Exception { + public void testIterate() throws Exception { MAX_PER_PAGE = 5; TestTree tree = createTestTree(true); - assertNull(tree.findOneBounded(0L, 100L, null, null)); + checkIterate(tree, 0L, 100L, null, false); for (long idx = 1L; idx <= 10L; ++idx) tree.put(idx); for (long idx = 1L; idx <= 10L; ++idx) - assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null)); + checkIterate(tree, idx, 100L, idx, true); - assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null)); + checkIterate(tree, 0L, 100L, 1L, true); for (long idx = 1L; idx <= 10L; ++idx) - assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L), null)); + checkIterate(tree, idx, 100L, 10L, true); - assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null)); + checkIterate(tree, 0L, 100L, 100L, false); for (long idx = 1L; idx <= 10L; ++idx) - assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx), null)); + checkIterate(tree, 0L, 100L, idx, true); for (long idx = 0L; idx <= 10L; ++idx) - assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null)); + checkIterate(tree, idx, 11L, -1L, false); } /** * @throws Exception If failed. */ - public void testFindOneBoundedConcurrentPutRemove() throws Exception { + public void testIterateConcurrentPutRemove() throws Exception { + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testIterateConcurrentPutRemove_1() throws Exception { + MAX_PER_PAGE = 1; + findOneBoundedConcurrentPutRemove(); } /** * @throws Exception If failed. */ - public void testFindOneBoundedConcurrentPutRemove_5() throws Exception { + public void testIterateConcurrentPutRemove_5() throws Exception { MAX_PER_PAGE = 5; findOneBoundedConcurrentPutRemove(); @@ -1296,7 +1331,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testFindOneBoundedConcurrentPutRemove_10() throws Exception { + public void testIteratePutRemove_10() throws Exception { MAX_PER_PAGE = 10; findOneBoundedConcurrentPutRemove(); @@ -1370,33 +1405,30 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { info("Iteration [iter=" + i + ", key=" + findKey + ']'); assertEquals(findKey, tree.findOne(findKey)); - assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null)); + checkIterate(tree, findKey, findKey, findKey, true); IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - TestRowPredicate p = new TestRowPredicate(findKey); + TestTreeRowClosure p = new TestTreeRowClosure(findKey); - TestRowPredicate falseP = new TestRowPredicate(-1L); + TestTreeRowClosure falseP = new TestTreeRowClosure(-1L); int cnt = 0; while (!stop.get()) { int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100); - assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null)); + checkIterateC(tree, findKey, findKey, p, true); - assertEquals(findKey, - tree.findOneBounded(findKey - shift, findKey, p, null)); + checkIterateC(tree, findKey - shift, findKey, p, true); - assertEquals(findKey, - tree.findOneBounded(findKey - shift, findKey + shift, p, null)); + checkIterateC(tree, findKey - shift, findKey + shift, p, true); - assertEquals(findKey, - tree.findOneBounded(findKey, findKey + shift, p, null)); + checkIterateC(tree, findKey, findKey + shift, p, true); - assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null)); + checkIterateC(tree, -100L, KEYS + 100L, falseP, false); cnt++; } @@ -1650,7 +1682,11 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { last = c.get(); } - last = tree.findOneBounded((long)low, (long)high, null, null); + TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure(); + + tree.iterate((long)low, (long)high, cl); + + last = cl.val; if (last != null) { assertTrue(low + " <= " + last + " <= " + high, last >= low); @@ -2064,23 +2100,46 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** * */ - static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> { + static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> { /** */ private final Long expVal; + /** */ + private boolean found; + /** - * @param expVal Expected value. + * @param expVal Value to find or {@code null} to find first. */ - TestRowPredicate(Long expVal) { + TestTreeRowClosure(Long expVal) { this.expVal = expVal; } /** {@inheritDoc} */ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException { - Long row = io.getLookupRow(tree, pageAddr, idx); + assert !found; + + found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal); + + return !found; + } + } + + /** + * + */ + static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> { + /** */ + private Long val; + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + assert val == null; + + val = io.getLookupRow(tree, pageAddr, idx); - return row.equals(expVal); + return false; } } }
