This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 62e9bc8  IGNITE-11310: SQL: Removed distirbuted joins from local 
segmented queries. This closes #6103.
62e9bc8 is described below

commit 62e9bc8761057fed8ba0bdbd479a1a29ddf73d4c
Author: devozerov <[email protected]>
AuthorDate: Thu Feb 14 20:53:57 2019 +0300

    IGNITE-11310: SQL: Removed distirbuted joins from local segmented queries. 
This closes #6103.
---
 .../query/h2/H2IndexingAbstractGeoSelfTest.java    |  39 --------
 .../cache/query/GridCacheTwoStepQuery.java         |  22 ++++-
 .../processors/query/h2/IgniteH2Indexing.java      |   6 +-
 .../internal/processors/query/h2/QueryParser.java  |  97 ++++++++-----------
 .../query/h2/QueryParserResultSelect.java          |  27 +++++-
 .../processors/query/h2/dml/UpdatePlanBuilder.java |   7 +-
 .../query/h2/opt/join/DistributedJoinContext.java  |  22 ++---
 .../query/h2/opt/join/DistributedLookupBatch.java  |  88 +++++------------
 .../query/h2/sql/GridSqlQuerySplitter.java         |  33 +++++--
 .../query/h2/twostep/GridMapQueryExecutor.java     |  28 ++----
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  15 ++-
 .../query/h2/twostep/msg/GridH2QueryRequest.java   |  11 ++-
 .../query/IgniteQueryDedicatedPoolTest.java        |   1 -
 .../query/IgniteSqlSegmentedIndexSelfTest.java     | 107 +++++++++------------
 14 files changed, 213 insertions(+), 290 deletions(-)

diff --git 
a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
 
b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
index 423f714..ddf1344 100644
--- 
a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
+++ 
b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
@@ -562,8 +562,6 @@ public abstract class H2IndexingAbstractGeoSelfTest extends 
GridCacheAbstractSel
             }
 
             checkDistributedQuery();
