IGNITE-4993 - Fixing distributed joins on segmented index.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/800b8bd9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/800b8bd9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/800b8bd9 Branch: refs/heads/ignite-1561 Commit: 800b8bd90033ab64f4299ba242cc89b1f4c98417 Parents: 2ded758 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Wed Apr 19 13:55:02 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Apr 19 13:55:35 2017 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridH2IndexBase.java | 31 +++++++++++--------- .../query/IgniteSqlSegmentedIndexSelfTest.java | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/800b8bd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 7163834..0eac559 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -413,9 +413,7 @@ public abstract class GridH2IndexBase extends BaseIndex { GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context(); - boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY; - - return new DistributedLookupBatch(cctx, ucast, affColId, isLocal); + return new DistributedLookupBatch(cctx, ucast, affColId); } /** @@ -1086,9 +1084,6 @@ public abstract class GridH2IndexBase extends BaseIndex { final int affColId; /** */ - private final boolean localQuery; - - /** */ GridH2QueryContext qctx; /** */ @@ -1113,13 +1108,11 @@ public abstract class GridH2IndexBase extends BaseIndex { * @param cctx Cache Cache context. * @param ucast Unicast or broadcast query. * @param affColId Affinity column ID. - * @param localQuery Local query flag. */ - DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) { + DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId) { this.cctx = cctx; this.ucast = ucast; this.affColId = affColId; - this.localQuery = localQuery; } /** @@ -1191,25 +1184,26 @@ public abstract class GridH2IndexBase extends BaseIndex { Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow); + boolean locQry = localQuery(); + List<SegmentKey> segmentKeys; - Future<Cursor> fut; if (affKey != null) { // Affinity key is provided. if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything. return false; - segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery)); + segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, locQry)); } else { // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast. if (broadcastSegments == null) - broadcastSegments = broadcastSegments(qctx, cctx, localQuery); + broadcastSegments = broadcastSegments(qctx, cctx, locQry); segmentKeys = broadcastSegments; } - if (localQuery && segmentKeys.isEmpty()) + if (locQry && segmentKeys.isEmpty()) return false; // Nothing to do assert !F.isEmpty(segmentKeys) : segmentKeys; @@ -1250,7 +1244,7 @@ public abstract class GridH2IndexBase extends BaseIndex { batchFull = true; } - fut = new DoneFuture<>(segmentKeys.size() == 1 ? + Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ? new UnicastCursor(rangeId, segmentKeys, rangeStreams) : new BroadcastCursor(rangeId, segmentKeys, rangeStreams)); @@ -1265,6 +1259,15 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** + * @return {@code True} if local query execution is enforced. + */ + private boolean localQuery() { + assert qctx != null : "Missing query context: " + this; + + return qctx.distributedJoinMode() == LOCAL_ONLY; + } + + /** * */ private void startStreams() { http://git-wip-us.apache.org/repos/asf/ignite/blob/800b8bd9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java index 1715a56..586b81e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java @@ -90,7 +90,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(nodesCount(), false); + startGrids(nodesCount()); } /** {@inheritDoc} */