Repository: ignite Updated Branches: refs/heads/ignite-5937 [created] 44ad70112
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44ad7011 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44ad7011 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44ad7011 Branch: refs/heads/ignite-5937 Commit: 44ad70112fb1063b61fcdc20c5fee893381d2e44 Parents: 27b2be4 Author: sboikov <[email protected]> Authored: Tue Oct 3 16:50:01 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 3 18:00:46 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 83 ++-- .../cache/persistence/tree/BPlusTree.java | 376 ++++++++++++++----- .../cache/tree/MvccVersionBasedSearchRow.java | 80 ++++ .../cache/mvcc/CacheMvccTransactionsTest.java | 79 ++++ .../processors/database/BPlusTreeSelfTest.java | 232 +++++++++++- 5 files changed, 718 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index d8c5eaa..76d9649 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.DataRow; import org.apache.ignite.internal.processors.cache.tree.MvccDataRow; import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow; +import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.processors.cache.tree.SearchRow; @@ -1647,14 +1648,22 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow row; if (grp.mvccEnabled()) { - // TODO IGNITE-3484: need special method. - GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), - new MvccSearchRow(cacheId, key, 1, 1)); + if (false) { + row = dataTree.findOneBounded( + new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), + new MvccSearchRow(cacheId, key, 1L, 1L), + null, + CacheDataRowAdapter.RowData.NO_KEY); + } + else { + GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), + new MvccSearchRow(cacheId, key, 1, 1)); - if (cur.next()) - row = cur.get(); - else - row = null; + if (cur.next()) + row = cur.get(); + else + row = null; + } } else row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); @@ -1705,41 +1714,53 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - // TODO IGNITE-3484: need special method. - GridCursor<CacheDataRow> cur = dataTree.find( - new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()), - new MvccSearchRow(cacheId, key, 1, 1)); + if (false) { + MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver); - CacheDataRow row = null; + CacheDataRow row = dataTree.findOneBounded( + lower, + new MvccSearchRow(cacheId, key, 1L, 1L), + lower, // Use the same instance as predicate to do not create extra object. + CacheDataRowAdapter.RowData.NO_KEY); - MvccLongList txs = ver.activeTransactions(); + afterRowFound(row, key); - while (cur.next()) { - CacheDataRow row0 = cur.get(); + return row; + } + else { + GridCursor<CacheDataRow> cur = dataTree.find( + new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()), + new MvccSearchRow(cacheId, key, 1, 1)); - assert row0.mvccCoordinatorVersion() > 0 : row0; + CacheDataRow row = null; - boolean visible; + MvccLongList txs = ver.activeTransactions(); - if (txs != null) { - visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion() - || !txs.contains(row0.mvccCounter()); - } - else - visible = true; + while (cur.next()) { + CacheDataRow row0 = cur.get(); - if (visible) { - row = row0; + assert row0.mvccCoordinatorVersion() > 0 : row0; - break; - } - } + boolean visible; - assert row == null || key.equals(row.key()); + if (txs != null) { + visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion() + || !txs.contains(row0.mvccCounter()); + } + else + visible = true; - //afterRowFound(row, key); + if (visible) { + row = row0; - return row; + break; + } + } + + assert row == null || key.equals(row.key()); + + return row; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index c73b4c7..d570f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -2509,14 +2509,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private final class GetCursor extends Get { /** */ - ForwardCursor cursor; + AbstractForwardCursor cursor; /** * @param lower Lower bound. * @param shift Shift. * @param cursor Cursor. */ - GetCursor(L lower, int shift, ForwardCursor cursor) { + GetCursor(L lower, int shift, AbstractForwardCursor cursor) { super(lower, false); assert shift != 0; // Either handle range of equal rows or find a greater row after concurrent merge. @@ -4384,52 +4384,85 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException; + public interface RowPredicate<L, T extends L> { + public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException; + } + + public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException { + checkDestroyed(); + + try { + FindOneCursor cursor = new FindOneCursor(lower, upper, p, x); + + return cursor.findOne(); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + catch (RuntimeException e) { + throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + catch (AssertionError e) { + throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + finally { + checkDestroyed(); + } + } + /** - * Forward cursor. + * */ @SuppressWarnings("unchecked") - private final class ForwardCursor implements GridCursor<T> { + private abstract class AbstractForwardCursor { /** */ - private T[] rows = (T[])EMPTY; - - /** */ - private int row = -1; + long nextPageId; /** */ - private long nextPageId; - - /** */ - private L lowerBound; + L lowerBound; /** */ private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows. /** */ - private final L upperBound; + final L upperBound; /** */ - private final Object x; + final Object x; /** * @param lowerBound Lower bound. * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. */ - ForwardCursor(L lowerBound, L upperBound) { + AbstractForwardCursor(L lowerBound, L upperBound, Object x) { this.lowerBound = lowerBound; this.upperBound = upperBound; - this.x = null; + this.x = x; } /** - * @param lowerBound Lower bound. - * @param upperBound Upper bound. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * */ - ForwardCursor(L lowerBound, L upperBound, Object x) { - this.lowerBound = lowerBound; - this.upperBound = upperBound; - this.x = x; - } + abstract void init0(); + + /** + * @param pageAddr + * @param io + * @param startIdx + * @param cnt + * @return + * @throws IgniteCheckedException If failed. + */ + abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException; + + /** + * @return + * @throws IgniteCheckedException If failed. + */ + abstract boolean reinitialize0() throws IgniteCheckedException; + + abstract void onNotFound(boolean readDone); /** * @param pageAddr Page address. @@ -4437,9 +4470,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param startIdx Start index. * @throws IgniteCheckedException If failed. */ - private void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException { + final void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException { nextPageId = 0; - row = -1; + + init0(); int cnt = io.getCount(pageAddr); @@ -4447,16 +4481,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (cnt == 0) { assert io.getForward(pageAddr) == 0L; - rows = null; - } - else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) { - if (rows != EMPTY) { - assert rows.length > 0; // Otherwise it makes no sense to create an array. - - // Fake clear. - rows[0] = null; - } + onNotFound(true); } + else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) + onNotFound(false); } /** @@ -4466,7 +4494,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Adjusted to lower bound start index. * @throws IgniteCheckedException If failed. */ - private int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException { + final int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException { assert io.isLeaf(); // Compare with the first row on the page. @@ -4491,7 +4519,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Corrected number of rows with respect to upper bound. * @throws IgniteCheckedException If failed. */ - private int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException { + final int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException { assert io.isLeaf(); // Compare with the last row on the page. @@ -4530,68 +4558,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements nextPageId = io.getForward(pageAddr); - if (lowerBound != null && startIdx == 0) - startIdx = findLowerBound(pageAddr, io, cnt); - - if (upperBound != null && cnt != startIdx) - cnt = findUpperBound(pageAddr, io, startIdx, cnt); - - cnt -= startIdx; - - if (cnt == 0) - return false; - - if (rows == EMPTY) - rows = (T[])new Object[cnt]; - - for (int i = 0; i < cnt; i++) { - T r = getRow(io, pageAddr, startIdx + i, x); - - rows = GridArrays.set(rows, i, r); - } - - GridArrays.clearTail(rows, cnt); - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean next() throws IgniteCheckedException { - if (rows == null) - return false; - - if (++row < rows.length && rows[row] != null) { - clearLastRow(); // Allow to GC the last returned row. - - return true; - } - - return nextPage(); - } - - /** - * @return Cleared last row. - */ - private T clearLastRow() { - if (row == 0) - return null; - - int last = row - 1; - - T r = rows[last]; - - assert r != null; - - rows[last] = null; - - return r; + return fillFromBuffer0(pageAddr, io, startIdx, cnt); } /** * @throws IgniteCheckedException If failed. */ - private void find() throws IgniteCheckedException { + final void find() throws IgniteCheckedException { assert lowerBound != null; doFind(new GetCursor(lowerBound, lowerShift, this)); @@ -4607,21 +4580,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // to the previous lower bound. find(); - return next(); + return reinitialize0(); } /** * @return {@code true} If we have rows to return after reading the next page. * @throws IgniteCheckedException If failed. */ - private boolean nextPage() throws IgniteCheckedException { - updateLowerBound(clearLastRow()); - - row = 0; + final boolean nextPage(T lastRow) throws IgniteCheckedException { + updateLowerBound(lastRow); for (;;) { if (nextPageId == 0) { - rows = null; + onNotFound(true); return false; // Done. } @@ -4665,6 +4636,211 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements lowerBound = lower; // Move the lower bound forward for further concurrent merge retries. } } + } + + /** + * Forward cursor. + */ + @SuppressWarnings("unchecked") + private final class FindOneCursor extends AbstractForwardCursor { + /** */ + private Object resRow; + + /** */ + private T lastRow; + + /** */ + private final RowPredicate<L, T> p; + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + */ + FindOneCursor(L lowerBound, L upperBound, RowPredicate<L, T> p, Object x) { + super(lowerBound, upperBound, x); + + assert lowerBound != null; + assert upperBound != null; + + this.p = p; + } + + @Override void init0() { + // No-op. + } + + @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException { + if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item? + startIdx = findLowerBound(pageAddr, io, cnt); + + if (cnt == startIdx) + return false; + + for (int i = startIdx; i < cnt; i++) { + int cmp = compare(0, io, pageAddr, i, upperBound); + + if (cmp > 0) { + nextPageId = 0; // The End. + + return false; + } + + if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) { + resRow = getRow(io, pageAddr, i, x); + + return true; + } + } + + if (nextPageId != 0) + lastRow = getRow(io, pageAddr, cnt - 1, x); // Need save last row. + + return true; + } + + @Override boolean reinitialize0() throws IgniteCheckedException { + return true; + } + + @Override void onNotFound(boolean readDone) { + resRow = EMPTY; + } + + /** + * @throws IgniteCheckedException If failed. + * @return Found row. + */ + private T findOne() throws IgniteCheckedException { + find(); + + if (resRow != null) { + if (resRow == EMPTY) + return null; + + return (T)resRow; + } + + for (;;) { + T lastRow0 = lastRow; + + lastRow = null; + + nextPage(lastRow0); + + if (resRow != null) { + if (resRow == EMPTY) + return null; + + return (T)resRow; + } + } + } + } + + /** + * Forward cursor. + */ + @SuppressWarnings("unchecked") + private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> { + /** */ + private T[] rows = (T[])EMPTY; + + /** */ + private int row = -1; + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + */ + ForwardCursor(L lowerBound, L upperBound, Object x) { + super(lowerBound, upperBound, x); + } + + @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException { + if (lowerBound != null && startIdx == 0) + startIdx = findLowerBound(pageAddr, io, cnt); + + if (upperBound != null && cnt != startIdx) + cnt = findUpperBound(pageAddr, io, startIdx, cnt); + + cnt -= startIdx; + + if (cnt == 0) + return false; + + if (rows == EMPTY) + rows = (T[])new Object[cnt]; + + for (int i = 0; i < cnt; i++) { + T r = getRow(io, pageAddr, startIdx + i, x); + + rows = GridArrays.set(rows, i, r); + } + + GridArrays.clearTail(rows, cnt); + + return true; + } + + @Override boolean reinitialize0() throws IgniteCheckedException { + return next(); + } + + @Override void onNotFound(boolean readDone) { + if (readDone) + rows = null; + else { + if (rows != EMPTY) { + assert rows.length > 0; // Otherwise it makes no sense to create an array. + + // Fake clear. + rows[0] = null; + } + } + } + + @Override void init0() { + row = -1; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean next() throws IgniteCheckedException { + if (rows == null) + return false; + + if (++row < rows.length && rows[row] != null) { + clearLastRow(); // Allow to GC the last returned row. + + return true; + } + + T lastRow = clearLastRow(); + + row = 0; + + return nextPage(lastRow); + } + + /** + * @return Cleared last row. + */ + private T clearLastRow() { + if (row == 0) + return null; + + int last = row - 1; + + T r = rows[last]; + + assert r != null; + + rows[last] = null; + + return r; + } /** {@inheritDoc} */ @Override public T get() { http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java new file mode 100644 index 0000000..f708ffd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow, CacheDataRow> { + /** */ + private final MvccCoordinatorVersion ver; + + /** + * @param cacheId Cache ID. + * @param key Key. + * @param ver Mvcc version. + */ + public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion ver) { + super(cacheId, key); + + assert ver != null; + + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, + BPlusIO<CacheSearchRow> io, + long pageAddr, + int idx) throws IgniteCheckedException + { + if (ver.activeTransactions() == null) + return true; + + RowLinkIO rowIo = (RowLinkIO)io; + + if (rowIo.getMvccUpdateTopologyVersion(pageAddr, idx) != ver.coordinatorVersion()) + return true; + + return !ver.activeTransactions().contains(ver.counter()); // TODO IGNITE-3478 sort active transactions? + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return ver.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return ver.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccVersionBasedSearchRow.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index f28fe2d..7936340 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; @@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -2682,6 +2687,80 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws IgniteCheckedException If failed. + */ + public void testInternalApi() throws Exception { + Ignite node = startGrid(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1)); + + GridCacheContext cctx = + ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName())); + + CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators(); + + // Start query to prevent cleanup. + IgniteInternalFuture<MvccCoordinatorVersion> fut = crd.requestQueryCounter(crd.currentCoordinator()); + + fut.get(); + + final Integer key = 0; + + for (int i = 0; i < 10; i++) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key + 1, i); + + tx.commit(); + } + } + + for (int i = 0; i < 10; i++) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + } + + KeyCacheObject key0 = cctx.toCacheKeyObject(key); + + List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0); + + assertEquals(10, vers.size()); + + CacheDataRow row = cctx.offheap().read(cctx, key0); + + checkRow(cctx, row, key0, vers.get(0).get1()); + + for (T2<Object, MvccCounter> ver : vers) { + MvccCounter cntr = ver.get2(); + + MvccCoordinatorVersion readVer = + new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0); + + row = cctx.offheap().mvccRead(cctx, key0, readVer); + + checkRow(cctx, row, key0, ver.get1()); + } + + checkRow(cctx, + cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion() + 1, 1, 0)), + key0, + vers.get(0).get1()); + + checkRow(cctx, + cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1, 0)), + key0, + vers.get(0).get1()); + } + + private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) { + assertNotNull(row); + assertEquals(expKey, row.key()); + assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false)); + } + + /** * @return Cache configurations. */ private List<CacheConfiguration<Object, Object>> cacheConfigurations() { http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java index 9c0d791..e7ab34f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java @@ -25,6 +25,7 @@ import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertEquals(x, tree.findOne(x).longValue()); + assertEquals(x, tree.findOneBounded(x, x, null, null).longValue()); assertNoLocks(); @@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNull(tree.findOne(-1L)); - for (long x = 0; x < cnt; x++) + for (long x = 0; x < cnt; x++) { assertEquals(x, tree.findOne(x).longValue()); + assertEquals(x, tree.findOneBounded(x, x, null, null).longValue()); + } assertNoLocks(); assertNull(tree.findOne(cnt)); + assertNull(tree.findOneBounded(cnt, cnt, null, null)); for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) { X.println(" -- " + x); @@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertNull(tree.findOne(x)); + assertNull(tree.findOneBounded(x, x, null, null)); assertNoLocks(); @@ -1242,6 +1248,200 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testFindOneBounded() throws Exception { + MAX_PER_PAGE = 5; + + TestTree tree = createTestTree(true); + + assertNull(tree.findOneBounded(0L, 100L, null, null)); + + for (long idx = 1L; idx <= 10L; ++idx) + tree.put(idx); + + for (long idx = 1L; idx <= 10L; ++idx) + assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null)); + + assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null)); + + for (long idx = 1L; idx <= 10L; ++idx) + assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L), null)); + + assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null)); + + for (long idx = 1L; idx <= 10L; ++idx) + assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx), null)); + + for (long idx = 0L; idx <= 10L; ++idx) + assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null)); + } + + /** + * @throws Exception If failed. + */ + public void testFindOneBoundedConcurrentPutRemove() throws Exception { + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testFindOneBoundedConcurrentPutRemove_5() throws Exception { + MAX_PER_PAGE = 5; + + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testFindOneBoundedConcurrentPutRemove_10() throws Exception { + MAX_PER_PAGE = 10; + + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + private void findOneBoundedConcurrentPutRemove() throws Exception { + final TestTree tree = createTestTree(true); + + final int KEYS = 10_000; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + for (long idx = 0L; idx < KEYS; ++idx) + tree.put(idx); + + final Long findKey; + + if (MAX_PER_PAGE > 0) { + switch (i) { + case 0: + findKey = 1L; + + break; + + case 1: + findKey = (long)MAX_PER_PAGE; + + break; + + case 2: + findKey = (long)MAX_PER_PAGE - 1; + + break; + + case 3: + findKey = (long)MAX_PER_PAGE + 1; + + break; + + case 4: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE; + + break; + + case 5: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1; + + break; + + case 6: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1; + + break; + + case 7: + findKey = (long)KEYS - 1; + + break; + + default: + findKey = rnd.nextLong(KEYS); + } + } + else + findKey = rnd.nextLong(KEYS); + + info("Iteration [iter=" + i + ", key=" + findKey + ']'); + + assertEquals(findKey, tree.findOne(findKey)); + assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null)); + + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + TestRowPredicate p = new TestRowPredicate(findKey); + + TestRowPredicate falseP = new TestRowPredicate(-1L); + + int cnt = 0; + + while (!stop.get()) { + int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100); + + assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null)); + + assertEquals(findKey, + tree.findOneBounded(findKey - shift, findKey, p, null)); + + assertEquals(findKey, + tree.findOneBounded(findKey - shift, findKey + shift, p, null)); + + assertEquals(findKey, + tree.findOneBounded(findKey, findKey + shift, p, null)); + + assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null)); + + cnt++; + } + + info("Done, read count: " + cnt); + + return null; + } + }, 10, "find"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add(getFut); + + asyncRunFut.markInitialized(); + + try { + U.sleep(100); + + for (int j = 0; j < 20; j++) { + for (long idx = 0L; idx < KEYS / 2; ++idx) { + long toRmv = rnd.nextLong(KEYS); + + if (toRmv != findKey) + tree.remove(toRmv); + } + + for (long idx = 0L; idx < KEYS / 2; ++idx) { + long put = rnd.nextLong(KEYS); + + tree.put(put); + } + } + } + finally { + stop.set(true); + } + + asyncRunFut.get(); + + stop.set(false); + } + } + + /** * */ public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception { @@ -1449,6 +1649,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { last = c.get(); } + + last = tree.findOneBounded((long)low, (long)high, null, null); + + if (last != null) { + assertTrue(low + " <= " + last + " <= " + high, last >= low); + assertTrue(low + " <= " + last + " <= " + high, last <= high); + } } return null; @@ -1853,4 +2060,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { return PageUtils.getLong(pageAddr, offset(idx)); } } + + /** + * + */ + static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> { + /** */ + private final Long expVal; + + /** + * @param expVal Expected value. + */ + TestRowPredicate(Long expVal) { + this.expVal = expVal; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + Long row = io.getLookupRow(tree, pageAddr, idx); + + return row.equals(expVal); + } + } }
