IGNITE-4993 - Fixing distributed joins on segmented index.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/800b8bd9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/800b8bd9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/800b8bd9

Branch: refs/heads/ignite-2893
Commit: 800b8bd90033ab64f4299ba242cc89b1f4c98417
Parents: 2ded758
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
Authored: Wed Apr 19 13:55:02 2017 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Wed Apr 19 13:55:35 2017 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2IndexBase.java           | 31 +++++++++++---------
 .../query/IgniteSqlSegmentedIndexSelfTest.java  |  2 +-
 2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/800b8bd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 7163834..0eac559 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -413,9 +413,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
 
-        boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
-
-        return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
+        return new DistributedLookupBatch(cctx, ucast, affColId);
     }
 
     /**
@@ -1086,9 +1084,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final int affColId;
 
         /** */
-        private final boolean localQuery;
-
-        /** */
         GridH2QueryContext qctx;
 
         /** */
@@ -1113,13 +1108,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
          * @param cctx Cache Cache context.
          * @param ucast Unicast or broadcast query.
          * @param affColId Affinity column ID.
-         * @param localQuery Local query flag.
          */
-        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) {
+        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
             this.cctx = cctx;
             this.ucast = ucast;
             this.affColId = affColId;
-            this.localQuery = localQuery;
         }
 
         /**
@@ -1191,25 +1184,26 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
             Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
 
+            boolean locQry = localQuery();
+
             List<SegmentKey> segmentKeys;
-            Future<Cursor> fut;
 
             if (affKey != null) {
                 // Affinity key is provided.
                 if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
                     return false;
 
-                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery));
+                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, locQry));
             }
             else {
                 // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
                 if (broadcastSegments == null)
-                    broadcastSegments = broadcastSegments(qctx, cctx, localQuery);
+                    broadcastSegments = broadcastSegments(qctx, cctx, locQry);
 
                 segmentKeys = broadcastSegments;
             }
 
-            if (localQuery && segmentKeys.isEmpty())
+            if (locQry && segmentKeys.isEmpty())
                 return false; // Nothing to do
 
             assert !F.isEmpty(segmentKeys) : segmentKeys;
@@ -1250,7 +1244,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     batchFull = true;
             }
 
-            fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+            Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ?
                 new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
                 new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
 
@@ -1265,6 +1259,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
         }
 
         /**
+         * @return {@code True} if local query execution is enforced.
+         */
+        private boolean localQuery() {
+            assert qctx != null : "Missing query context: " + this;
+
+            return qctx.distributedJoinMode() == LOCAL_ONLY;
+        }
+
+        /**
          *
          */
         private void startStreams() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/800b8bd9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index 1715a56..586b81e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -90,7 +90,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
-        startGridsMultiThreaded(nodesCount(), false);
+        startGrids(nodesCount());
     }
 
     /** {@inheritDoc} */

Reply via email to