ignite-gg-11414 Add iterator wrapper for cursor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1c15a6d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1c15a6d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1c15a6d Branch: refs/heads/ignite-3477 Commit: c1c15a6db0bd84de49c0f828cdeab1aed4bcd863 Parents: 8852c80 Author: Dmitriy Govorukhin <[email protected]> Authored: Tue Jan 3 19:59:44 2017 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Tue Jan 3 19:59:44 2017 +0300 ---------------------------------------------------------------------- .../util/GridCursorIteratorWrapper.java | 8 + .../ignite/internal/util/lang/GridCursor.java | 1 - .../query/h2/database/H2PkHashIndex.java | 30 ++-- .../query/h2/database/H2TreeIndex.java | 5 +- .../query/h2/opt/GridH2IndexBase.java | 163 ++++++++++++++----- .../query/h2/opt/GridH2TreeIndex.java | 12 +- .../IgniteDistributedJoinTestSuite.java | 38 +++++ 7 files changed, 193 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java index bd30ace..927e365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java @@ -9,17 +9,25 @@ import java.util.*; * Wrap {@code Iterator} and adapt it to {@code GridCursor}. */ public class GridCursorIteratorWrapper<V> implements GridCursor<V> { + /** Iterator. */ private Iterator<V> iter; + + /** Next. */ private V next; + /** + * @param iter Iterator. + */ public GridCursorIteratorWrapper(Iterator<V> iter) { this.iter = iter; } + /** {@inheritDoc} */ @Override public V get() throws IgniteCheckedException { return next; } + /** {@inheritDoc} */ @Override public boolean next() throws IgniteCheckedException { next = iter.hasNext() ? iter.next() : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java index da85f99..37d3a48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.util.lang; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.database.*; /** * Simple cursor abstraction. Initial state must be "before first". http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index b0647b6..eb34be6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -101,9 +101,8 @@ public class H2PkHashIndex extends GridH2IndexBase { try { List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>(); - for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) { + for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) cursors.add(store.cursor(lowerObj, upperObj)); - } return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p); } @@ -123,9 +122,8 @@ public class H2PkHashIndex extends GridH2IndexBase { for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) { CacheDataRow found = store.find(row.key); - if (found != null) { + if (found != null) tbl.rowDescriptor().createRow(row.key(), row.partition(), row.value(), row.version(), 0); - } } return null; @@ -191,7 +189,7 @@ public class H2PkHashIndex extends GridH2IndexBase { } /** {@inheritDoc} */ - @Override public Cursor findFirstOrLast(Session session, boolean b) { + @Override public Cursor findFirstOrLast(Session ses, boolean b) { throw new UnsupportedOperationException(); } @@ -283,30 +281,30 @@ public class H2PkHashIndex extends GridH2IndexBase { */ private static class CompositeGridCursor<T> implements GridCursor<T> { /** */ - private final Iterator<GridCursor<? extends T>> iterator; + private final Iterator<GridCursor<? extends T>> iter; /** */ - private GridCursor<? extends T> current; + private GridCursor<? extends T> curr; /** * */ - public CompositeGridCursor(Iterator<GridCursor<? extends T>> iterator) { - this.iterator = iterator; + public CompositeGridCursor(Iterator<GridCursor<? extends T>> iter) { + this.iter = iter; - if (iterator.hasNext()) - current = iterator.next(); + if (iter.hasNext()) + curr = iter.next(); } /** {@inheritDoc} */ @Override public boolean next() throws IgniteCheckedException { - if (current.next()) + if (curr.next()) return true; - while (iterator.hasNext()) { - current = iterator.next(); + while (iter.hasNext()) { + curr = iter.next(); - if (current.next()) + if (curr.next()) return true; } @@ -315,7 +313,7 @@ public class H2PkHashIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public T get() throws IgniteCheckedException { - return current.get(); + return curr.get(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index c1eb986..31df27c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -219,11 +219,14 @@ public class H2TreeIndex extends GridH2IndexBase { return tree; } + /** {@inheritDoc} */ protected IgniteTree<SearchRow, GridH2Row> treeForRead() { return tree; } - protected GridCursor<GridH2Row> doFind0(IgniteTree t, + /** {@inheritDoc} */ + protected GridCursor<GridH2Row> doFind0( + IgniteTree t, @Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last, http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index bd569df..c5e836a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.util.*; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -487,6 +486,8 @@ public abstract class GridH2IndexBase extends BaseIndex { maxRows -= range.rows().size(); } + assert !ranges.isEmpty(); + if (src.hasMoreRows()) { // Save source for future fetches. if (msg.bounds() != null) @@ -497,8 +498,6 @@ public abstract class GridH2IndexBase extends BaseIndex { qctx.putSource(node.id(), msg.batchLookupId(), null); } - assert !ranges.isEmpty(); - res.ranges(ranges); res.status(STATUS_OK); } @@ -1359,14 +1358,14 @@ public abstract class GridH2IndexBase extends BaseIndex { int curRangeId = -1; /** */ - GridCursor<GridH2Row> curRange = EMPTY_CURSOR; - - /** */ final IgniteTree tree; /** */ final IndexingQueryFilter filter; + /** Iterator. */ + Iterator<GridH2Row> iter = emptyIterator(); + /** * @param bounds Bounds. * @param tree Snapshot. @@ -1386,7 +1385,7 @@ public abstract class GridH2IndexBase extends BaseIndex { * @return {@code true} If there are more rows in this source. */ public boolean hasMoreRows() throws IgniteCheckedException { - return boundsIter.hasNext() || curRange.next(); + return boundsIter.hasNext() || iter.hasNext(); } /** @@ -1396,60 +1395,56 @@ public abstract class GridH2IndexBase extends BaseIndex { public GridH2RowRange next(int maxRows) { assert maxRows > 0 : maxRows; - try { - for (;;) { - if (curRange.next()) { - // Here we are getting last rows from previously partially fetched range. - List<GridH2RowMessage> rows = new ArrayList<>(); + for (; ; ) { + if (iter.hasNext()) { + // Here we are getting last rows from previously partially fetched range. + List<GridH2RowMessage> rows = new ArrayList<>(); - GridH2RowRange nextRange = new GridH2RowRange(); + GridH2RowRange nextRange = new GridH2RowRange(); - nextRange.rangeId(curRangeId); - nextRange.rows(rows); + nextRange.rangeId(curRangeId); + nextRange.rows(rows); - do { - rows.add(toRowMessage(curRange.get())); - } - while (rows.size() < maxRows && curRange.next()); + do { + rows.add(toRowMessage(iter.next())); + } + while (rows.size() < maxRows && iter.hasNext()); - if (curRange.next()) - nextRange.setPartial(); - else - curRange = EMPTY_CURSOR; + if (iter.hasNext()) + nextRange.setPartial(); + else + iter = emptyIterator(); - return nextRange; - } + return nextRange; + } - curRange = EMPTY_CURSOR; + iter = emptyIterator(); - if (!boundsIter.hasNext()) { - boundsIter = emptyIterator(); + if (!boundsIter.hasNext()) { + boundsIter = emptyIterator(); - return null; - } + return null; + } - GridH2RowRangeBounds bounds = boundsIter.next(); + GridH2RowRangeBounds bounds = boundsIter.next(); - curRangeId = bounds.rangeId(); + curRangeId = bounds.rangeId(); - SearchRow first = toSearchRow(bounds.first()); - SearchRow last = toSearchRow(bounds.last()); + SearchRow first = toSearchRow(bounds.first()); + SearchRow last = toSearchRow(bounds.last()); - IgniteTree t = tree != null ? tree : treeForRead(); + IgniteTree t = tree != null ? tree : treeForRead(); - curRange = doFind0(t, first, true, last, filter); + iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter)); - if (!curRange.next()) { - // We have to return empty range here. - GridH2RowRange emptyRange = new GridH2RowRange(); + if (!iter.hasNext()) { + // We have to return empty range here. + GridH2RowRange emptyRange = new GridH2RowRange(); - emptyRange.rangeId(curRangeId); + emptyRange.rangeId(curRangeId); - return emptyRange; - } + return emptyRange; } - } catch (IgniteCheckedException e) { - throw DbException.convert(e); } } } @@ -1469,7 +1464,8 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param filter Filter. * @return Iterator over rows in given range. */ - protected GridCursor<GridH2Row> doFind0(IgniteTree t, + protected GridCursor<GridH2Row> doFind0( + IgniteTree t, @Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last, @@ -1492,6 +1488,7 @@ public abstract class GridH2IndexBase extends BaseIndex { /** Is value required for filtering predicate? */ private final boolean isValRequired; + /** */ private GridH2Row next; /** @@ -1540,6 +1537,7 @@ public abstract class GridH2IndexBase extends BaseIndex { return fltr.apply(key, val); } + /** {@inheritDoc} */ @Override public boolean next() throws IgniteCheckedException { next = null; @@ -1555,6 +1553,7 @@ public abstract class GridH2IndexBase extends BaseIndex { return false; } + /** {@inheritDoc} */ @Override public GridH2Row get() throws IgniteCheckedException { if (next == null) throw new NoSuchElementException(); @@ -1563,6 +1562,80 @@ public abstract class GridH2IndexBase extends BaseIndex { } } + /** + * + */ + private static final class CursorIteratorWrapper implements Iterator<GridH2Row> { + /** */ + private final GridCursor<GridH2Row> cursor; + + /** First next. */ + private GridH2Row firstNext; + + /** Second next. */ + private GridH2Row secondNext; + + /** + * @param cursor Cursor. + */ + private CursorIteratorWrapper(GridCursor<GridH2Row> cursor) { + this.cursor = cursor; + + fetch(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return firstNext != null; + } + + /** {@inheritDoc} */ + @Override public GridH2Row next() { + try { + if (firstNext != null) { + GridH2Row res = firstNext; + + firstNext = secondNext; + + if (cursor.next()) + secondNext = cursor.get(); + else + secondNext = null; + return res; + } + else + return null; + } + catch (Exception e) { + return null; + } + } + + /** + * + */ + private void fetch() { + try { + if (firstNext == null && secondNext == null) { + if (cursor.next()) { + firstNext = cursor.get(); + + if (cursor.next()) + secondNext = cursor.get(); + } + } + } + catch (IgniteCheckedException ignored) { + + } + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException("operation is not supported"); + } + } + /** Empty cursor. */ protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() { /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 002b432..014cf2e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -503,20 +503,27 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS * Adapter from {@link NavigableMap} to {@link IgniteTree}. */ private final class IgniteNavigableMapTree implements IgniteTree<GridSearchRowPointer, GridH2Row> { - private NavigableMap<GridSearchRowPointer, GridH2Row> tree; + /** Tree. */ + private final NavigableMap<GridSearchRowPointer, GridH2Row> tree; + /** + * @param tree Tree. + */ public IgniteNavigableMapTree(NavigableMap<GridSearchRowPointer, GridH2Row> tree) { this.tree = tree; } + /** {@inheritDoc} */ @Override public GridH2Row put(GridH2Row value) throws IgniteCheckedException { return tree.put(value, value); } + /** {@inheritDoc} */ @Override public GridH2Row findOne(GridSearchRowPointer key) throws IgniteCheckedException { return tree.get(key); } + /** {@inheritDoc} */ @Override public GridCursor<GridH2Row> find(GridSearchRowPointer lower, GridSearchRowPointer upper) throws IgniteCheckedException { if (lower == null || upper == null) @@ -528,14 +535,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS return new GridCursorIteratorWrapper<GridH2Row>(subMap.values().iterator()); } + /** {@inheritDoc} */ @Override public GridH2Row remove(GridSearchRowPointer key) throws IgniteCheckedException { return tree.remove(key); } + /** {@inheritDoc} */ @Override public long size() throws IgniteCheckedException { return tree.size(); } + /** {@inheritDoc} */ @Override public IgniteNavigableMapTree clone() { AbstractMap copy; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java new file mode 100644 index 0000000..dca640f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java @@ -0,0 +1,38 @@ +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCollocatedAndNotTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCustomAffinityMapper; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinNoIndexTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinPartitionedAndReplicatedTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinQueryConditionsTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest; + +/** + * + */ +public class IgniteDistributedJoinTestSuite extends TestSuite { + /** + * + */ + public static TestSuite suite() { + TestSuite suite = new TestSuite("Distributed Joins Test Suite."); + + suite.addTestSuite(H2CompareBigQueryDistributedJoinsTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinCollocatedAndNotTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class); + suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinTest.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class); + suite.addTestSuite(IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class); + suite.addTestSuite(IgniteSqlDistributedJoinSelfTest.class); + + return suite; + } +}
