This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 62e9bc8 IGNITE-11310: SQL: Removed distirbuted joins from local
segmented queries. This closes #6103.
62e9bc8 is described below
commit 62e9bc8761057fed8ba0bdbd479a1a29ddf73d4c
Author: devozerov <[email protected]>
AuthorDate: Thu Feb 14 20:53:57 2019 +0300
IGNITE-11310: SQL: Removed distirbuted joins from local segmented queries.
This closes #6103.
---
.../query/h2/H2IndexingAbstractGeoSelfTest.java | 39 --------
.../cache/query/GridCacheTwoStepQuery.java | 22 ++++-
.../processors/query/h2/IgniteH2Indexing.java | 6 +-
.../internal/processors/query/h2/QueryParser.java | 97 ++++++++-----------
.../query/h2/QueryParserResultSelect.java | 27 +++++-
.../processors/query/h2/dml/UpdatePlanBuilder.java | 7 +-
.../query/h2/opt/join/DistributedJoinContext.java | 22 ++---
.../query/h2/opt/join/DistributedLookupBatch.java | 88 +++++------------
.../query/h2/sql/GridSqlQuerySplitter.java | 33 +++++--
.../query/h2/twostep/GridMapQueryExecutor.java | 28 ++----
.../query/h2/twostep/GridReduceQueryExecutor.java | 15 ++-
.../query/h2/twostep/msg/GridH2QueryRequest.java | 11 ++-
.../query/IgniteQueryDedicatedPoolTest.java | 1 -
.../query/IgniteSqlSegmentedIndexSelfTest.java | 107 +++++++++------------
14 files changed, 213 insertions(+), 290 deletions(-)
diff --git
a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
index 423f714..ddf1344 100644
---
a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
+++
b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
@@ -562,8 +562,6 @@ public abstract class H2IndexingAbstractGeoSelfTest extends
GridCacheAbstractSel
}
checkDistributedQuery();
-
- checkLocalQuery();
}
finally {
destroy(c1, grid(0), dynamic);
@@ -604,43 +602,6 @@ public abstract class H2IndexingAbstractGeoSelfTest
extends GridCacheAbstractSel
}
/**
- * Check local query.
- *
- * @throws ParseException If failed.
- */
- private void checkLocalQuery() throws ParseException {
- IgniteCache<Integer, Enemy> c1 = grid(0).cache("enemy");
- IgniteCache<Integer, EnemyCamp> c2 = grid(0).cache("camp");
-
- final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30
70, 70 70, 70 30, 30 30))");
-
- Set<Integer> localCampsIDs = new HashSet<>();
-
- for(Cache.Entry<Integer, EnemyCamp> e : c2.localEntries())
- localCampsIDs.add(e.getKey());
-
- int expectedEnemies = 0;
-
- for (Cache.Entry<Integer, Enemy> e : c1.localEntries()) {
- final Integer campID = e.getValue().campId;
-
- if (localCampsIDs.contains(campID)) {
- final EnemyCamp camp = c2.get(campID);
-
- if (lethalArea.covers(camp.coords))
- expectedEnemies++;
- }
- }
-
- final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val
from \"enemy\".Enemy e, " +
- "\"camp\".EnemyCamp c where e.campId = c._key and c.coords &&
?").setArgs(lethalArea);
-
- List<List<?>> result = c1.query(query.setLocal(true)).getAll();
-
- assertEquals(expectedEnemies, result.size());
- }
-
- /**
*
*/
private static class Enemy {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index aad9cdd..0fcddeb 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -56,7 +56,7 @@ public class GridCacheTwoStepQuery {
private final List<Integer> cacheIds;
/** */
- private final boolean local;
+ private final boolean locSplit;
/** */
private final PartitionResult derivedPartitions;
@@ -87,7 +87,7 @@ public class GridCacheTwoStepQuery {
PartitionResult derivedPartitions,
List<Integer> cacheIds,
boolean mvccEnabled,
- boolean local
+ boolean locSplit
) {
this.originalSql = originalSql;
this.paramsCnt = paramsCnt;
@@ -101,7 +101,7 @@ public class GridCacheTwoStepQuery {
this.derivedPartitions = derivedPartitions;
this.cacheIds = cacheIds;
this.mvccEnabled = mvccEnabled;
- this.local = local;
+ this.locSplit = locSplit;
}
/**
@@ -163,6 +163,13 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return Whether cache IDs exist.
+ */
+ public boolean hasCacheIds() {
+ return !F.isEmpty(cacheIds);
+ }
+
+ /**
* @return Original query SQL.
*/
public String originalSql() {
@@ -173,7 +180,14 @@ public class GridCacheTwoStepQuery {
* @return {@code True} If query is local.
*/
public boolean isLocal() {
- return local;
+ return F.isEmpty(cacheIds) || locSplit;
+ }
+
+ /**
+ * @return {@code True} if this is local query with split.
+ */
+ public boolean isLocalSplit() {
+ return locSplit;
}
/**
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 621d6e8..0b44cfc 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1550,14 +1550,14 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
// Execute SQL.
assert select != null;
- if (!select.isLocal()) {
+ if (select.splitNeeded()) {
// Distributed query.
GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
if (ctx.security().enabled())
checkSecurity(twoStepQry.cacheIds());
- FieldsQueryCursor<List<?>> res = executeQuery(
+ FieldsQueryCursor<List<?>> res = executeQueryWithSplit(
schemaName,
qry,
twoStepQry,
@@ -1769,7 +1769,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
* @param registerAsNewQry {@code true} In case it's new query which
should be registered as running query,
* @return Cursor representing distributed query result.
*/
- private FieldsQueryCursor<List<?>> executeQuery(String schemaName,
SqlFieldsQuery qry,
+ private FieldsQueryCursor<List<?>> executeQueryWithSplit(String
schemaName, SqlFieldsQuery qry,
GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta,
boolean keepBinary,
boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel,
boolean registerAsNewQry) {
if (log.isDebugEnabled())
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 35fec00..54a3bbd 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -29,7 +29,6 @@ import
org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
@@ -245,19 +244,17 @@ public class QueryParser {
H2Utils.setupConnection(c, /*distributedJoins*/false,
/*enforceJoinOrder*/enforceJoinOrderOnParsing);
- boolean loc = qry.isLocal();
-
PreparedStatement stmt;
try {
- stmt = connMgr.prepareStatement(c, qry.getSql());
+ stmt = connMgr.prepareStatementNoCache(c, qry.getSql());
}
catch (SQLException e) {
throw new IgniteSQLException("Failed to parse query. " +
e.getMessage(),
IgniteQueryErrorCode.PARSING, e);
}
- if (loc && GridSqlQueryParser.checkMultipleStatements(stmt))
+ if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
throw new IgniteSQLException("Multiple statements queries are not
supported for local queries.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -299,52 +296,6 @@ public class QueryParser {
SqlFieldsQuery newQry =
cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
- // TODO: WTF is that? Modifies global query flag (distr joins),
invokes additional parsing.
- if (prepared.isQuery()) {
- try {
- H2Utils.bindParameters(stmt, F.asList(args));
- }
- catch (IgniteCheckedException e) {
- U.closeQuiet(stmt);
-
- throw new IgniteSQLException("Failed to bind parameters:
[qry=" + prepared.getSQL() + ", params=" +
- Arrays.deepToString(args) + "]",
IgniteQueryErrorCode.PARSING, e);
- }
-
- GridSqlQueryParser parser = null;
-
- if (!loc) {
- parser = new GridSqlQueryParser(false);
-
- GridSqlStatement parsedStmt = parser.parse(prepared);
-
- // Legit assertion - we have H2 query flag above.
- assert parsedStmt instanceof GridSqlQuery;
-
- loc = parser.isLocalQuery();
- }
-
- if (loc) {
- if (parser == null) {
- parser = new GridSqlQueryParser(false);
-
- parser.parse(prepared);
- }
-
- GridCacheContext cctx = parser.getFirstPartitionedCache();
-
- if (cctx != null && cctx.config().getQueryParallelism() > 1) {
- loc = false;
-
- newQry.setDistributedJoins(true);
- }
- }
- }
-
- // Do not cache multiple statements and distributed queries as whole
two step query will be cached later on.
- if (remainingQry != null || !loc)
- connMgr.statementCacheForThread().remove(schemaName, qry.getSql());
-
if (CommandProcessor.isCommand(prepared)) {
GridSqlStatement cmdH2 = new
GridSqlQueryParser(false).parse(prepared);
@@ -364,11 +315,47 @@ public class QueryParser {
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
- // Parse SELECT.
+ // Parse SELECT. Split is required either if query is distirubted, or
when it is local, but executed
+ // over segmented PARTITIONED case. In this case multiple map queries
will be executed against local
+ // node stripes in parallel and then merged through reduce process.
+
+
+ // Calculate if query is in fact can be executed locally.
+ boolean loc = qry.isLocal();
+
+ GridSqlQueryParser parser = null;
+
+ if (!loc) {
+ parser = new GridSqlQueryParser(false);
+
+ parser.parse(prepared);
+
+ if (parser.isLocalQuery())
+ loc = true;
+ }
+
+ // If this is a local query, check if it must be split.
+ boolean locSplit = false;
+
+ if (loc) {
+ if (parser == null) {
+ parser = new GridSqlQueryParser(false);
+
+ parser.parse(prepared);
+ }
+
+ GridCacheContext cctx = parser.getFirstPartitionedCache();
+
+ if (cctx != null && cctx.config().getQueryParallelism() > 1)
+ locSplit = true;
+ }
+
+ boolean splitNeeded = !loc || locSplit;
+
try {
GridCacheTwoStepQuery twoStepQry = null;
- if (!loc) {
+ if (splitNeeded) {
twoStepQry = GridSqlQuerySplitter.split(
connMgr.connectionForThread().connection(newQry.getSchema()),
prepared,
@@ -376,14 +363,14 @@ public class QueryParser {
newQry.isCollocated(),
newQry.isDistributedJoins(),
newQry.isEnforceJoinOrder(),
- newQry.isLocal(),
+ locSplit,
idx
);
}
List<GridQueryFieldMetadata> meta =
H2Utils.meta(stmt.getMetaData());
- QueryParserResultSelect select = new
QueryParserResultSelect(twoStepQry, meta);
+ QueryParserResultSelect select = new
QueryParserResultSelect(twoStepQry, locSplit, meta);
return new QueryParserResult(newQry, remainingQry, select, null,
null);
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
index ba95552..29cbe0e 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultSelect.java
@@ -31,6 +31,9 @@ public class QueryParserResultSelect {
/** Two-step query, or {@code} null if this result is for local query. */
private final GridCacheTwoStepQuery twoStepQry;
+ /** Whether local split is needed. */
+ private final boolean locSplit;
+
/** Metadata for two-step query, or {@code} null if this result is for
local query. */
private final List<GridQueryFieldMetadata> meta;
@@ -38,10 +41,19 @@ public class QueryParserResultSelect {
* Constructor.
*
* @param twoStepQry Distributed query plan.
+ * @param locSplit Whether local split is needed.
* @param meta Fields metadata.
*/
- public QueryParserResultSelect(@Nullable GridCacheTwoStepQuery twoStepQry,
List<GridQueryFieldMetadata> meta) {
+ public QueryParserResultSelect(
+ @Nullable GridCacheTwoStepQuery twoStepQry,
+ boolean locSplit,
+ List<GridQueryFieldMetadata> meta
+ ) {
+ // Local split can be true only is there is a two-step plan.
+ assert twoStepQry == null && !locSplit || twoStepQry != null;
+
this.twoStepQry = twoStepQry;
+ this.locSplit = locSplit;
this.meta = meta;
}
@@ -53,6 +65,13 @@ public class QueryParserResultSelect {
}
/**
+ * @return {@code True} if local query should be split.
+ */
+ public boolean localSplit() {
+ return locSplit;
+ }
+
+ /**
* @return Two-step query metadata.
*/
public List<GridQueryFieldMetadata> meta() {
@@ -60,9 +79,9 @@ public class QueryParserResultSelect {
}
/**
- * @return Whether this is a local query.
+ * @return Whether split is needed for this query.
*/
- public boolean isLocal() {
- return twoStepQry == null;
+ public boolean splitNeeded() {
+ return twoStepQry != null;
}
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 484f5b5..c3f55d5 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -946,8 +946,11 @@ public final class UpdatePlanBuilder {
idx
);
- boolean distributed = !qry.isLocal() && qry.skipMergeTable()
&& qry.mapQueries().size() == 1 &&
- !qry.mapQueries().get(0).hasSubQueries();
+ boolean distributed =
+ !qry.isLocalSplit() &&
// No split for local qry
+ qry.hasCacheIds() &&
// Over real caches
+ qry.skipMergeTable() &&
// No merge table
+ qry.mapQueries().size() == 1 &&
!qry.mapQueries().get(0).hasSubQueries(); // One w/o subqueries
if (distributed) {
List<Integer> cacheIds = H2Utils.collectCacheIds(idx,
CU.cacheId(cacheName), qry.tables());
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
index e22e0bf..d4dead3 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
@@ -30,9 +30,6 @@ import java.util.UUID;
* Context for distributed joins.
*/
public class DistributedJoinContext {
- /** Local flag. */
- private final boolean loc;
-
/** */
private final AffinityTopologyVersion topVer;
@@ -69,7 +66,6 @@ public class DistributedJoinContext {
/**
* Constructor.
*
- * @param loc Local flag.
* @param topVer Topology version.
* @param partsMap Partitions map.
* @param originNodeId ID of the node started the query.
@@ -78,9 +74,14 @@ public class DistributedJoinContext {
* @param pageSize Pahe size.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public DistributedJoinContext(boolean loc, AffinityTopologyVersion topVer,
Map<UUID, int[]> partsMap,
- UUID originNodeId, long qryId, int segment, int pageSize) {
- this.loc = loc;
+ public DistributedJoinContext(
+ AffinityTopologyVersion topVer,
+ Map<UUID, int[]> partsMap,
+ UUID originNodeId,
+ long qryId,
+ int segment,
+ int pageSize
+ ) {
this.topVer = topVer;
this.partsMap = partsMap;
this.originNodeId = originNodeId;
@@ -90,13 +91,6 @@ public class DistributedJoinContext {
}
/**
- * @return Local flag.
- */
- public boolean local() {
- return loc;
- }
-
- /**
* @return Affinity topology version.
*/
public AffinityTopologyVersion topologyVersion() {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index efae97a..7599516 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -180,8 +180,6 @@ public class DistributedLookupBatch implements
IndexLookupBatch {
Object affKey = getAffinityKey(firstRow, lastRow);
- boolean locQry = localQuery();
-
List<SegmentKey> segmentKeys;
if (affKey != null) {
@@ -189,19 +187,16 @@ public class DistributedLookupBatch implements
IndexLookupBatch {
if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we
will not find anything.
return false;
- segmentKeys = F.asList(rangeSegment(affKey, locQry));
+ segmentKeys = F.asList(rangeSegment(affKey));
}
else {
// Affinity key is not provided or is not the same in upper and
lower bounds, we have to broadcast.
if (broadcastSegments == null)
- broadcastSegments = broadcastSegments(locQry);
+ broadcastSegments = broadcastSegments();
segmentKeys = broadcastSegments;
}
- if (locQry && segmentKeys.isEmpty())
- return false; // Nothing to do
-
assert !F.isEmpty(segmentKeys) : segmentKeys;
final int rangeId = res.size();
@@ -268,13 +263,6 @@ public class DistributedLookupBatch implements
IndexLookupBatch {
}
/**
- * @return {@code True} if local query execution is enforced.
- */
- private boolean localQuery() {
- return joinCtx != null && joinCtx.local();
- }
-
- /**
*
*/
private void startStreams() {
@@ -331,84 +319,58 @@ public class DistributedLookupBatch implements
IndexLookupBatch {
/**
* @param affKeyObj Affinity key.
- * @param isLocalQry Local query flag.
* @return Segment key for Affinity key.
*/
- public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
+ public SegmentKey rangeSegment(Object affKeyObj) {
assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
ClusterNode node;
int partition = cctx.affinity().partition(affKeyObj);
- if (isLocalQry) {
- if (joinCtx.partitionsMap() != null) {
- // If we have explicit partitions map, we have to use it to
calculate affinity node.
- UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
-
- if(!cctx.localNodeId().equals(nodeId))
- return null; // Prevent remote index call for local
queries.
- }
-
- if (!cctx.affinity().primaryByKey(cctx.localNode(), partition,
joinCtx.topologyVersion()))
- return null;
+ if (joinCtx.partitionsMap() != null) {
+ // If we have explicit partitions map, we have to use it to
calculate affinity node.
+ UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
- node = cctx.localNode();
+ node = cctx.discovery().node(nodeId);
}
- else{
- if (joinCtx.partitionsMap() != null) {
- // If we have explicit partitions map, we have to use it to
calculate affinity node.
- UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
+ else // Get primary node for current topology version.
+ node = cctx.affinity().primaryByKey(affKeyObj,
joinCtx.topologyVersion());
- node = cctx.discovery().node(nodeId);
- }
- else // Get primary node for current topology version.
- node = cctx.affinity().primaryByKey(affKeyObj,
joinCtx.topologyVersion());
-
- if (node == null) // Node was not found, probably topology changed
and we need to retry the whole query.
- throw H2Utils.retryException("Failed to get primary node by
key for range segment.");
- }
+ if (node == null) // Node was not found, probably topology changed and
we need to retry the whole query.
+ throw H2Utils.retryException("Failed to get primary node by key
for range segment.");
return new SegmentKey(node, idx.segmentForPartition(partition));
}
/**
- * @param isLocalQry Local query flag.
* @return Collection of nodes for broadcasting.
*/
- public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
+ public List<SegmentKey> broadcastSegments() {
Map<UUID, int[]> partMap = joinCtx.partitionsMap();
List<ClusterNode> nodes;
- if (isLocalQry) {
- if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
- return Collections.emptyList(); // Prevent remote index call
for local queries.
-
- nodes = Collections.singletonList(cctx.localNode());
- }
+ if (partMap == null)
+ nodes = new ArrayList<>(CU.affinityNodes(cctx,
joinCtx.topologyVersion()));
else {
- if (partMap == null)
- nodes = new ArrayList<>(CU.affinityNodes(cctx,
joinCtx.topologyVersion()));
- else {
- nodes = new ArrayList<>(partMap.size());
+ nodes = new ArrayList<>(partMap.size());
- for (UUID nodeId : partMap.keySet()) {
- ClusterNode node =
cctx.kernalContext().discovery().node(nodeId);
+ for (UUID nodeId : partMap.keySet()) {
+ ClusterNode node =
cctx.kernalContext().discovery().node(nodeId);
- if (node == null)
- throw H2Utils.retryException("Failed to get node by ID
during broadcast [" +
- "nodeId=" + nodeId + ']');
+ if (node == null)
+ throw H2Utils.retryException("Failed to get node by ID
during broadcast [" +
+ "nodeId=" + nodeId + ']');
- nodes.add(node);
- }
+ nodes.add(node);
}
-
- if (F.isEmpty(nodes))
- throw H2Utils.retryException("Failed to collect affinity nodes
during broadcast [" +
- "cacheName=" + cctx.name() + ']');
}
+ if (F.isEmpty(nodes))
+ throw H2Utils.retryException("Failed to collect affinity nodes
during broadcast [" +
+ "cacheName=" + cctx.name() + ']');
+
int segmentsCount = idx.segmentsCount();
List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index e3fca3c..19c80d7 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -115,8 +115,8 @@ public class GridSqlQuerySplitter {
/** */
private final boolean collocatedGrpBy;
- /** */
- private final boolean distributedJoins;
+ /** Whether partition extraction is possible. */
+ private final boolean canExtractPartitions;
/** */
private final IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases
= new IdentityHashMap<>();
@@ -128,15 +128,25 @@ public class GridSqlQuerySplitter {
* @param params Query parameters.
* @param collocatedGrpBy If it is a collocated GROUP BY query.
* @param distributedJoins Distributed joins flag.
+ * @param locSplit Local split flag.
* @param extractor Partition extractor.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy,
boolean distributedJoins,
- PartitionExtractor extractor) {
+ public GridSqlQuerySplitter(
+ Object[] params,
+ boolean collocatedGrpBy,
+ boolean distributedJoins,
+ boolean locSplit,
+ PartitionExtractor extractor
+ ) {
this.params = params;
this.collocatedGrpBy = collocatedGrpBy;
- this.distributedJoins = distributedJoins;
this.extractor = extractor;
+
+ // Partitions *CANNOT* be extracted if:
+ // 1) Distributed joins are enabled
(https://issues.apache.org/jira/browse/IGNITE-10971)
+ // 2) This is a local query with split
(https://issues.apache.org/jira/browse/IGNITE-11316)
+ canExtractPartitions = !distributedJoins && !locSplit;
}
/**
@@ -171,6 +181,7 @@ public class GridSqlQuerySplitter {
* @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
* @param distributedJoins If distributed joins enabled.
* @param enforceJoinOrder Enforce join order.
+ * @param locSplit Whether this is a split for local query.
* @param idx Indexing.
* @return Two step query.
* @throws SQLException If failed.
@@ -183,7 +194,7 @@ public class GridSqlQuerySplitter {
boolean collocatedGrpBy,
boolean distributedJoins,
boolean enforceJoinOrder,
- boolean local,
+ boolean locSplit,
IgniteH2Indexing idx
) throws SQLException, IgniteCheckedException {
SplitterContext.set(distributedJoins);
@@ -196,7 +207,7 @@ public class GridSqlQuerySplitter {
collocatedGrpBy,
distributedJoins,
enforceJoinOrder,
- local,
+ locSplit,
idx
);
}
@@ -212,6 +223,7 @@ public class GridSqlQuerySplitter {
* @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
* @param distributedJoins If distributed joins enabled.
* @param enforceJoinOrder Enforce join order.
+ * @param locSplit Whether this is a split for local query.
* @param idx Indexing.
* @return Two step query.
* @throws SQLException If failed.
@@ -224,7 +236,7 @@ public class GridSqlQuerySplitter {
boolean collocatedGrpBy,
boolean distributedJoins,
boolean enforceJoinOrder,
- boolean local,
+ boolean locSplit,
IgniteH2Indexing idx
) throws SQLException, IgniteCheckedException {
if (params == null)
@@ -244,6 +256,7 @@ public class GridSqlQuerySplitter {
params,
collocatedGrpBy,
distributedJoins,
+ locSplit,
idx.partitionExtractor()
);
@@ -306,7 +319,7 @@ public class GridSqlQuerySplitter {
splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
cacheIds,
mvccEnabled,
- local || F.isEmpty(cacheIds)
+ locSplit
);
}
@@ -1257,7 +1270,7 @@ public class GridSqlQuerySplitter {
map.partitioned(SplitterUtils.hasPartitionedTables(mapQry));
map.hasSubQueries(SplitterUtils.hasSubQueries(mapQry));
- if (map.isPartitioned() && !distributedJoins)
+ if (map.isPartitioned() && canExtractPartitions)
map.derivedPartitions(extractor.extract(mapQry));
mapSqlQrys.add(map);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7a104b0..e6bb564 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -297,13 +297,12 @@ public class GridMapQueryExecutor {
final int[] parts = qryParts == null ? partsMap == null ? null :
partsMap.get(ctx.localNodeId()) : qryParts;
boolean distributedJoins =
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
- boolean local = req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL);
+ boolean enforceJoinOrder =
req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+ boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
+ boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+ boolean lazy = (FORCE_LAZY && req.queries().size() == 1) ||
req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
- final boolean enforceJoinOrder =
req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
- final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
- final boolean replicated =
req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
- final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) ||
req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
- final Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
+ Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
final List<Integer> cacheIds = req.caches();
@@ -393,7 +392,6 @@ public class GridMapQueryExecutor {
parts,
req.pageSize(),
distributedJoins,
- local,
enforceJoinOrder,
false, // Replicated is always false here (see condition
above).
req.timeout(),
@@ -421,7 +419,6 @@ public class GridMapQueryExecutor {
parts,
req.pageSize(),
distributedJoins,
- local,
enforceJoinOrder,
false,
req.timeout(),
@@ -452,7 +449,6 @@ public class GridMapQueryExecutor {
parts,
req.pageSize(),
distributedJoins,
- local,
enforceJoinOrder,
replicated,
req.timeout(),
@@ -477,8 +473,7 @@ public class GridMapQueryExecutor {
* @param partsMap Partitions map for unstable topology.
* @param parts Explicit partitions for current node.
* @param pageSize Page size.
- * @param distributeJoins Query distributed join mode.
- * @param local Lcoal flag.
+ * @param distributedJoins Query distributed join mode.
* @param lazy Streaming flag.
* @param mvccSnapshot MVCC snapshot.
* @param tx Transaction.
@@ -498,8 +493,7 @@ public class GridMapQueryExecutor {
final Map<UUID, int[]> partsMap,
final int[] parts,
final int pageSize,
- final boolean distributeJoins,
- final boolean local,
+ final boolean distributedJoins,
final boolean enforceJoinOrder,
final boolean replicated,
final int timeout,
@@ -537,8 +531,7 @@ public class GridMapQueryExecutor {
partsMap,
parts,
pageSize,
- distributeJoins,
- local,
+ distributedJoins,
enforceJoinOrder,
replicated,
timeout,
@@ -618,9 +611,8 @@ public class GridMapQueryExecutor {
// Prepare query context.
DistributedJoinContext distributedJoinCtx = null;
- if (distributeJoins && !replicated) {
+ if (distributedJoins && !replicated) {
distributedJoinCtx = new DistributedJoinContext(
- local,
topVer,
partsMap,
node.id(),
@@ -642,7 +634,7 @@ public class GridMapQueryExecutor {
Connection conn =
h2.connections().connectionForThread().connection(schemaName);
- H2Utils.setupConnection(conn, distributeJoins, enforceJoinOrder);
+ H2Utils.setupConnection(conn, distributedJoins, enforceJoinOrder);
qryCtxRegistry.setThreadLocal(qctx);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 24e93cd..ae399f7 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -408,7 +408,8 @@ public class GridReduceQueryExecutor {
boolean forUpdate,
int pageSize
) {
- if (qry.isLocal() && parts != null)
+ // If explicit partitions are set, but there are no real tables,
ignore.
+ if (!qry.hasCacheIds() && parts != null)
parts = null;
assert !qry.mvccEnabled() || mvccTracker != null;
@@ -544,7 +545,7 @@ public class GridReduceQueryExecutor {
throw new CacheException("Partitions are not supported for
replicated caches");
}
- if (qry.isLocal())
+ if (qry.isLocalSplit() || !qry.hasCacheIds())
nodes = singletonList(ctx.discovery().localNode());
else {
ReducePartitionMapResult nodesParts =
@@ -655,8 +656,6 @@ public class GridReduceQueryExecutor {
.parameterIndexes(mapQry.parameterIndexes()));
}
- final boolean distributedJoins = qry.distributedJoins();
-
final long qryReqId0 = qryReqId;
cancel.set(new Runnable() {
@@ -669,12 +668,10 @@ public class GridReduceQueryExecutor {
int flags = singlePartMode && !enforceJoinOrder ? 0 :
GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
- if (distributedJoins)
+ // Distributed joins flag is set if it is either reald
+ if (qry.distributedJoins())
flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
- if (qry.isLocal())
- flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
-
if (qry.explain())
flags |= GridH2QueryRequest.FLAG_EXPLAIN;
@@ -691,7 +688,7 @@ public class GridReduceQueryExecutor {
.topologyVersion(topVer)
.pageSize(r.pageSize())
.caches(qry.cacheIds())
- .tables(distributedJoins ? qry.tables() : null)
+ .tables(qry.distributedJoins() ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
.parameters(params)
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 93e27f0..b5c1f77 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -68,9 +68,10 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
/**
- * Restrict distributed joins range-requests to local index segments.
Range requests to other nodes will not be sent.
+ * Unused. Keep for backward compatibility.
*/
- public static final int FLAG_IS_LOCAL = 1 << 2;
+ @SuppressWarnings("unused")
+ public static final int FLAG_UNUSED = 1 << 2;
/**
* If it is an EXPLAIN command.
@@ -426,10 +427,10 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
}
/**
- * @param txDetails TX details holder for {@code SELECT FOR UPDATE}, or
{@code null} if not applicable.
+ * @param txReq TX details holder for {@code SELECT FOR UPDATE}, or {@code
null} if not applicable.
*/
- public void txDetails(GridH2SelectForUpdateTxDetails txDetails) {
- this.txReq = txDetails;
+ public void txDetails(GridH2SelectForUpdateTxDetails txReq) {
+ this.txReq = txReq;
}
/**
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
index 416e66c..4cd3faf 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryDedicatedPoolTest.java
@@ -91,7 +91,6 @@ public class IgniteQueryDedicatedPoolTest extends
GridCommonAbstractTest {
/**
* Tests that SQL queries involving actual network IO are executed in
dedicated pool.
* @throws Exception If failed.
- * @see GridCacheTwoStepQuery#isLocal()
*/
@Test
public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index be8a5f0..8f57367 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -27,13 +27,12 @@ import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.junit.Test;
@@ -57,20 +56,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
private static final int ORPHAN_ROWS = 10;
/** */
- private static int QRY_PARALLELISM_LVL = 97;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache",
"affKey");
-
- cfg.setCacheKeyConfiguration(keyCfg);
-
- cfg.setPeerClassLoadingEnabled(false);
-
- return cfg;
- }
+ private static final int QRY_PARALLELISM_LVL = 97;
/** @return number of nodes to be prestarted. */
protected int nodesCount() {
@@ -78,11 +64,13 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
}
/** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
@Override protected void beforeTestsStarted() throws Exception {
startGrids(nodesCount());
}
/** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -105,11 +93,11 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
}
/**
- * @throws Exception If failed.
+ * Test segmented index.
*/
@Test
- public void testSegmentedIndex() throws Exception {
- ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true,
Integer.class, Person.class));
+ public void testSegmentedIndex() {
+ ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true,
PersonKey.class, Person.class));
ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class,
Organization.class));
fillCache();
@@ -123,10 +111,9 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Check correct index snapshots with segmented indices.
- * @throws Exception If failed.
*/
@Test
- public void testSegmentedIndexReproducableResults() throws Exception {
+ public void testSegmentedIndexReproducableResults() {
ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class,
Organization.class));
IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
@@ -149,10 +136,9 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Checks correct <code>select count(*)</code> result with segmented
indices.
- * @throws Exception If failed.
*/
@Test
- public void testSegmentedIndexSizeReproducableResults() throws Exception {
+ public void testSegmentedIndexSizeReproducableResults() {
ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class,
Organization.class));
IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
@@ -174,12 +160,11 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
}
/**
- * Run tests on single-node grid
- *
- * @throws Exception If failed.
+ * Run tests on single-node grid.
*/
+ @SuppressWarnings("deprecation")
@Test
- public void testSegmentedIndexWithEvictionPolicy() throws Exception {
+ public void testSegmentedIndexWithEvictionPolicy() {
final IgniteCache<Object, Object> cache = ignite(0).createCache(
cacheConfig(ORG_CACHE_NAME, true, Integer.class,
Organization.class)
.setEvictionPolicy(new FifoEvictionPolicy(10))
@@ -199,11 +184,10 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Verifies that <code>select count(*)</code> return valid result on a
single-node grid.
- *
- * @throws Exception If failed.
*/
+ @SuppressWarnings("deprecation")
@Test
- public void testSizeOnSegmentedIndexWithEvictionPolicy() throws Exception {
+ public void testSizeOnSegmentedIndexWithEvictionPolicy() {
final IgniteCache<Object, Object> cache = ignite(0).createCache(
cacheConfig(ORG_CACHE_NAME, true, Integer.class,
Organization.class)
.setEvictionPolicy(new FifoEvictionPolicy(10))
@@ -228,7 +212,7 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
*/
@Test
public void testSegmentedPartitionedWithReplicated() throws Exception {
- ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true,
Integer.class, Person.class));
+ ignite(0).createCache(cacheConfig(PERSON_CAHE_NAME, true,
PersonKey.class, Person.class));
ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, false,
Integer.class, Organization.class));
fillCache();
@@ -242,17 +226,15 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Check distributed joins.
- *
- * @throws Exception If failed.
*/
- public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+ public void checkDistributedQueryWithSegmentedIndex() {
for (int i = 0; i < nodesCount(); i++) {
- IgniteCache<Integer, Person> c1 =
ignite(i).cache(PERSON_CAHE_NAME);
+ IgniteCache<PersonKey, Person> c1 =
ignite(i).cache(PERSON_CAHE_NAME);
long expPersons = 0;
- for (Cache.Entry<Integer, Person> e : c1) {
- final Integer orgId = e.getValue().orgId;
+ for (Cache.Entry<PersonKey, Person> e : c1) {
+ final Integer orgId = e.getKey().orgId;
// We have as orphan ORG rows as orphan PERSON rows.
if (ORPHAN_ROWS <= orgId && orgId < 500)
@@ -269,14 +251,12 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Test local query.
- *
- * @throws Exception If failed.
*/
- public void checkLocalQueryWithSegmentedIndex() throws Exception {
+ public void checkLocalQueryWithSegmentedIndex() {
for (int i = 0; i < nodesCount(); i++) {
final Ignite node = ignite(i);
- IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
+ IgniteCache<PersonKey, Person> c1 = node.cache(PERSON_CAHE_NAME);
IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
Set<Integer> locOrgIds = new HashSet<>();
@@ -286,8 +266,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
long expPersons = 0;
- for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
- final Integer orgId = e.getValue().orgId;
+ for (Cache.Entry<PersonKey, Person> e : c1.localEntries()) {
+ final Integer orgId = e.getKey().orgId;
if (locOrgIds.contains(orgId))
expPersons++;
@@ -303,14 +283,12 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
/**
* Verifies that local <code>select count(*)</code> query returns a
correct result.
- *
- * @throws Exception If failed.
*/
- public void checkLocalSizeQueryWithSegmentedIndex() throws Exception {
+ public void checkLocalSizeQueryWithSegmentedIndex() {
for (int i = 0; i < nodesCount(); i++) {
final Ignite node = ignite(i);
- IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
+ IgniteCache<PersonKey, Person> c1 = node.cache(PERSON_CAHE_NAME);
IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
Set<Integer> locOrgIds = new HashSet<>();
@@ -320,8 +298,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
int expPersons = 0;
- for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
- final Integer orgId = e.getValue().orgId;
+ for (Cache.Entry<PersonKey, Person> e : c1.localEntries()) {
+ final Integer orgId = e.getKey().orgId;
if (locOrgIds.contains(orgId))
expPersons++;
@@ -349,7 +327,22 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
// We have as orphan ORG rows as orphan PERSON rows.
int orgID = ORPHAN_ROWS + random.nextInt(ORG_CACHE_SIZE +
ORPHAN_ROWS);
- c1.put(i, new Person(orgID, "pers-" + i));
+ c1.put(new PersonKey(i, orgID), new Person("pers-" + i));
+ }
+ }
+
+ private static class PersonKey {
+ @QuerySqlField
+ int id;
+
+ /** */
+ @AffinityKeyMapped
+ @QuerySqlField
+ Integer orgId;
+
+ public PersonKey(int id, Integer orgId) {
+ this.id = id;
+ this.orgId = orgId;
}
}
@@ -358,26 +351,14 @@ public class IgniteSqlSegmentedIndexSelfTest extends
AbstractIndexingCommonTest
*/
private static class Person implements Serializable {
/** */
- @QuerySqlField(index = true)
- Integer orgId;
-
- /** */
@QuerySqlField
String name;
- /**
- *
- */
- public Person() {
- // No-op.
- }
/**
- * @param orgId Organization ID.
* @param name Name.
*/
- public Person(int orgId, String name) {
- this.orgId = orgId;
+ public Person(String name) {
this.name = name;
}
}