-
-            checkLocalQuery();
         }
         finally {
             destroy(c1, grid(0), dynamic);
@@ -604,43 +602,6 @@ public abstract class H2IndexingAbstractGeoSelfTest 
extends GridCacheAbstractSel
     }
 
     /**
-     * Check local query.
-     *
-     * @throws ParseException If failed.
-     */
-    private void checkLocalQuery() throws ParseException {
-        IgniteCache<Integer, Enemy> c1 = grid(0).cache("enemy");
-        IgniteCache<Integer, EnemyCamp> c2 = grid(0).cache("camp");
-
-        final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 
70, 70 70, 70 30, 30 30))");
-
-        Set<Integer> localCampsIDs = new HashSet<>();
-
-        for(Cache.Entry<Integer, EnemyCamp> e : c2.localEntries())
-            localCampsIDs.add(e.getKey());
-
-        int expectedEnemies = 0;
-
-        for (Cache.Entry<Integer, Enemy> e : c1.localEntries()) {
-            final Integer campID = e.getValue().campId;
-
-            if (localCampsIDs.contains(campID)) {
-                final EnemyCamp camp = c2.get(campID);
-
-                if (lethalArea.covers(camp.coords))
-                    expectedEnemies++;
-            }
-        }
-
-        final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val 
from \"enemy\".Enemy e, " +
-            "\"camp\".EnemyCamp c where e.campId = c._key and c.coords && 
?").setArgs(lethalArea);
-
-        List<List<?>> result = c1.query(query.setLocal(true)).getAll();
-
-        assertEquals(expectedEnemies, result.size());
-    }
-
-    /**
      *
      */
     private static class Enemy {
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index aad9cdd..0fcddeb 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -56,7 +56,7 @@ public class GridCacheTwoStepQuery {
     private final List<Integer> cacheIds;
 
     /** */
-    private final boolean local;
+    private final boolean locSplit;
 
     /** */
     private final PartitionResult derivedPartitions;
@@ -87,7 +87,7 @@ public class GridCacheTwoStepQuery {
         PartitionResult derivedPartitions,
         List<Integer> cacheIds,
         boolean mvccEnabled,
-        boolean local
+        boolean locSplit
     ) {
         this.originalSql = originalSql;
         this.paramsCnt = paramsCnt;
@@ -101,7 +101,7 @@ public class GridCacheTwoStepQuery {
         this.derivedPartitions = derivedPartitions;
         this.cacheIds = cacheIds;
         this.mvccEnabled = mvccEnabled;
-        this.local = local;
+        this.locSplit = locSplit;
     }
 
     /**
@@ -163,6 +163,13 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Whether cache IDs exist.
+     */
+    public boolean hasCacheIds() {
+        return !F.isEmpty(cacheIds);
+    }
+
+    /**
      * @return Original query SQL.
      */
     public String originalSql() {
@@ -173,7 +180,14 @@ public class GridCacheTwoStepQuery {
      * @return {@code True} If query is local.
      */
     public boolean isLocal() {
-        return local;
+        return F.isEmpty(cacheIds) || locSplit;
+    }
+
+    /**
+     * @return {@code True} if this is local query with split.
+     */
+    public boolean isLocalSplit() {
+        return locSplit;
     }
 
     /**
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 621d6e8..0b44cfc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1550,14 +1550,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         // Execute SQL.
         assert select != null;
 
-        if (!select.isLocal()) {
+        if (select.splitNeeded()) {
             // Distributed query.
             GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
 
             if (ctx.security().enabled())
                 checkSecurity(twoStepQry.cacheIds());
 
-            FieldsQueryCursor<List<?>> res = executeQuery(
+            FieldsQueryCursor<List<?>> res = executeQueryWithSplit(
                 schemaName,
                 qry,
                 twoStepQry,
@@ -1769,7 +1769,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
      * @param registerAsNewQry {@code true} In case it's new query which 
should be registered as running query,
      * @return Cursor representing distributed query result.
      */
-    private FieldsQueryCursor<List<?>> executeQuery(String schemaName, 
SqlFieldsQuery qry,
+    private FieldsQueryCursor<List<?>> executeQueryWithSplit(String 
schemaName, SqlFieldsQuery qry,
         GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, 
boolean keepBinary,
         boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, 
boolean registerAsNewQry) {
         if (log.isDebugEnabled())
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 35fec00..54a3bbd 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -29,7 +29,6 @@ import 
org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
@@ -245,19 +244,17 @@ public class QueryParser {
 
         H2Utils.setupConnection(c, /*distributedJoins*/false, 
/*enforceJoinOrder*/enforceJoinOrderOnParsing);
 
-        boolean loc = qry.isLocal();
-
         PreparedStatement stmt;
 
         try {
-            stmt = connMgr.prepareStatement(c, qry.getSql());
+            stmt = connMgr.prepareStatementNoCache(c, qry.getSql());
         }
         catch (SQLException e) {
             throw new IgniteSQLException("Failed to parse query. " + 
e.getMessage(),
                 IgniteQueryErrorCode.PARSING, e);
         }
 
-        if (loc && GridSqlQueryParser.checkMultipleStatements(stmt))
+        if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
             throw new IgniteSQLException("Multiple statements queries are not 
supported for local queries.",
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
@@ -299,52 +296,6 @@ public class QueryParser {
 
         SqlFieldsQuery newQry = 
cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
 
-        // TODO: WTF is that? Modifies global query flag (distr joins), 
invokes additional parsing.
-        if (prepared.isQuery()) {
-            try {
-                H2Utils.bindParameters(stmt, F.asList(args));
-            }
-            catch (IgniteCheckedException e) {
-                U.closeQuiet(stmt);
-
-                throw new IgniteSQLException("Failed to bind parameters: 
[qry=" + prepared.getSQL() + ", params=" +
-                    Arrays.deepToString(args) + "]", 
IgniteQueryErrorCode.PARSING, e);
-            }
-
-            GridSqlQueryParser parser = null;
-
-            if (!loc) {
-                parser = new GridSqlQueryParser(false);
-
-                GridSqlStatement parsedStmt = parser.parse(prepared);
-
-                // Legit assertion - we have H2 query flag above.
-                assert parsedStmt instanceof GridSqlQuery;
-
-                loc = parser.isLocalQuery();
-            }
-
-            if (loc) {
-                if (parser == null) {
-                    parser = new GridSqlQueryParser(false);
-
-                    parser.parse(prepared);
-                }
-
-                GridCacheContext cctx = parser.getFirstPartitionedCache();
-
-                if (cctx != null && cctx.config().getQueryParallelism() > 1) {
-                    loc = false;
-
-                    newQry.setDistributedJoins(true);
-                }
-            }
-        }
-
-        // Do not cache multiple statements and distributed queries as whole 
two step query will be cached later on.
-        if (remainingQry != null || !loc)
-            connMgr.statementCacheForThread().remove(schemaName, qry.getSql());
-
         if (CommandProcessor.isCommand(prepared)) {
             GridSqlStatement cmdH2 = new 
GridSqlQueryParser(false).parse(prepared);
 
@@ -364,11 +315,47 @@ public class QueryParser {
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
         }
 
-        // Parse SELECT.
+        // Parse SELECT. Split is required either if query is distirubted, or 
when it is local, but executed
+        // over segmented PARTITIONED case. In this case multiple map queries 
will be executed against local
+        // node stripes in parallel and then merged through reduce process.
+
+
+        // Calculate if query is in fact can be executed locally.
+        boolean loc = qry.isLocal();
+
+        GridSqlQueryParser parser = null;
+
+        if (!loc) {
+            parser = new GridSqlQueryParser(false);
+
+            parser.parse(prepared);
+
+            if (parser.isLocalQuery())
+                loc = true;
+        }
+
+        // If this is a local query, check if it must be split.
+        boolean locSplit = false;
+
+        if (loc) {
+            if (parser == null) {
+                parser = new GridSqlQueryParser(false);
+
+                parser.parse(prepared);
+            }
+
+            GridCacheContext cctx = parser.getFirstPartitionedCache();
+
+            if (cctx != null && cctx.config().getQueryParallelism() > 1)
+                locSplit = true;
+        }
+
+        boolean splitNeeded = !loc || locSplit;
+
         try {
             GridCacheTwoStepQuery twoStepQry = null;
 
-            if (!loc) {
+            if (splitNeeded) {
                 twoStepQry = GridSqlQuerySplitter.split(
                     
connMgr.connectionForThread().connection(newQry.getSchema()),
                     prepared,
@@ -376,14 +363,14 @@ public class QueryParser {
                     newQry.isCollocated(),
                     newQry.isDistributedJoins(),
                     newQry.isEnforceJoinOrder(),
-                    newQry.isLocal(),
+                    locSplit,
                     idx
                 );
             }
 
             List<GridQueryFieldMetadata> meta = 
H2Utils.meta(stmt.getMetaData());
 
-            QueryParserResultSelect select = new 
QueryParserResultSelect(twoStepQry, meta);
+            QueryParserResultSelect select = new 
QueryParserResultSelect(twoStepQry, locSplit, meta);
 
             return new QueryParserResult(newQry, remainingQry, select, null, 
null);
         }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
index ba95552..29cbe0e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
@@ -31,6 +31,9 @@ public class QueryParserResultSelect {
     /** Two-step query, or {@code} null if this result is for local query. */
     private final GridCacheTwoStepQuery twoStepQry;
 
+    /** Whether local split is needed. */
+    private final boolean locSplit;
+
     /** Metadata for two-step query, or {@code} null if this result is for 
local query. */
     private final List<GridQueryFieldMetadata> meta;
 
@@ -38,10 +41,19 @@ public class QueryParserResultSelect {
      * Constructor.
      *
      * @param twoStepQry Distributed query plan.
+     * @param locSplit Whether local split is needed.
      * @param meta Fields metadata.
      */
-    public QueryParserResultSelect(@Nullable GridCacheTwoStepQuery twoStepQry, 
List<GridQueryFieldMetadata> meta) {
+    public QueryParserResultSelect(
+        @Nullable GridCacheTwoStepQuery twoStepQry,
+        boolean locSplit,
+        List<GridQueryFieldMetadata> meta
+    ) {
+        // Local split can be true only is there is a two-step plan.
+        assert twoStepQry == null && !locSplit || twoStepQry != null;
+
         this.twoStepQry = twoStepQry;
+        this.locSplit = locSplit;
         this.meta = meta;
     }
 
@@ -53,6 +65,13 @@ public class QueryParserResultSelect {
     }
 
     /**
+     * @return {@code True} if local query should be split.
+     */
+    public boolean localSplit() {
+        return locSplit;
+    }
+
+    /**
      * @return Two-step query metadata.
      */
     public List<GridQueryFieldMetadata> meta() {
@@ -60,9 +79,9 @@ public class QueryParserResultSelect {
     }
 
     /**
-     * @return Whether this is a local query.
+     * @return Whether split is needed for this query.
      */
-    public boolean isLocal() {
-        return twoStepQry == null;
+    public boolean splitNeeded() {
+        return twoStepQry != null;
     }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 484f5b5..c3f55d5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -946,8 +946,11 @@ public final class UpdatePlanBuilder {
                     idx
                 );
 
-                boolean distributed = !qry.isLocal() && qry.skipMergeTable() 
&&  qry.mapQueries().size() == 1 &&
-                    !qry.mapQueries().get(0).hasSubQueries();
+                boolean distributed =
+                    !qry.isLocalSplit() &&                                     
               // No split for local qry
+                    qry.hasCacheIds() &&                                       
               // Over real caches
+                    qry.skipMergeTable() &&                                    
               // No merge table
+                    qry.mapQueries().size() == 1 && 
!qry.mapQueries().get(0).hasSubQueries(); // One w/o subqueries
 
                 if (distributed) {
                     List<Integer> cacheIds = H2Utils.collectCacheIds(idx, 
CU.cacheId(cacheName), qry.tables());
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
index e22e0bf..d4dead3 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
@@ -30,9 +30,6 @@ import java.util.UUID;
  * Context for distributed joins.
  */
 public class DistributedJoinContext {
-    /** Local flag. */
-    private final boolean loc;
-
     /** */
     private final AffinityTopologyVersion topVer;
 
@@ -69,7 +66,6 @@ public class DistributedJoinContext {
     /**
      * Constructor.
      *
-     * @param loc Local flag.
      * @param topVer Topology version.
      * @param partsMap Partitions map.
      * @param originNodeId ID of the node started the query.
@@ -78,9 +74,14 @@ public class DistributedJoinContext {
      * @param pageSize Pahe size.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-    public DistributedJoinContext(boolean loc, AffinityTopologyVersion topVer, 
Map<UUID, int[]> partsMap,
-        UUID originNodeId, long qryId, int segment, int pageSize) {
-        this.loc = loc;
+    public DistributedJoinContext(
+        AffinityTopologyVersion topVer,
+        Map<UUID, int[]> partsMap,
+        UUID originNodeId,
+        long qryId,
+        int segment,
+        int pageSize
+    ) {
         this.topVer = topVer;
         this.partsMap = partsMap;
         this.originNodeId = originNodeId;
@@ -90,13 +91,6 @@ public class DistributedJoinContext {
     }
 
     /**
-     * @return Local flag.
-     */
-    public boolean local() {
-        return loc;
-    }
-
-    /**
      * @return Affinity topology version.
      */
     public AffinityTopologyVersion topologyVersion() {
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index efae97a..7599516 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -180,8 +180,6 @@ public class DistributedLookupBatch implements 
IndexLookupBatch {
 
         Object affKey = getAffinityKey(firstRow, lastRow);
 
-        boolean locQry = localQuery();
-
         List<SegmentKey> segmentKeys;
 
         if (affKey != null) {
@@ -189,19 +187,16 @@ public class DistributedLookupBatch implements 
IndexLookupBatch {
             if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we 
will not find anything.
                 return false;
 
-            segmentKeys = F.asList(rangeSegment(affKey, locQry));
+            segmentKeys = F.asList(rangeSegment(affKey));
         }
         else {
             // Affinity key is not provided or is not the same in upper and 
lower bounds, we have to broadcast.
             if (broadcastSegments == null)
-                broadcastSegments = broadcastSegments(locQry);
+                broadcastSegments = broadcastSegments();
 
             segmentKeys = broadcastSegments;
         }
 
-        if (locQry && segmentKeys.isEmpty())
-            return false; // Nothing to do
-
         assert !F.isEmpty(segmentKeys) : segmentKeys;
 
         final int rangeId = res.size();
@@ -268,13 +263,6 @@ public class DistributedLookupBatch implements 
IndexLookupBatch {
     }
 
     /**
-     * @return {@code True} if local query execution is enforced.
-     */
-    private boolean localQuery() {
-        return joinCtx != null && joinCtx.local();
-    }
-
-    /**
      *
      */
     private void startStreams() {
@@ -331,84 +319,58 @@ public class DistributedLookupBatch implements 
IndexLookupBatch {
 
     /**
      * @param affKeyObj Affinity key.
-     * @param isLocalQry Local query flag.
      * @return Segment key for Affinity key.
      */
-    public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
+    public SegmentKey rangeSegment(Object affKeyObj) {
         assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
 
         ClusterNode node;
 
         int partition = cctx.affinity().partition(affKeyObj);
 
-        if (isLocalQry) {
-            if (joinCtx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
-                UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
-
-                if(!cctx.localNodeId().equals(nodeId))
-                    return null; // Prevent remote index call for local 
queries.
-            }
-
-            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, 
joinCtx.topologyVersion()))
-                return null;
+        if (joinCtx.partitionsMap() != null) {
+            // If we have explicit partitions map, we have to use it to 
calculate affinity node.
+            UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
 
-            node = cctx.localNode();
+            node = cctx.discovery().node(nodeId);
         }
-        else{
-            if (joinCtx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to 
calculate affinity node.
-                UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
+        else // Get primary node for current topology version.
+            node = cctx.affinity().primaryByKey(affKeyObj, 
joinCtx.topologyVersion());
 
-                node = cctx.discovery().node(nodeId);
-            }
-            else // Get primary node for current topology version.
-                node = cctx.affinity().primaryByKey(affKeyObj, 
joinCtx.topologyVersion());
-
-            if (node == null) // Node was not found, probably topology changed 
and we need to retry the whole query.
-                throw H2Utils.retryException("Failed to get primary node by 
key for range segment.");
-        }
+        if (node == null) // Node was not found, probably topology changed and 
we need to retry the whole query.
+            throw H2Utils.retryException("Failed to get primary node by key 
for range segment.");
 
         return new SegmentKey(node, idx.segmentForPartition(partition));
     }
 
     /**
-     * @param isLocalQry Local query flag.
      * @return Collection of nodes for broadcasting.
      */
-    public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
+    public List<SegmentKey> broadcastSegments() {
         Map<UUID, int[]> partMap = joinCtx.partitionsMap();
 
         List<ClusterNode> nodes;
 
-        if (isLocalQry) {
-            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
-                return Collections.emptyList(); // Prevent remote index call 
for local queries.
-
-            nodes = Collections.singletonList(cctx.localNode());
-        }
+        if (partMap == null)
+            nodes = new ArrayList<>(CU.affinityNodes(cctx, 
joinCtx.topologyVersion()));
         else {
-            if (partMap == null)
-                nodes = new ArrayList<>(CU.affinityNodes(cctx, 
joinCtx.topologyVersion()));
-            else {
-                nodes = new ArrayList<>(partMap.size());
+            nodes = new ArrayList<>(partMap.size());
 
-                for (UUID nodeId : partMap.keySet()) {
-                    ClusterNode node = 
cctx.kernalContext().discovery().node(nodeId);
+            for (UUID nodeId : partMap.keySet()) {
+                ClusterNode node = 
cctx.kernalContext().discovery().node(nodeId);
 
-                    if (node == null)
-                        throw H2Utils.retryException("Failed to get node by ID 
during broadcast [" +
-                            "nodeId=" + nodeId + ']');
+                if (node == null)
+                    throw H2Utils.retryException("Failed to get node by ID 
during broadcast [" +
+                        "nodeId=" + nodeId + ']');
 
-                    nodes.add(node);
-                }
+                nodes.add(node);
             }
-
-            if (F.isEmpty(nodes))
-                throw H2Utils.retryException("Failed to collect affinity nodes 
during broadcast [" +
-                    "cacheName=" + cctx.name() + ']');
         }
 
+        if (F.isEmpty(nodes))
+            throw H2Utils.retryException("Failed to collect affinity nodes 
during broadcast [" +
+                "cacheName=" + cctx.name() + ']');
+
         int segmentsCount = idx.segmentsCount();
 
         List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index e3fca3c..19c80d7 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -115,8 +115,8 @@ public class GridSqlQuerySplitter {
     /** */
     private final boolean collocatedGrpBy;
 
-    /** */
-    private final boolean distributedJoins;
+    /** Whether partition extraction is possible. */
+    private final boolean canExtractPartitions;
 
     /** */
     private final IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases 
= new IdentityHashMap<>();
@@ -128,15 +128,25 @@ public class GridSqlQuerySplitter {
      * @param params Query parameters.
      * @param collocatedGrpBy If it is a collocated GROUP BY query.
      * @param distributedJoins Distributed joins flag.
+     * @param locSplit Local split flag.
      * @param extractor Partition extractor.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-    public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, 
boolean distributedJoins,
-        PartitionExtractor extractor) {
+    public GridSqlQuerySplitter(
+        Object[] params,
+        boolean collocatedGrpBy,
+        boolean distributedJoins,
+        boolean locSplit,
+        PartitionExtractor extractor
+    ) {
         this.params = params;
         this.collocatedGrpBy = collocatedGrpBy;
-        this.distributedJoins = distributedJoins;
         this.extractor = extractor;
+
+        // Partitions *CANNOT* be extracted if:
+        // 1) Distributed joins are enabled 
(https://issues.apache.org/jira/browse/IGNITE-10971)
+        // 2) This is a local query with split 
(https://issues.apache.org/jira/browse/IGNITE-11316)
+        canExtractPartitions = !distributedJoins && !locSplit;
     }
 
     /**
@@ -171,6 +181,7 @@ public class GridSqlQuerySplitter {
      * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
      * @param distributedJoins If distributed joins enabled.
      * @param enforceJoinOrder Enforce join order.
+     * @param locSplit Whether this is a split for local query.
      * @param idx Indexing.
      * @return Two step query.
      * @throws SQLException If failed.
@@ -183,7 +194,7 @@ public class GridSqlQuerySplitter {
         boolean collocatedGrpBy,
         boolean distributedJoins,
         boolean enforceJoinOrder,
-        boolean local,
+        boolean locSplit,
         IgniteH2Indexing idx
     ) throws SQLException, IgniteCheckedException {
         SplitterContext.set(distributedJoins);
@@ -196,7 +207,7 @@ public class GridSqlQuerySplitter {
                 collocatedGrpBy,
                 distributedJoins,
                 enforceJoinOrder,
-                local,
+                locSplit,
                 idx
             );
         }
@@ -212,6 +223,7 @@ public class GridSqlQuerySplitter {
      * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
      * @param distributedJoins If distributed joins enabled.
      * @param enforceJoinOrder Enforce join order.
+     * @param locSplit Whether this is a split for local query.
      * @param idx Indexing.
      * @return Two step query.
      * @throws SQLException If failed.
@@ -224,7 +236,7 @@ public class GridSqlQuerySplitter {
         boolean collocatedGrpBy,
         boolean distributedJoins,
         boolean enforceJoinOrder,
-        boolean local,
+        boolean locSplit,
         IgniteH2Indexing idx
     ) throws SQLException, IgniteCheckedException {
         if (params == null)
@@ -244,6 +256,7 @@ public class GridSqlQuerySplitter {
             params,
             collocatedGrpBy,
             distributedJoins,
+            locSplit,
             idx.partitionExtractor()
         );
 
@@ -306,7 +319,7 @@ public class GridSqlQuerySplitter {
             splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
             cacheIds,
             mvccEnabled,
-            local || F.isEmpty(cacheIds)
+            locSplit
         );
     }
 
@@ -1257,7 +1270,7 @@ public class GridSqlQuerySplitter {
         map.partitioned(SplitterUtils.hasPartitionedTables(mapQry));
         map.hasSubQueries(SplitterUtils.hasSubQueries(mapQry));
 
-        if (map.isPartitioned() && !distributedJoins)
+        if (map.isPartitioned() && canExtractPartitions)
             map.derivedPartitions(extractor.extract(mapQry));
 
         mapSqlQrys.add(map);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7a104b0..e6bb564 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -297,13 +297,12 @@ public class GridMapQueryExecutor {
         final int[] parts = qryParts == null ? partsMap == null ? null : 
partsMap.get(ctx.localNodeId()) : qryParts;
 
         boolean distributedJoins = 
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
-        boolean local = req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL);
+        boolean enforceJoinOrder = 
req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+        boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
+        boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+        boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || 
req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
 
-        final boolean enforceJoinOrder = 
req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
-        final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
-        final boolean replicated = 
req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
-        final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || 
req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
-        final Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
+        Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
 
         final List<Integer> cacheIds = req.caches();
 
@@ -393,7 +392,6 @@ public class GridMapQueryExecutor {
                     parts,
                     req.pageSize(),
                     distributedJoins,
-                    local,
                     enforceJoinOrder,
                     false, // Replicated is always false here (see condition 
above).
                     req.timeout(),
@@ -421,7 +419,6 @@ public class GridMapQueryExecutor {
                                 parts,
                                 req.pageSize(),
                                 distributedJoins,
-                                local,
                                 enforceJoinOrder,
                                 false,
                                 req.timeout(),
@@ -452,7 +449,6 @@ public class GridMapQueryExecutor {
             parts,
             req.pageSize(),
             distributedJoins,
-            local,
             enforceJoinOrder,
             replicated,
             req.timeout(),
@@ -477,8 +473,7 @@ public class GridMapQueryExecutor {
      * @param partsMap Partitions map for unstable topology.
      * @param parts Explicit partitions for current node.
      * @param pageSize Page size.
-     * @param distributeJoins Query distributed join mode.
-     * @param local Lcoal flag.
+     * @param distributedJoins Query distributed join mode.
      * @param lazy Streaming flag.
      * @param mvccSnapshot MVCC snapshot.
      * @param tx Transaction.
@@ -498,8 +493,7 @@ public class GridMapQueryExecutor {
         final Map<UUID, int[]> partsMap,
         final int[] parts,
         final int pageSize,
-        final boolean distributeJoins,
-        final boolean local,
+        final boolean distributedJoins,
         final boolean enforceJoinOrder,
         final boolean replicated,
         final int timeout,
@@ -537,8 +531,7 @@ public class GridMapQueryExecutor {
                         partsMap,
                         parts,
                         pageSize,
-                        distributeJoins,
-                        local,
+                        distributedJoins,
                         enforceJoinOrder,
                         replicated,
                         timeout,
@@ -618,9 +611,8 @@ public class GridMapQueryExecutor {
             // Prepare query context.
             DistributedJoinContext distributedJoinCtx = null;
 
-            if (distributeJoins && !replicated) {
+            if (distributedJoins && !replicated) {
                 distributedJoinCtx = new DistributedJoinContext(
-                    local,
                     topVer,
                     partsMap,
                     node.id(),
@@ -642,7 +634,7 @@ public class GridMapQueryExecutor {
 
             Connection conn = 
h2.connections().connectionForThread().connection(schemaName);
 
-            H2Utils.setupConnection(conn, distributeJoins, enforceJoinOrder);
+            H2Utils.setupConnection(conn, distributedJoins, enforceJoinOrder);
 
             qryCtxRegistry.setThreadLocal(qctx);
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 24e93cd..ae399f7 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -408,7 +408,8 @@ public class GridReduceQueryExecutor {
         boolean forUpdate,
         int pageSize
     ) {
-        if (qry.isLocal() && parts != null)
+        // If explicit partitions are set, but there are no real tables, 
ignore.
+        if (!qry.hasCacheIds() && parts != null)
             parts = null;
 
         assert !qry.mvccEnabled() || mvccTracker != null;
@@ -544,7 +545,7 @@ public class GridReduceQueryExecutor {
                     throw new CacheException("Partitions are not supported for 
replicated caches");
             }
 
-            if (qry.isLocal())
+            if (qry.isLocalSplit() || !qry.hasCacheIds())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
                 ReducePartitionMapResult nodesParts =
@@ -655,8 +656,6 @@ public class GridReduceQueryExecutor {
                             .parameterIndexes(mapQry.parameterIndexes()));
                 }
 
-                final boolean distributedJoins = qry.distributedJoins();
-
                 final long qryReqId0 = qryReqId;
 
                 cancel.set(new Runnable() {
@@ -669,12 +668,10 @@ public class GridReduceQueryExecutor {
 
                 int flags = singlePartMode && !enforceJoinOrder ? 0 : 
GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
 
-                if (distributedJoins)
+                // Distributed joins flag is set if it is either reald
+                if (qry.distributedJoins())
                     flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
 
-                if (qry.isLocal())
-                    flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
-
                 if (qry.explain())
                     flags |= GridH2QueryRequest.FLAG_EXPLAIN;
 
@@ -691,7 +688,7 @@ public class GridReduceQueryExecutor {
                     .topologyVersion(topVer)
                     .pageSize(r.pageSize())
                     .caches(qry.cacheIds())
-                    .tables(distributedJoins ? qry.tables() : null)
+                    .tables(qry.distributedJoins() ? qry.tables() : null)
                     .partitions(convert(partsMap))
                     .queries(mapQrys)
                     .parameters(params)
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 93e27f0..b5c1f77 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -68,9 +68,10 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
     public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
 
     /**
-     * Restrict distributed joins range-requests to local index segments. 
Range requests to other nodes will not be sent.
+     * Unused. Keep for backward compatibility.
      */
-    public static final int FLAG_IS_LOCAL = 1 << 2;
+    @SuppressWarnings("unused")
+    public static final int FLAG_UNUSED = 1 << 2;
 
     /**
      * If it is an EXPLAIN command.
@@ -426,10 +427,10 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
     }
 
     /**
-     * @param txDetails TX details holder for {@code SELECT FOR UPDATE}, or 
{@code null} if not applicable.
+     * @param txReq TX details holder for {@code SELECT FOR UPDATE}, or {@code 
null} if not applicable.
      */
-    public void txDetails(GridH2SelectForUpdateTxDetails txDetails) {
-        this.txReq = txDetails;
+    public void txDetails(GridH2SelectForUpdateTxDetails txReq) {
+        this.txReq = txReq;
     }
 
     /**
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
index 416e66c..4cd3faf 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
@@ -91,7 +91,6 @@ public class IgniteQueryDedicatedPoolTest extends 
GridCommonAbstractTest {
     /**
      * Tests that SQL queries involving actual network IO are executed in 
dedicated pool.
      * @throws Exception If failed.
-     * @see GridCacheTwoStepQuery#isLocal()
      */
     @Test
     public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index be8a5f0..8f57367 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -27,13 +27,12 @@ import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
 import org.junit.Test;
 
@@ -57,20 +56,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
     private static final int ORPHAN_ROWS = 10;
 
     /** */
-    private static int QRY_PARALLELISM_LVL = 97;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", 
"affKey");
-
-        cfg.setCacheKeyConfiguration(keyCfg);
-
-        cfg.setPeerClassLoadingEnabled(false);
-
-        return cfg;
-    }
+    private static final int QRY_PARALLELISM_LVL = 97;
 
     /** @return number of nodes to be prestarted. */
     protected int nodesCount() {
@@ -78,11 +64,13 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
     @Override protected void beforeTestsStarted() throws Exception {
         startGrids(nodesCount());
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
@@ -105,11 +93,11 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
     }
 
     /**
-     * @throws Exception If failed.
+     * Test segmented index.
      */
     @Test
-    public void testSegmentedIndex() throws Exception {
-        ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true, 
Integer.class, Person.class));
+    public void testSegmentedIndex() {
+        ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true, 
PersonKey.class, Person.class));
         ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, 
Organization.class));
 
         fillCache();
@@ -123,10 +111,9 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Check correct index snapshots with segmented indices.
-     * @throws Exception If failed.
      */
     @Test
-    public void testSegmentedIndexReproducableResults() throws Exception {
+    public void testSegmentedIndexReproducableResults() {
         ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, 
Organization.class));
 
         IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
@@ -149,10 +136,9 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Checks correct <code>select count(*)</code> result with segmented 
indices.
-     * @throws Exception If failed.
      */
     @Test
-    public void testSegmentedIndexSizeReproducableResults() throws Exception {
+    public void testSegmentedIndexSizeReproducableResults() {
         ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, 
Organization.class));
 
         IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
@@ -174,12 +160,11 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
     }
 
     /**
-     * Run tests on single-node grid
-     *
-     * @throws Exception If failed.
+     * Run tests on single-node grid.
      */
+    @SuppressWarnings("deprecation")
     @Test
-    public void testSegmentedIndexWithEvictionPolicy() throws Exception {
+    public void testSegmentedIndexWithEvictionPolicy() {
         final IgniteCache<Object, Object> cache = ignite(0).createCache(
             cacheConfig(ORG_CACHE_NAME, true, Integer.class, 
Organization.class)
                 .setEvictionPolicy(new FifoEvictionPolicy(10))
@@ -199,11 +184,10 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Verifies that <code>select count(*)</code> return valid result on a 
single-node grid.
-     *
-     * @throws Exception If failed.
      */
+    @SuppressWarnings("deprecation")
     @Test
-    public void testSizeOnSegmentedIndexWithEvictionPolicy() throws Exception {
+    public void testSizeOnSegmentedIndexWithEvictionPolicy() {
         final IgniteCache<Object, Object> cache = ignite(0).createCache(
             cacheConfig(ORG_CACHE_NAME, true, Integer.class, 
Organization.class)
                 .setEvictionPolicy(new FifoEvictionPolicy(10))
@@ -228,7 +212,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
      */
     @Test
     public void testSegmentedPartitionedWithReplicated() throws Exception {
-        ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true, 
Integer.class, Person.class));
+        ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true, 
PersonKey.class, Person.class));
         ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, false, 
Integer.class, Organization.class));
 
         fillCache();
@@ -242,17 +226,15 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Check distributed joins.
-     *
-     * @throws Exception If failed.
      */
-    public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+    public void checkDistributedQueryWithSegmentedIndex() {
         for (int i = 0; i < nodesCount(); i++) {
-            IgniteCache<Integer, Person> c1 = 
ignite(i).cache(PERSON_CAHE_NAME);
+            IgniteCache<PersonKey, Person> c1 = 
ignite(i).cache(PERSON_CAHE_NAME);
 
             long expPersons = 0;
 
-            for (Cache.Entry<Integer, Person> e : c1) {
-                final Integer orgId = e.getValue().orgId;
+            for (Cache.Entry<PersonKey, Person> e : c1) {
+                final Integer orgId = e.getKey().orgId;
 
                 // We have as orphan ORG rows as orphan PERSON rows.
                 if (ORPHAN_ROWS <= orgId && orgId < 500)
@@ -269,14 +251,12 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Test local query.
-     *
-     * @throws Exception If failed.
      */
-    public void checkLocalQueryWithSegmentedIndex() throws Exception {
+    public void checkLocalQueryWithSegmentedIndex() {
         for (int i = 0; i < nodesCount(); i++) {
             final Ignite node = ignite(i);
 
-            IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
+            IgniteCache<PersonKey, Person> c1 = node.cache(PERSON_CAHE_NAME);
             IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
 
             Set<Integer> locOrgIds = new HashSet<>();
@@ -286,8 +266,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
             long expPersons = 0;
 
-            for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
-                final Integer orgId = e.getValue().orgId;
+            for (Cache.Entry<PersonKey, Person> e : c1.localEntries()) {
+                final Integer orgId = e.getKey().orgId;
 
                 if (locOrgIds.contains(orgId))
                     expPersons++;
@@ -303,14 +283,12 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
     /**
      * Verifies that local <code>select count(*)</code> query returns a 
correct result.
-     *
-     * @throws Exception If failed.
      */
-    public void checkLocalSizeQueryWithSegmentedIndex() throws Exception {
+    public void checkLocalSizeQueryWithSegmentedIndex() {
         for (int i = 0; i < nodesCount(); i++) {
             final Ignite node = ignite(i);
 
-            IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
+            IgniteCache<PersonKey, Person> c1 = node.cache(PERSON_CAHE_NAME);
             IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
 
             Set<Integer> locOrgIds = new HashSet<>();
@@ -320,8 +298,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
 
             int expPersons = 0;
 
-            for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
-                final Integer orgId = e.getValue().orgId;
+            for (Cache.Entry<PersonKey, Person> e : c1.localEntries()) {
+                final Integer orgId = e.getKey().orgId;
 
                 if (locOrgIds.contains(orgId))
                     expPersons++;
@@ -349,7 +327,22 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
             // We have as orphan ORG rows as orphan PERSON rows.
             int orgID = ORPHAN_ROWS + random.nextInt(ORG_CACHE_SIZE + 
ORPHAN_ROWS);
 
-            c1.put(i, new Person(orgID, "pers-" + i));
+            c1.put(new PersonKey(i, orgID), new Person("pers-" + i));
+        }
+    }
+
+    private static class PersonKey {
+        @QuerySqlField
+        int id;
+
+        /** */
+        @AffinityKeyMapped
+        @QuerySqlField
+        Integer orgId;
+
+        public PersonKey(int id, Integer orgId) {
+            this.id = id;
+            this.orgId = orgId;
         }
     }
 
@@ -358,26 +351,14 @@ public class IgniteSqlSegmentedIndexSelfTest extends 
AbstractIndexingCommonTest
      */
     private static class Person implements Serializable {
         /** */
-        @QuerySqlField(index = true)
-        Integer orgId;
-
-        /** */
         @QuerySqlField
         String name;
 
-        /**
-         *
-         */
-        public Person() {
-            // No-op.
-        }
 
         /**
-         * @param orgId Organization ID.
          * @param name Name.
          */
-        public Person(int orgId, String name) {
-            this.orgId = orgId;
+        public Person(String name) {
             this.name = name;
         }
     }

Reply via email to