Repository: ignite Updated Branches: refs/heads/master 992c9767e -> f74d51cbf
IGNITE-5054: SQL: Simplified query descriptor, partially removed dependencies on 1-to-1 cache-schema dependency. This closes #1962. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f74d51cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f74d51cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f74d51cb Branch: refs/heads/master Commit: f74d51cbf9a62858718c5d04b0857a3b0ef32c65 Parents: 992c976 Author: devozerov <[email protected]> Authored: Mon May 22 11:43:14 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon May 22 11:43:14 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 2 + .../cache/query/GridCacheTwoStepQuery.java | 84 ++----- .../processors/cache/query/QueryTable.java | 164 +++++++++++++ .../processors/query/h2/IgniteH2Indexing.java | 84 +++---- .../query/h2/opt/GridH2IndexBase.java | 2 +- .../processors/query/h2/opt/GridH2Table.java | 28 ++- .../query/h2/sql/GridSqlQuerySplitter.java | 21 +- .../query/h2/twostep/GridMapQueryExecutor.java | 15 +- .../h2/twostep/GridReduceQueryExecutor.java | 233 +++++++++---------- .../h2/twostep/msg/GridH2QueryRequest.java | 13 +- .../twostep/msg/GridH2ValueMessageFactory.java | 4 + 11 files changed, 385 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 17e4a01..753d8af 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -178,6 +178,8 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + // -54 is reserved for SQL. + case -53: msg = new SchemaOperationStatusMessage(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 0e31dc0..9e9a875 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Set; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -49,13 +48,7 @@ public class GridCacheTwoStepQuery { private String originalSql; /** */ - private Collection<String> spaces; - - /** */ - private Set<String> schemas; - - /** */ - private Set<String> tbls; + private Set<QueryTable> tbls; /** */ private boolean distributedJoins; @@ -64,22 +57,17 @@ public class GridCacheTwoStepQuery { private boolean skipMergeTbl; /** */ - private List<Integer> caches; - - /** */ - private List<Integer> extraCaches; + private List<Integer> cacheIds; /** */ private boolean local; /** * @param originalSql Original query SQL. - * @param schemas Schema names in query. 
* @param tbls Tables in query. */ - public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) { + public GridCacheTwoStepQuery(String originalSql, Set<QueryTable> tbls) { this.originalSql = originalSql; - this.schemas = schemas; this.tbls = tbls; } @@ -157,8 +145,8 @@ public class GridCacheTwoStepQuery { public boolean isReplicatedOnly() { assert !mapQrys.isEmpty(); - for (int i = 0; i < mapQrys.size(); i++) { - if (mapQrys.get(i).isPartitioned()) + for (GridCacheSqlQuery mapQry : mapQrys) { + if (mapQry.isPartitioned()) return false; } @@ -187,31 +175,17 @@ public class GridCacheTwoStepQuery { } /** - * @return Caches. + * @return Cache IDs. */ - public List<Integer> caches() { - return caches; + public List<Integer> cacheIds() { + return cacheIds; } /** - * @param caches Caches. + * @param cacheIds Cache IDs. */ - public void caches(List<Integer> caches) { - this.caches = caches; - } - - /** - * @return Caches. - */ - public List<Integer> extraCaches() { - return extraCaches; - } - - /** - * @param extraCaches Caches. - */ - public void extraCaches(List<Integer> extraCaches) { - this.extraCaches = extraCaches; + public void cacheIds(List<Integer> cacheIds) { + this.cacheIds = cacheIds; } /** @@ -222,27 +196,6 @@ public class GridCacheTwoStepQuery { } /** - * @return Spaces. - */ - public Collection<String> spaces() { - return spaces; - } - - /** - * @param spaces Spaces. - */ - public void spaces(Collection<String> spaces) { - this.spaces = spaces; - } - - /** - * @return Schemas. - */ - public Set<String> schemas() { - return schemas; - } - - /** * @return {@code True} If query is local. 
*/ public boolean isLocal() { @@ -262,11 +215,9 @@ public class GridCacheTwoStepQuery { public GridCacheTwoStepQuery copy() { assert !explain; - GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls); + GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, tbls); - cp.caches = caches; - cp.extraCaches = extraCaches; - cp.spaces = spaces; + cp.cacheIds = cacheIds; cp.rdc = rdc.copy(); cp.skipMergeTbl = skipMergeTbl; cp.pageSize = pageSize; @@ -279,9 +230,16 @@ public class GridCacheTwoStepQuery { } /** + * @return Number of tables. + */ + public int tablesCount() { + return tbls.size(); + } + + /** * @return Tables. */ - public Set<String> tables() { + public Set<QueryTable> tables() { return tbls; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java new file mode 100644 index 0000000..54f5f03 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * Query table descriptor. + */ +public class QueryTable implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Schema. */ + private String schema; + + /** Table. */ + private String tbl; + + /** + * Default constructor. + */ + public QueryTable() { + // No-op. + } + + /** + * Constructor. + * + * @param schema Schema. + * @param tbl Table. + */ + public QueryTable(String schema, String tbl) { + this.schema = schema; + this.tbl = tbl; + } + + /** + * @return Schema. + */ + public String schema() { + return schema; + } + + /** + * @return Table. 
+ */ + public String table() { + return tbl; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("schema", schema)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeString("tbl", tbl)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + schema = reader.readString("schema"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + tbl = reader.readString("tbl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(QueryTable.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -54; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * (schema != null ? schema.hashCode() : 0) + (tbl != null ? 
tbl.hashCode() : 0); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj instanceof QueryTable) { + QueryTable other = (QueryTable)obj; + + return F.eq(tbl, other.tbl) && F.eq(schema, other.schema); + } + + return super.equals(obj); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTable.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 599baa1..0874ddc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -88,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -289,7 +291,7 @@ public class IgniteH2Indexing 
implements GridQueryIndexing { /** */ private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask; - /** + /* * Command in H2 prepared statement. */ static { @@ -397,7 +399,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private DdlStatementsProcessor ddlProc; /** */ - private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>(); + private final ConcurrentMap<QueryTable, GridH2Table> dataTables = new ConcurrentHashMap8<>(); /** Statement cache. */ private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>(); @@ -1672,44 +1674,33 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - List<Integer> caches; - List<Integer> extraCaches = null; + LinkedHashSet<Integer> caches0 = new LinkedHashSet<>(); // Setup spaces from schemas. - if (!twoStepQry.schemas().isEmpty()) { - Collection<String> spaces = new ArrayList<>(twoStepQry.schemas().size()); - caches = new ArrayList<>(twoStepQry.schemas().size() + 1); - caches.add(cctx.cacheId()); + assert twoStepQry != null; - for (String schema : twoStepQry.schemas()) { - String space0 = space(schema); + int tblCnt = twoStepQry.tablesCount(); - spaces.add(space0); + if (tblCnt > 0) { + caches0.add(cctx.cacheId()); - if (!F.eq(space0, space)) { - int cacheId = CU.cacheId(space0); + for (QueryTable table : twoStepQry.tables()) { + String cacheName = cacheNameForSchemaAndTable(table.schema(), table.table()); - caches.add(cacheId); + int cacheId = CU.cacheId(cacheName); - if (extraCaches == null) - extraCaches = new ArrayList<>(); - - extraCaches.add(cacheId); - } + caches0.add(cacheId); } - - twoStepQry.spaces(spaces); - } - else { - caches = Collections.singletonList(cctx.cacheId()); - extraCaches = null; } + else + caches0.add(cctx.cacheId()); //Prohibit usage indices with different numbers of segments in same query. 
- checkCacheIndexSegmentation(caches); + List<Integer> cacheIds = new ArrayList<>(caches0); - twoStepQry.caches(caches); - twoStepQry.extraCaches(extraCaches); + checkCacheIndexSegmentation(cacheIds); + + twoStepQry.cacheIds(cacheIds); twoStepQry.local(qry.isLocal()); meta = meta(stmt.getMetaData()); @@ -1750,6 +1741,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Get cache for schema and table. + * + * @param schemaName Schema name. + * @param tblName Table name. + * @return Cache name. + */ + private String cacheNameForSchemaAndTable(String schemaName, String tblName) { + // TODO: This needs to be changed. + return space(schemaName); + } + + /** * @throws IllegalStateException if segmented indices used with non-segmented indices. */ private void checkCacheIndexSegmentation(List<Integer> caches) { @@ -2007,15 +2010,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { addInitialUserIndex(spaceName, tbl, usrIdx); if (dataTables.putIfAbsent(h2Tbl.identifier(), h2Tbl) != null) - throw new IllegalStateException("Table already exists: " + h2Tbl.identifier()); - } - - /** - * @param identifier Table identifier. - * @return Data table. - */ - public GridH2Table dataTable(String identifier) { - return dataTables.get(identifier); + throw new IllegalStateException("Table already exists: " + h2Tbl.identifierString()); } /** @@ -2026,12 +2021,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @return Table or {@code null} if none found. */ public GridH2Table dataTable(String schemaName, String tblName) { - for (GridH2Table tbl : dataTables.values()) { - if (tbl.getSchema().getName().equals(schemaName) && tbl.getName().equals(tblName)) - return tbl; - } + return dataTable(new QueryTable(schemaName, tblName)); + } - return null; + /** + * Find table by its identifier. + * + * @param tbl Identifier. + * @return Table or {@code null} if none found. 
+ */ + public GridH2Table dataTable(QueryTable tbl) { + return dataTables.get(tbl); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 623da09..12850f4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -132,7 +132,7 @@ public abstract class GridH2IndexBase extends BaseIndex { log = ctx.log(getClass()); - msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName()); + msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' 
+ getName()); msgLsnr = new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object msg) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index a00ea90..37c03e3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -32,6 +32,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; @@ -47,7 +48,6 @@ import org.h2.message.DbException; import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.result.SortOrder; -import org.h2.table.Column; import org.h2.table.IndexColumn; import org.h2.table.TableBase; import org.h2.table.TableType; @@ -58,7 +58,6 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; /** @@ -104,6 +103,12 @@ public class GridH2Table extends TableBase { /** */ private volatile boolean rebuildFromHashInProgress; + /** Identifier. */ + private final QueryTable identifier; + + /** Identifier as string. */ + private final String identifierStr; + /** * Creates table. * @@ -149,6 +154,10 @@ public class GridH2Table extends TableBase { this.rowFactory = rowFactory; + identifier = new QueryTable(getSchema().getName(), getName()); + + identifierStr = identifier.schema() + "." + identifier.table(); + // Indexes must be created in the end when everything is ready. idxs = idxsFactory.createSystemIndexes(this); @@ -221,7 +230,7 @@ public class GridH2Table extends TableBase { if (destroyed) { unlock(exclusive); - throw new IllegalStateException("Table " + identifier() + " already destroyed."); + throw new IllegalStateException("Table " + identifierString() + " already destroyed."); } if (snapshotInLock()) @@ -293,8 +302,15 @@ public class GridH2Table extends TableBase { /** * @return Table identifier. */ - public String identifier() { - return getSchema().getName() + '.' + getName(); + public QueryTable identifier() { + return identifier; + } + + /** + * @return Table identifier as string. 
+ */ + public String identifierString() { + return identifierStr; } /** @@ -352,7 +368,7 @@ public class GridH2Table extends TableBase { */ private void ensureNotDestroyed() { if (destroyed) - throw new IllegalStateException("Table " + identifier() + " already destroyed."); + throw new IllegalStateException("Table " + identifierString() + " already destroyed."); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 26c6b08..9f01346 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -93,11 +94,8 @@ public class GridSqlQuerySplitter { /** */ private int splitId = -1; // The first one will be 0. - /** */ - private Set<String> schemas = new HashSet<>(); - - /** */ - private Set<String> tbls = new HashSet<>(); + /** Query tables. 
*/ + private Set<QueryTable> tbls = new HashSet<>(); /** */ private boolean rdcQrySimple; @@ -224,7 +222,7 @@ public class GridSqlQuerySplitter { } // Setup resulting two step query and return it. - GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.schemas, splitter.tbls); + GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.tbls); twoStepQry.reduceQuery(splitter.rdcSqlQry); @@ -1500,15 +1498,10 @@ public class GridSqlQuerySplitter { if (from instanceof GridSqlTable) { GridSqlTable tbl = (GridSqlTable)from; - String schema = tbl.schema(); - - boolean addSchema = tbls == null; - - if (tbls != null) - addSchema = tbls.add(tbl.dataTable().identifier()); + String schemaName = tbl.dataTable().identifier().schema(); + String tblName = tbl.dataTable().identifier().table(); - if (addSchema && schema != null && schemas != null) - schemas.add(schema); + tbls.add(new QueryTable(schemaName, tblName)); // In case of alias parent we need to replace the alias itself. 
if (!prntAlias) http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 1444209..43cc230 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; @@ -101,7 +102,7 @@ public class GridMapQueryExecutor { /** */ private static final Field RESULT_FIELD; - /** + /* * Initialize. 
*/ static { @@ -514,7 +515,7 @@ public class GridMapQueryExecutor { AffinityTopologyVersion topVer, Map<UUID, int[]> partsMap, int[] parts, - Collection<String> tbls, + Collection<QueryTable> tbls, int pageSize, DistributedJoinMode distributedJoinMode, boolean enforceJoinOrder, @@ -567,14 +568,14 @@ public class GridMapQueryExecutor { if (!F.isEmpty(tbls)) { snapshotedTbls = new ArrayList<>(tbls.size()); - for (String identifier : tbls) { - GridH2Table tbl = h2.dataTable(identifier); + for (QueryTable tbl : tbls) { + GridH2Table h2Tbl = h2.dataTable(tbl); - Objects.requireNonNull(tbl, identifier); + Objects.requireNonNull(h2Tbl, tbl.toString()); - tbl.snapshotIndexes(qctx); + h2Tbl.snapshotIndexes(qctx); - snapshotedTbls.add(tbl); + snapshotedTbls.add(h2Tbl); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3d81cb5..75914ef 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -83,7 +83,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; @@ -348,19 +347,13 @@ public class 
GridReduceQueryExecutor { } /** - * @param cctx Cache context for main space. - * @param extraSpaces Extra spaces. + * @param cacheIds Cache IDs. * @return {@code true} If preloading is active. */ - private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) { - if (hasMovingPartitions(cctx)) - return true; - - if (extraSpaces != null) { - for (int i = 0; i < extraSpaces.size(); i++) { - if (hasMovingPartitions(cacheContext(extraSpaces.get(i)))) - return true; - } + private boolean isPreloadingActive(List<Integer> cacheIds) { + for (Integer cacheId : cacheIds) { + if (hasMovingPartitions(cacheContext(cacheId))) + return true; } return false; @@ -439,17 +432,14 @@ public class GridReduceQueryExecutor { /** * @param isReplicatedOnly If we must only have replicated caches. * @param topVer Topology version. - * @param cctx Cache context for main space. - * @param extraSpaces Extra spaces. + * @param cacheIds Participating cache IDs. * @param parts Partitions. * @return Data nodes or {@code null} if repartitioning started and we need to retry. 
*/ - private Map<ClusterNode, IntArray> stableDataNodes( - boolean isReplicatedOnly, - AffinityTopologyVersion topVer, - final GridCacheContext<?, ?> cctx, - List<Integer> extraSpaces, - int[] parts) { + private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer, + List<Integer> cacheIds, int[] parts) { + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0)); + Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts); Set<ClusterNode> nodes = map.keySet(); @@ -457,54 +447,53 @@ public class GridReduceQueryExecutor { if (F.isEmpty(map)) throw new CacheException("Failed to find data nodes for cache: " + cctx.name()); - if (!F.isEmpty(extraSpaces)) { - for (int i = 0; i < extraSpaces.size(); i++) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i)); + for (int i = 1; i < cacheIds.size(); i++) { + GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i)); - String extraSpace = extraCctx.name(); + String extraCacheName = extraCctx.name(); - if (extraCctx.isLocal()) - continue; // No consistency guaranties for local caches. + if (extraCctx.isLocal()) + continue; // No consistency guaranties for local caches. 
- if (isReplicatedOnly && !extraCctx.isReplicated()) - throw new CacheException("Queries running on replicated cache should not contain JOINs " + - "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); + if (isReplicatedOnly && !extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables [replicatedCache=" + cctx.name() + + ", partitionedCache=" + extraCacheName + "]"); - Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet(); + Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet(); - if (F.isEmpty(extraNodes)) - throw new CacheException("Failed to find data nodes for cache: " + extraSpace); + if (F.isEmpty(extraNodes)) + throw new CacheException("Failed to find data nodes for cache: " + extraCacheName); - if (isReplicatedOnly && extraCctx.isReplicated()) { - nodes.retainAll(extraNodes); + if (isReplicatedOnly && extraCctx.isReplicated()) { + nodes.retainAll(extraNodes); - if (map.isEmpty()) { - if (isPreloadingActive(cctx, extraSpaces)) - return null; // Retry. - else - throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + - ", cache2=" + extraSpace + "]"); - } - } - else if (!isReplicatedOnly && extraCctx.isReplicated()) { - if (!extraNodes.containsAll(nodes)) - if (isPreloadingActive(cctx, extraSpaces)) - return null; // Retry. - else - throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + - ", cache2=" + extraSpace + "]"); - } - else if (!isReplicatedOnly && !extraCctx.isReplicated()) { - if (!extraNodes.equals(nodes)) - if (isPreloadingActive(cctx, extraSpaces)) - return null; // Retry. - else - throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + - ", cache2=" + extraSpace + "]"); + if (map.isEmpty()) { + if (isPreloadingActive(cacheIds)) + return null; // Retry. 
+ else + throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + + ", cache2=" + extraCacheName + "]"); } - else - throw new IllegalStateException(); } + else if (!isReplicatedOnly && extraCctx.isReplicated()) { + if (!extraNodes.containsAll(nodes)) + if (isPreloadingActive(cacheIds)) + return null; // Retry. + else + throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + + ", cache2=" + extraCacheName + "]"); + } + else if (!isReplicatedOnly && !extraCctx.isReplicated()) { + if (!extraNodes.equals(nodes)) + if (isPreloadingActive(cacheIds)) + return null; // Retry. + else + throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + + ", cache2=" + extraCacheName + "]"); + } + else + throw new IllegalStateException(); } return map; @@ -537,8 +526,6 @@ public class GridReduceQueryExecutor { final boolean isReplicatedOnly = qry.isReplicatedOnly(); // Fail if all caches are replicated and explicit partitions are set. - - for (int attempt = 0;; attempt++) { if (attempt != 0) { try { @@ -561,7 +548,7 @@ public class GridReduceQueryExecutor { AffinityTopologyVersion topVer = h2.readyTopologyVersion(); - List<Integer> extraSpaces = qry.extraCaches(); + List<Integer> cacheIds = qry.cacheIds(); Collection<ClusterNode> nodes = null; @@ -572,29 +559,29 @@ public class GridReduceQueryExecutor { Map<ClusterNode, IntArray> qryMap = null; // Partitions are not supported for queries over all replicated caches. 
- if (cctx.isReplicated() && parts != null) { - boolean failIfReplicatedOnly = true; + if (parts != null) { + boolean replicatedOnly = true; - for (Integer cacheId : extraSpaces) { + for (Integer cacheId : cacheIds) { if (!cacheContext(cacheId).isReplicated()) { - failIfReplicatedOnly = false; + replicatedOnly = false; break; } } - if (failIfReplicatedOnly) + if (replicatedOnly) throw new CacheException("Partitions are not supported for replicated caches"); } if (qry.isLocal()) nodes = singletonList(ctx.discovery().localNode()); else { - if (isPreloadingActive(cctx, extraSpaces)) { + if (isPreloadingActive(cacheIds)) { if (isReplicatedOnly) - nodes = replicatedUnstableDataNodes(cctx, extraSpaces); + nodes = replicatedUnstableDataNodes(cacheIds); else { - partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); + partsMap = partitionedUnstableDataNodes(cacheIds); if (partsMap != null) { qryMap = narrowForQuery(partsMap, parts); @@ -602,8 +589,9 @@ public class GridReduceQueryExecutor { nodes = qryMap == null ? null : qryMap.keySet(); } } - } else { - qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts); + } + else { + qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts); if (qryMap != null) nodes = qryMap.keySet(); @@ -633,7 +621,7 @@ public class GridReduceQueryExecutor { final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable(); final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 : - findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism(); + findFirstPartitioned(cacheIds).config().getQueryParallelism(); int replicatedQrysCnt = 0; @@ -731,7 +719,7 @@ public class GridReduceQueryExecutor { .requestId(qryReqId) .topologyVersion(topVer) .pageSize(r.pageSize) - .caches(qry.caches()) + .caches(qry.cacheIds()) .tables(distributedJoins ? 
qry.tables() : null) .partitions(convert(partsMap)) .queries(mapQrys) @@ -873,22 +861,18 @@ public class GridReduceQueryExecutor { } /** - * @param cctx Cache context for main space. - * @param extraSpaces Extra spaces. + * @param cacheIds Cache IDs. * @return The first partitioned cache context. */ - private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) { - if (cctx.isLocal()) - throw new CacheException("Cache is LOCAL: " + cctx.name()); - - if (!cctx.isReplicated()) - return cctx; + private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) { + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i)); - for (int i = 0 ; i < extraSpaces.size(); i++) { - GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + if (i == 0 && cctx.isLocal()) + throw new CacheException("Cache is LOCAL: " + cctx.name()); - if (!extraCctx.isReplicated() && !extraCctx.isLocal()) - return extraCctx; + if (!cctx.isReplicated() && !cctx.isLocal()) + return cctx; } throw new IllegalStateException("Failed to find partitioned cache."); @@ -997,20 +981,20 @@ public class GridReduceQueryExecutor { /** * Calculates data nodes for replicated caches on unstable topology. * - * @param cctx Cache context for main space. - * @param extraSpaces Extra spaces. + * @param cacheIds Cache IDs. * @return Collection of all data nodes owning all the caches or {@code null} for retry. */ - private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx, - List<Integer> extraSpaces) { + private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) { int i = 0; + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++)); + // The main cache is allowed to be partitioned. 
if (!cctx.isReplicated()) { - assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache"; + assert cacheIds.size() > 1: "no extra replicated caches with partitioned main cache"; // Just replace the main cache with the first one extra. - cctx = cacheContext(extraSpaces.get(i++)); + cctx = cacheContext(cacheIds.get(i++)); assert cctx.isReplicated(): "all the extra caches must be replicated here"; } @@ -1020,27 +1004,26 @@ public class GridReduceQueryExecutor { if (F.isEmpty(nodes)) return null; // Retry. - if (!F.isEmpty(extraSpaces)) { - for (;i < extraSpaces.size(); i++) { - GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + for (;i < cacheIds.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i)); - if (extraCctx.isLocal()) - continue; + if (extraCctx.isLocal()) + continue; - if (!extraCctx.isReplicated()) - throw new CacheException("Queries running on replicated cache should not contain JOINs " + - "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]"); + if (!extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " + + "partitionedCache=" + extraCctx.name() + "]"); - Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); + Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); - if (F.isEmpty(extraOwners)) - return null; // Retry. + if (F.isEmpty(extraOwners)) + return null; // Retry. - nodes.retainAll(extraOwners); + nodes.retainAll(extraOwners); - if (nodes.isEmpty()) - return null; // Retry. - } + if (nodes.isEmpty()) + return null; // Retry. } return nodes; @@ -1092,23 +1075,19 @@ public class GridReduceQueryExecutor { /** * Calculates partition mapping for partitioned cache on unstable topology. * - * @param cctx Cache context for main space. 
- * @param extraSpaces Extra spaces. + * @param cacheIds Cache IDs. * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. */ @SuppressWarnings("unchecked") - private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx, - List<Integer> extraSpaces) { - assert !cctx.isLocal() : cctx.name() + " must not be LOCAL"; - + private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) { // If the main cache is replicated, just replace it with the first partitioned. - cctx = findFirstPartitioned(cctx, extraSpaces); + GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds); final int partsCnt = cctx.affinity().partitions(); - if (extraSpaces != null) { // Check correct number of partitions for partitioned caches. - for (int i = 0; i < extraSpaces.size(); i++) { - GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + if (cacheIds.size() > 1) { // Check correct number of partitions for partitioned caches. + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); if (extraCctx.isReplicated() || extraCctx.isLocal()) continue; @@ -1117,14 +1096,15 @@ public class GridReduceQueryExecutor { if (parts != partsCnt) throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + - cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]"); + cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + + ", parts2=" + parts + "]"); } } Set<ClusterNode>[] partLocs = new Set[partsCnt]; // Fill partition locations for main cache. 
- for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { + for (int p = 0; p < partsCnt; p++) { List<ClusterNode> owners = cctx.topology().owners(p); if (F.isEmpty(owners)) { @@ -1143,11 +1123,11 @@ public class GridReduceQueryExecutor { partLocs[p] = new HashSet<>(owners); } - if (extraSpaces != null) { + if (cacheIds.size() > 1) { // Find owner intersections for each participating partitioned cache partition. // We need this for logical collocation between different partitioned caches with the same affinity. - for (int i = 0; i < extraSpaces.size(); i++) { - GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); // This is possible if we have replaced a replicated cache with a partitioned one earlier. if (cctx == extraCctx) @@ -1156,7 +1136,7 @@ public class GridReduceQueryExecutor { if (extraCctx.isReplicated() || extraCctx.isLocal()) continue; - for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { + for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = extraCctx.topology().owners(p); if (partLocs[p] == UNMAPPED_PARTS) @@ -1166,7 +1146,8 @@ public class GridReduceQueryExecutor { if (!F.isEmpty(dataNodes(extraCctx.name(), NONE))) return null; // Retry. - throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]"); + throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + + ", part=" + p + "]"); } if (partLocs[p] == null) @@ -1181,8 +1162,8 @@ public class GridReduceQueryExecutor { } // Filter nodes where not all the replicated caches loaded. 
- for (int i = 0; i < extraSpaces.size(); i++) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i)); + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); if (!extraCctx.isReplicated()) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 17bb9f6..beb1ae2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -110,8 +111,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** */ @GridToStringInclude - @GridDirectCollection(String.class) - private Collection<String> tbls; + @GridDirectCollection(Message.class) + private Collection<QueryTable> tbls; /** */ private int timeout; @@ -173,7 +174,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { * 
@param tbls Tables. * @return {@code this}. */ - public GridH2QueryRequest tables(Collection<String> tbls) { + public GridH2QueryRequest tables(Collection<QueryTable> tbls) { this.tbls = tbls; return this; @@ -182,7 +183,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** * @return Tables. */ - public Collection<String> tables() { + public Collection<QueryTable> tables() { return tbls; } @@ -434,7 +435,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 7: - if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING)) + if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -527,7 +528,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 7: - tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING); + tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java index 3a825f7..18b1afb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Iterator; import org.apache.ignite.IgniteCheckedException; import 
org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -108,6 +109,9 @@ public class GridH2ValueMessageFactory implements MessageFactory { case -35: return new GridH2RowRangeBounds(); + + case -54: + return new QueryTable(); } return null;
