IGNITE-4509: Send unicast SQL request when partition can be deduced from the query. This closes #1916.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fbf0e353 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fbf0e353 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fbf0e353 Branch: refs/heads/ignite-5075 Commit: fbf0e35371949309f07105331f3bafb54da6c29c Parents: 79d45e3 Author: Sergey Kalashnikov <[email protected]> Authored: Fri Jun 2 14:20:37 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Jun 2 14:20:37 2017 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheSqlQuery.java | 23 + .../cache/query/CacheQueryPartitionInfo.java | 110 ++++ .../cache/query/GridCacheTwoStepQuery.java | 18 + .../processors/query/h2/IgniteH2Indexing.java | 50 +- .../query/h2/sql/GridSqlQuerySplitter.java | 228 +++++++- .../processors/query/IgniteSqlRoutingTest.java | 552 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 7 files changed, 976 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index 780e462..d3746f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@ -69,6 +69,11 @@ public class GridCacheSqlQuery implements Message { /** Single node to execute the query on. */ private UUID node; + /** Derived partition info. 
*/ + @GridToStringInclude + @GridDirectTransient + private transient Object[] derivedPartitions; + /** * For {@link Message}. */ @@ -253,6 +258,7 @@ public class GridCacheSqlQuery implements Message { cp.paramIdxs = paramIdxs; cp.sort = sort; cp.partitioned = partitioned; + cp.derivedPartitions = derivedPartitions; return cp; } @@ -324,4 +330,21 @@ public class GridCacheSqlQuery implements Message { return res; } + + /** + * @return Derived partitions. + */ + public Object[] derivedPartitions() { + return derivedPartitions; + } + + /** + * @param derivedPartitions Derived partitions. + * @return {@code this}. + */ + public GridCacheSqlQuery derivedPartitions(Object[] derivedPartitions) { + this.derivedPartitions = derivedPartitions; + + return this; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java new file mode 100644 index 0000000..1329d5c --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Holds the partition calculation info extracted from a query. + * The query may have several such items associated with it. + * + * The query may contain expressions containing key or affinity key. + * Such expressions can be used as hints to derive small isolated set + * of partitions the query needs to run on. + * + * In case expression contains constant (e.g. _key = 100), the partition + * can be calculated right away and saved into cache along with the query. + * + * In case expression has a parameter (e.g. _key = ?), the effective + * partition varies with each run of the query. Hence, instead of partition, + * one must store the info required to calculate partition. + * + * The given class holds the required info, so that effective partition + * can be calculated during query parameter binding. + */ +public class CacheQueryPartitionInfo { + /** */ + private int partId; + + /** */ + private String cacheName; + + /** */ + private int paramIdx; + + /** + * @param partId Partition id, or -1 if parameter binding required. + * @param cacheName Cache name required for partition calculation. + * @param paramIdx Query parameter index required for partition calculation. 
+ */ + public CacheQueryPartitionInfo(int partId, String cacheName, int paramIdx) { + this.partId = partId; + this.cacheName = cacheName; + this.paramIdx = paramIdx; + } + + /** + * @return Partition id, or -1 if parameter binding is required to calculate partition. + */ + public int partition() { + return partId; + } + + /** + * @return Cache name required for partition calculation. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Query parameter index required for partition calculation. + */ + public int paramIdx() { + return paramIdx; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return partId ^ paramIdx ^ (cacheName == null ? 0 : cacheName.hashCode()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (!(obj instanceof CacheQueryPartitionInfo)) + return false; + + CacheQueryPartitionInfo other = (CacheQueryPartitionInfo)obj; + + if (partId >= 0) + return partId == other.partId; + + if (other.cacheName == null) + return false; + + return other.cacheName.equals(cacheName) && other.paramIdx == paramIdx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheQueryPartitionInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 9e9a875..24958af 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java 
@@ -62,6 +62,9 @@ public class GridCacheTwoStepQuery { /** */ private boolean local; + /** */ + private CacheQueryPartitionInfo[] derivedPartitions; + /** * @param originalSql Original query SQL. * @param tbls Tables in query. @@ -210,6 +213,20 @@ public class GridCacheTwoStepQuery { } /** + * @return Query derived partitions info. + */ + public CacheQueryPartitionInfo[] derivedPartitions() { + return this.derivedPartitions; + } + + /** + * @param derivedPartitions Query derived partitions info. + */ + public void derivedPartitions(CacheQueryPartitionInfo[] derivedPartitions) { + this.derivedPartitions = derivedPartitions; + } + + /** * @return Copy. */ public GridCacheTwoStepQuery copy() { @@ -222,6 +239,7 @@ public class GridCacheTwoStepQuery { cp.skipMergeTbl = skipMergeTbl; cp.pageSize = pageSize; cp.distributedJoins = distributedJoins; + cp.derivedPartitions = derivedPartitions; for (int i = 0; i < mapQrys.size(); i++) cp.mapQrys.add(mapQrys.get(i).copy()); http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 50c8e41..12addbd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; +import 
org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -1387,9 +1388,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (cancel == null) cancel = new GridQueryCancel(); + int partitions[] = qry.getPartitions(); + + if (partitions == null && twoStepQry.derivedPartitions() != null) { + try { + partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), qry.getArgs()); + } catch (IgniteCheckedException e) { + throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" + + Arrays.deepToString(qry.getArgs()) + "]", e); + } + } + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel, - qry.getArgs(), qry.getPartitions()), cancel); + qry.getArgs(), partitions), cancel); cursor.fieldsMeta(meta); @@ -2245,6 +2257,42 @@ public class IgniteH2Indexing implements GridQueryIndexing { rdcQryExec.onDisconnected(reconnectFut); } + /** + * Bind query parameters and calculate partitions derived from the query. + * + * @return Partitions. + */ + private int[] calculateQueryPartitions(CacheQueryPartitionInfo[] partInfoList, Object[] params) + throws IgniteCheckedException { + + ArrayList<Integer> list = new ArrayList<>(partInfoList.length); + + for (CacheQueryPartitionInfo partInfo: partInfoList) { + int partId = partInfo.partition() < 0 ? 
+ kernalContext().affinity().partition(partInfo.cacheName(), params[partInfo.paramIdx()]) : + partInfo.partition(); + + int i = 0; + + while (i < list.size() && list.get(i) < partId) + i++; + + if (i < list.size()) { + if (list.get(i) > partId) + list.add(i, partId); + } + else + list.add(partId); + } + + int[] result = new int[list.size()]; + + for (int i = 0; i < list.size(); i++) + result[i] = list.get(i); + + return result; + } + /** {@inheritDoc} */ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) { Collection<GridRunningQueryInfo> res = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index e2b82ab..1b9619e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -34,11 +34,15 @@ import java.util.TreeSet; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; 
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -46,6 +50,7 @@ import org.h2.command.Prepared; import org.h2.command.dml.Query; import org.h2.command.dml.SelectUnion; import org.h2.jdbc.JdbcPreparedStatement; +import org.h2.table.IndexColumn; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -116,13 +121,17 @@ public class GridSqlQuerySplitter { /** */ private IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new IdentityHashMap<>(); + /** */ + private GridKernalContext ctx; + /** * @param params Query parameters. * @param collocatedGrpBy If it is a collocated GROUP BY query. */ - public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy) { + public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, GridKernalContext ctx) { this.params = params; this.collocatedGrpBy = collocatedGrpBy; + this.ctx = ctx; } /** @@ -182,7 +191,7 @@ public class GridSqlQuerySplitter { qry.explain(false); - GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy); + GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, h2.kernalContext()); // Normalization will generate unique aliases for all the table filters in FROM. // Also it will collect all tables and schemas from the query. @@ -234,13 +243,16 @@ public class GridSqlQuerySplitter { twoStepQry.explain(explain); twoStepQry.distributedJoins(distributedJoins); + // all map queries must have non-empty derivedPartitions to use this feature. + twoStepQry.derivedPartitions(mergePartitionsFromMultipleQueries(twoStepQry.mapQueries())); + return twoStepQry; } /** * @param qry Optimized and normalized query to split. 
*/ - private void splitQuery(GridSqlQuery qry) { + private void splitQuery(GridSqlQuery qry) throws IgniteCheckedException { // Create a fake parent AST element for the query to allow replacing the query in the parent by split. GridSqlSubquery fakeQryPrnt = new GridSqlSubquery(qry); @@ -1016,7 +1028,7 @@ public class GridSqlQuerySplitter { /** * @param qrym Query model. */ - private void splitQueryModel(QueryModel qrym) { + private void splitQueryModel(QueryModel qrym) throws IgniteCheckedException { switch (qrym.type) { case SELECT: if (qrym.needSplit) { @@ -1191,7 +1203,7 @@ public class GridSqlQuerySplitter { private void splitSelect( final GridSqlAst prnt, final int childIdx - ) { + ) throws IgniteCheckedException { if (++splitId > 99) throw new CacheException("Too complex query to process."); @@ -1317,6 +1329,9 @@ public class GridSqlQuerySplitter { map.sortColumns(mapQry.sort()); map.partitioned(hasPartitionedTables(mapQry)); + if (map.isPartitioned()) + map.derivedPartitions(derivePartitionsFromQuery(mapQry, ctx)); + mapSqlQrys.add(map); } @@ -1985,8 +2000,209 @@ public class GridSqlQuerySplitter { * @return true if given type is fractional */ private static boolean isFractionalType(int type) { - return type == Value.DECIMAL || type == Value.FLOAT || type == Value.DOUBLE; + return type == Value.DECIMAL || type == Value.FLOAT || type == Value.DOUBLE; + } + + /** + * Checks if given query contains expressions over key or affinity key + * that make it possible to run it only on a small isolated + * set of partitions. + * + * @param qry Query. + * @param ctx Kernal context. 
+ * @return Array of partitions, or {@code null} if none identified + */ + private static CacheQueryPartitionInfo[] derivePartitionsFromQuery(GridSqlQuery qry, GridKernalContext ctx) + throws IgniteCheckedException { + + if (!(qry instanceof GridSqlSelect)) + return null; + + GridSqlSelect select = (GridSqlSelect)qry; + + // no joins support yet + if (select.from() == null || select.from().size() != 1) + return null; + + return extractPartition(select.where(), ctx); + } + + /** + * @param el AST element to start with. + * @param ctx Kernal context. + * @return Array of partition info objects, or {@code null} if none identified + */ + private static CacheQueryPartitionInfo[] extractPartition(GridSqlAst el, GridKernalContext ctx) + throws IgniteCheckedException { + + if (!(el instanceof GridSqlOperation)) + return null; + + GridSqlOperation op = (GridSqlOperation)el; + + switch (op.operationType()) { + case EQUAL: { + CacheQueryPartitionInfo partInfo = extractPartitionFromEquality(op, ctx); + + if (partInfo != null) + return new CacheQueryPartitionInfo[] { partInfo }; + + return null; + } + + case AND: { + assert op.size() == 2; + + CacheQueryPartitionInfo[] partsLeft = extractPartition(op.child(0), ctx); + CacheQueryPartitionInfo[] partsRight = extractPartition(op.child(1), ctx); + + if (partsLeft != null && partsRight != null) + return null; //kind of conflict (_key = 1) and (_key = 2) + + if (partsLeft != null) + return partsLeft; + + if (partsRight != null) + return partsRight; + + return null; + } + + case OR: { + assert op.size() == 2; + + CacheQueryPartitionInfo[] partsLeft = extractPartition(op.child(0), ctx); + CacheQueryPartitionInfo[] partsRight = extractPartition(op.child(1), ctx); + + if (partsLeft != null && partsRight != null) + return mergePartitionInfo(partsLeft, partsRight); + + return null; + } + + default: + return null; + } + } + + /** + * Analyses the equality operation and extracts the partition if possible + * + * @param op AST equality 
operation. + * @param ctx Kernal Context. + * @return partition info, or {@code null} if none identified + */ + private static CacheQueryPartitionInfo extractPartitionFromEquality(GridSqlOperation op, GridKernalContext ctx) + throws IgniteCheckedException { + + assert op.operationType() == GridSqlOperationType.EQUAL; + + GridSqlElement left = op.child(0); + GridSqlElement right = op.child(1); + + if (!(left instanceof GridSqlColumn)) + return null; + + if (!(right instanceof GridSqlConst) && !(right instanceof GridSqlParameter)) + return null; + + GridSqlColumn column = (GridSqlColumn)left; + + assert column.column().getTable() instanceof GridH2Table; + + GridH2Table tbl = (GridH2Table) column.column().getTable(); + + GridH2RowDescriptor desc = tbl.rowDescriptor(); + + IndexColumn affKeyCol = tbl.getAffinityKeyColumn(); + + int colId = column.column().getColumnId(); + + if ((affKeyCol == null || colId != affKeyCol.column.getColumnId()) && !desc.isKeyColumn(colId)) + return null; + + if (right instanceof GridSqlConst) { + GridSqlConst constant = (GridSqlConst)right; + + return new CacheQueryPartitionInfo(ctx.affinity().partition(tbl.cacheName(), + constant.value().getObject()), null, -1); + } + + assert right instanceof GridSqlParameter; + + GridSqlParameter param = (GridSqlParameter) right; + + return new CacheQueryPartitionInfo(-1, tbl.cacheName(), param.index()); } + + /** + * Merges two partition info arrays, removing duplicates + * + * @param a Partition info array. + * @param b Partition info array. + * @return Result. 
+ */ + private static CacheQueryPartitionInfo[] mergePartitionInfo(CacheQueryPartitionInfo[] a, CacheQueryPartitionInfo[] b) { + assert a != null; + assert b != null; + + if (a.length == 1 && b.length == 1) { + if (a[0].equals(b[0])) + return new CacheQueryPartitionInfo[] { a[0] }; + + return new CacheQueryPartitionInfo[] { a[0], b[0] }; + } + + ArrayList<CacheQueryPartitionInfo> list = new ArrayList<>(a.length + b.length); + + for (CacheQueryPartitionInfo part: a) + list.add(part); + + for (CacheQueryPartitionInfo part: b) { + int i = 0; + + while (i < list.size() && !list.get(i).equals(part)) + i++; + + if (i == list.size()) + list.add(part); + } + + CacheQueryPartitionInfo[] result = new CacheQueryPartitionInfo[list.size()]; + + for (int i = 0; i < list.size(); i++) + result[i] = list.get(i); + + return result; + } + + /** + * Ensures all given queries have non-empty derived partitions and merges them. + * + * @param queries Collection of queries. + * @return Derived partitions for all queries, or {@code null}. + */ + private static CacheQueryPartitionInfo[] mergePartitionsFromMultipleQueries(List<GridCacheSqlQuery> queries) { + CacheQueryPartitionInfo[] result = null; + + for (GridCacheSqlQuery qry : queries) { + CacheQueryPartitionInfo[] partInfo = (CacheQueryPartitionInfo[])qry.derivedPartitions(); + + if (partInfo == null) { + result = null; + + break; + } + + if (result == null) + result = partInfo; + else + result = mergePartitionInfo(result, partInfo); + } + + return result; + } + /** * Simplified tree-like model for a query. * - SELECT : All the children are list of joined query models in the FROM clause. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java new file mode 100644 index 0000000..fddd3f4 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.CacheQueryExecutedEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; + +public class IgniteSqlRoutingTest extends GridCommonAbstractTest { + /** IP finder. 
*/ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static String NODE_CLIENT = "client"; + + /** */ + private static String CACHE_PERSON = "Person"; + + /** */ + private static String CACHE_CALL = "Call"; + + /** */ + private static int NODE_COUNT = 4; + + /** broadcast query to ensure events came from all nodes */ + private static String FINAL_QRY = "select count(1) from {0} where name=?"; + + /** Param to distinguish the final query event */ + private static String FINAL_QRY_PARAM = "Abracadabra"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + c.setMarshaller(new BinaryMarshaller()); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + CacheConfiguration ccfg = buildCacheConfiguration(gridName); + + if (ccfg != null) + ccfgs.add(ccfg); + + ccfgs.add(buildCacheConfiguration(CACHE_PERSON)); + ccfgs.add(buildCacheConfiguration(CACHE_CALL)); + + c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + if (gridName.equals(NODE_CLIENT)) + c.setClientMode(true); + + c.setCacheKeyConfiguration(new CacheKeyConfiguration(CallKey.class)); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_COUNT); + + startGrid(NODE_CLIENT); + + fillCaches(); + } + + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + private CacheConfiguration buildCacheConfiguration(String name) { + if (name.equals(CACHE_PERSON)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_PERSON); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(); + + 
entity.setKeyType(Integer.class.getName()); + + entity.setValueType(Person.class.getName()); + + LinkedHashMap<String, String> fields = new LinkedHashMap<>(); + + fields.put("name", String.class.getName()); + fields.put("age", Integer.class.getName()); + + entity.setFields(fields); + + ccfg.setQueryEntities(Arrays.asList(entity)); + + return ccfg; + } + + if (name.equals(CACHE_CALL)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_CALL); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(CallKey.class.getName(), Call.class.getName()); + + Set<String> keyFields = new HashSet<>(); + + keyFields.add("personId"); + keyFields.add("id"); + + entity.setKeyFields(keyFields); + + LinkedHashMap<String, String> fields = new LinkedHashMap<>(); + + fields.put("personId", Integer.class.getName()); + fields.put("id", Integer.class.getName()); + fields.put("name", String.class.getName()); + fields.put("duration", Integer.class.getName()); + + entity.setFields(fields); + + ccfg.setQueryEntities(Arrays.asList(entity)); + + return ccfg; + } + return null; + } + + /** */ + public void testUnicastQuerySelectAffinityKeyEqualsConstant() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select id, name, duration from Call where personId=100 order by id"), 1); + + assertEquals(2, result.size()); + + checkResultsRow(result, 0, 1, "caller1", 100); + checkResultsRow(result, 1, 2, "caller2", 200); + } + + /** */ + public void testUnicastQuerySelectAffinityKeyEqualsParameter() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select id, name, duration from Call where personId=? 
order by id").setArgs(100), 1); + + assertEquals(2, result.size()); + + checkResultsRow(result, 0, 1, "caller1", 100); + checkResultsRow(result, 1, 2, "caller2", 200); + } + + /** */ + public void testUnicastQuerySelectKeyEqualsParameterReused() throws Exception { + IgniteCache<Integer, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON); + + for (int key : new int[] {0, 250, 500, 750, 1000} ) { + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select name, age from Person where _key=?").setArgs(key), 1); + + assertEquals(1, result.size()); + + Person person = cache.get(key); + + checkResultsRow(result, 0, person.name, person.age); + } + } + + /** */ + public void testUnicastQuerySelectKeyEqualsParameter() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + CallKey callKey = new CallKey(5, 1); + + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select name, duration from Call where _key=?") + .setArgs(callKey), 1); + + assertEquals(1, result.size()); + + Call call = cache.get(callKey); + + checkResultsRow(result, 0, call.name, call.duration); + } + + /** Check group, having, ordering allowed to be unicast requests */ + public void testUnicastQueryGroups() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + String qry = "select name, count(1) " + + "from Call " + + "where personId = ? 
" + + "group by name " + + "having count(1) = 1 " + + "order by name"; + + final int personId = 10; + + List<List<?>> result = runQueryEnsureUnicast(cache, new SqlFieldsQuery(qry).setArgs(personId), 1); + + assertEquals(2, result.size()); + + checkResultsRow(result, 0, "caller1", 1L); + checkResultsRow(result, 1, "caller2", 1L); + } + + /** */ + public void testUnicastQuerySelectKeyEqualAndFieldParameter() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + CallKey callKey = new CallKey(5, 1); + + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select name, duration from Call where _key=? and duration=?") + .setArgs(callKey, 100), 1); + + assertEquals(1, result.size()); + + Call call = cache.get(callKey); + + checkResultsRow(result, 0, call.name, call.duration); + } + + /** */ + public void testUnicastQuerySelect2KeyEqualsAndFieldParameter() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + CallKey callKey1 = new CallKey(5, 1); + CallKey callKey2 = new CallKey(1000, 1); + + List<List<?>> result = runQueryEnsureUnicast(cache, + new SqlFieldsQuery("select name, duration from Call where (_key=? and duration=?) or (_key=?)") + .setArgs(callKey1, 100, callKey2), 2); + + assertEquals(2, result.size()); + + Call call = cache.get(callKey1); + + checkResultsRow(result, 0, call.name, call.duration); + + call = cache.get(callKey2); + + checkResultsRow(result, 1, call.name, call.duration); + } + + /** */ + public void testBroadcastQuerySelectKeyEqualsOrFieldParameter() throws Exception { + IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL); + + CallKey callKey = new CallKey(5, 1); + + List<List<?>> result = runQueryEnsureBroadcast(cache, + new SqlFieldsQuery("select name, duration from Call where _key=? 
or duration=?") + .setArgs(callKey, 100)); + + assertEquals(cache.size() / 2, result.size()); + } + + /** */ + private void fillCaches() { + IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL); + IgniteCache<Integer, Person> personCache = grid(NODE_CLIENT).cache(CACHE_PERSON); + + int count = affinity(personCache).partitions(); + + String[] names = {"John", "Bob", "James", "David", "Chuck"}; + + for (int i = 0; i < count; i++) { + Person person = new Person(names[i % names.length], 20 + (i % names.length)); + + personCache.put(i, person); + + // each person gets 2 calls + callCache.put(new CallKey(i, 1), new Call("caller1", 100)); + callCache.put(new CallKey(i, 2), new Call("caller2", 200)); + } + } + + /** */ + private void checkResultsRow(List<List<?>> results, int rowId, Object ... expected) throws Exception { + assertTrue(rowId < results.size()); + + List<?> row = results.get(rowId); + + assertEquals(expected.length, row.size()); + + for(int col = 0; col < expected.length; ++col) + assertEquals(expected[col], row.get(col)); + } + + /** Run query and check that only one node did generate 'query executed' event for it */ + private List<List<?>> runQueryEnsureUnicast(IgniteCache<?,?> cache, SqlFieldsQuery qry, int nodeCnt) throws Exception { + try (EventCounter evtCounter = new EventCounter(nodeCnt)) { + List<List<?>> result = cache.query(qry).getAll(); + + // do broadcast 'marker' query to ensure that we received all events from previous qry + cache.query(new SqlFieldsQuery( + MessageFormat.format(FINAL_QRY, cache.getName())) + .setArgs(FINAL_QRY_PARAM)).getAll(); + + // wait for all events from 'marker' query + evtCounter.await(); + + // return result set of first query + return result; + } + } + + private List<List<?>> runQueryEnsureBroadcast(IgniteCache<?, ?> cache, SqlFieldsQuery qry) throws Exception { + final CountDownLatch execLatch = new CountDownLatch(NODE_COUNT); + + final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() 
{ + @Override public boolean apply(Event evt) { + assert evt instanceof CacheQueryExecutedEvent; + + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; + + assertNotNull(qe.clause()); + + execLatch.countDown(); + + return true; + } + }; + + for (int i = 0; i < NODE_COUNT; i++) + grid(i).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED); + + List<List<?>> result = cache.query(qry).getAll(); + + assertTrue(execLatch.await(5000, MILLISECONDS)); + + for (int i = 0; i < NODE_COUNT; i++) + grid(i).events().stopLocalListen(pred); + + return result; + } + + /** Helper that listens for query-executed events on grids 0..NODE_COUNT-1. Events for the person or call cache whose first argument equals {@code FINAL_QRY_PARAM} count down {@code execLatch}; every other event for those two caches decrements {@code cnt}; events for any other cache are ignored. Closing the helper unregisters the listener. */ + private class EventCounter implements AutoCloseable { + /** Number of non-marker query events still expected; {@link #await()} asserts it has reached zero. */ + final AtomicInteger cnt; + + /** Latch created with a count of {@code NODE_COUNT}; counted down once per marker-query event. */ + final CountDownLatch execLatch; + + /** Local event listener registered on every grid by the constructor and removed by {@link #close()}. It always returns {@code true}. */ + final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assert evt instanceof CacheQueryExecutedEvent; + + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; + + String cacheName = qe.cacheName(); + + assert cacheName != null; + + if (!cacheName.equals(CACHE_PERSON) && + !cacheName.equals(CACHE_CALL)) + return true; + + assertNotNull(qe.clause()); + + Object[] args = qe.arguments(); + + if ((args != null) && (args.length > 0) && (args[0] instanceof String)) { + String strParam = (String)args[0]; + + if (FINAL_QRY_PARAM.equals(strParam)) { + execLatch.countDown(); + + return true; + } + } + cnt.decrementAndGet(); + + return true; + } + }; + + /** Stores the expected event count and registers the listener for {@code EVT_CACHE_QUERY_EXECUTED} on grids 0..NODE_COUNT-1. The {@code cnt} argument is the number of non-marker query events the caller expects. */ + private EventCounter(int cnt) { + this.cnt = new AtomicInteger(cnt); + + this.execLatch = new CountDownLatch(NODE_COUNT); + + for (int i = 0; i < NODE_COUNT; i++) + grid(i).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED); + } + + /** Asserts that the marker latch reaches zero within the 5000 MILLISECONDS timeout, then asserts that the remaining expected-event counter is exactly zero. */ + public void await() throws Exception { + assertTrue(execLatch.await(5000, MILLISECONDS)); + + assertEquals(0, cnt.get()); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + for (int i = 0; i < NODE_COUNT; i++) + grid(i).events().stopLocalListen(pred); + } + } + + /** Value type of the person cache used by these tests; holds a name and an age. */ + 
private static class Person { + /** Person name. */ + private String name; + + /** Person age. */ + private int age; + + /** Creates a person with the given name and age. */ + public Person(String name, int age) { + this.name = name; + this.age = age; + } + + /** Returns the hash of the name XOR-ed with the age. */ + @Override public int hashCode() { + return name.hashCode() ^ age; + } + + /** Two persons are equal when both the name and the age match. */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof Person)) + return false; + + Person other = (Person)o; + + return name.equals(other.name) && age == other.age; + } + } + + /** Key type of the call cache; the {@code personId} field carries the affinity-key annotation. */ + private static class CallKey { + /** Person ID; annotated as the affinity key. */ + @AffinityKeyMapped + private int personId; + + /** ID that distinguishes keys sharing the same person ID. */ + private int id; + + /** Creates a key from the given person ID and ID. */ + public CallKey(int personId, int id) { + this.personId = personId; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return personId ^ id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof CallKey)) + return false; + + CallKey other = (CallKey)o; + + return this.personId == other.personId && this.id == other.id; + } + } + + /** Value type of the call cache; holds a name and a duration. */ + private static class Call { + /** Call name. */ + private String name; + + /** Call duration. */ + private int duration; + + /** Creates a call with the given name and duration. */ + public Call(String name, int duration) { + this.name = name; + + this.duration = duration; + } + + /** Returns the hash of the name XOR-ed with the duration. */ + @Override public int hashCode() { + return name.hashCode() ^ duration; + } + + /** Two calls are equal when both the name and the duration match. */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof Call)) + return false; + + Call other = (Call)o; + + return name.equals(other.name) && duration == other.duration; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 14fb6ce..13dfef7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -110,6 +110,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfT import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest; import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest; import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest; +import org.apache.ignite.internal.processors.query.IgniteSqlRoutingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest; @@ -287,6 +288,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.class); suite.addTestSuite(IgniteCacheDistributedPartitionQueryConfigurationSelfTest.class); suite.addTestSuite(IgniteSqlKeyValueFieldsTest.class); + suite.addTestSuite(IgniteSqlRoutingTest.class); return suite; }
