ignite-4955 - Correctly execute SQL queries started on replicated cache. - Fixes #1806.
Signed-off-by: Sergi Vladykin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ded599ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ded599ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ded599ae Branch: refs/heads/ignite-1561-1 Commit: ded599aed95501d7553dcf326590d9440a5e9ef7 Parents: 86c4058 Author: Sergi Vladykin <[email protected]> Authored: Mon Apr 17 10:03:20 2017 +0300 Committer: Sergi Vladykin <[email protected]> Committed: Mon Apr 17 10:03:20 2017 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/SqlFieldsQuery.java | 25 +++ .../org/apache/ignite/cache/query/SqlQuery.java | 31 +++- .../processors/cache/IgniteCacheProxy.java | 4 +- .../cache/query/GridCacheSqlQuery.java | 114 ++++---------- .../cache/query/GridCacheTwoStepQuery.java | 26 +++- .../processors/query/h2/IgniteH2Indexing.java | 34 ++-- .../query/h2/sql/GridSqlQuerySplitter.java | 85 ++++------ .../query/h2/twostep/GridMapQueryExecutor.java | 68 +++++--- .../h2/twostep/GridReduceQueryExecutor.java | 154 ++++++++++--------- .../h2/twostep/msg/GridH2QueryRequest.java | 109 ++++++++++--- .../IgniteCacheAbstractFieldsQuerySelfTest.java | 51 +++--- ...teCacheJoinPartitionedAndReplicatedTest.java | 10 ++ ...iteCacheReplicatedFieldsQueryROSelfTest.java | 27 ++++ .../query/IgniteSqlSplitterSelfTest.java | 125 +++++++++++++-- .../IgniteCacheQuerySelfTestSuite.java | 4 +- 15 files changed, 563 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 1f10ca8..8c3a4fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -67,6 +67,9 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private boolean distributedJoins; + /** */ + private boolean replicatedOnly; + /** * Constructs SQL fields query. * @@ -236,6 +239,28 @@ public class SqlFieldsQuery extends Query<List<?>> { return (SqlFieldsQuery)super.setLocal(loc); } + /** + * Specify if the query contains only replicated tables. + * This is a hint for potentially more effective execution. + * + * @param replicatedOnly The query contains only replicated tables. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setReplicatedOnly(boolean replicatedOnly) { + this.replicatedOnly = replicatedOnly; + + return this; + } + + /** + * Check if the query contains only replicated tables. + * + * @return {@code true} If the query contains only replicated tables. 
+ */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SqlFieldsQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java index d77e5ce..944c70e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@ -53,6 +53,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ private boolean distributedJoins; + /** */ + private boolean replicatedOnly; + /** * Constructs query for the given type name and SQL query. * @@ -197,7 +200,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { * @param type Type. * @return {@code this} For chaining. */ - public SqlQuery setType(Class<?> type) { + public SqlQuery<K, V> setType(Class<?> type) { return setType(QueryUtils.typeName(type)); } @@ -210,7 +213,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { * @param distributedJoins Distributed joins enabled. * @return {@code this} For chaining. */ - public SqlQuery setDistributedJoins(boolean distributedJoins) { + public SqlQuery<K, V> setDistributedJoins(boolean distributedJoins) { this.distributedJoins = distributedJoins; return this; @@ -219,12 +222,34 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { /** * Check if distributed joins are enabled for this query. * - * @return {@code true} If distributed joind enabled. + * @return {@code true} If distributed joins enabled. 
*/ public boolean isDistributedJoins() { return distributedJoins; } + /** + * Specify if the query contains only replicated tables. + * This is a hint for potentially more effective execution. + * + * @param replicatedOnly The query contains only replicated tables. + * @return {@code this} For chaining. + */ + public SqlQuery<K, V> setReplicatedOnly(boolean replicatedOnly) { + this.replicatedOnly = replicatedOnly; + + return this; + } + + /** + * Check if the query contains only replicated tables. + * + * @return {@code true} If the query contains only replicated tables. + */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SqlQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 14edcac..98f2f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -780,7 +780,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V final SqlQuery p = (SqlQuery)qry; - if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) + if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal()) return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p, opCtxCall != null && opCtxCall.isKeepBinary()); @@ -794,7 +794,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V SqlFieldsQuery p = (SqlFieldsQuery)qry; - if (isReplicatedDataNode() || ctx.isLocal() 
|| qry.isLocal()) + if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal()) return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p); return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index ea07fb7..780e462 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@ -21,17 +21,11 @@ import java.nio.ByteBuffer; import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -39,7 +33,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Query. 
*/ -public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { +public class GridCacheSqlQuery implements Message { /** */ private static final long serialVersionUID = 0L; @@ -51,26 +45,12 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { private String qry; /** */ - @GridToStringInclude(sensitive = true) - @GridDirectTransient - private Object[] params; - - /** */ - private byte[] paramsBytes; - - /** */ @GridToStringInclude - @GridDirectTransient private int[] paramIdxs; /** */ @GridToStringInclude @GridDirectTransient - private int paramsSize; - - /** */ - @GridToStringInclude - @GridDirectTransient private LinkedHashMap<String, ?> cols; /** Field kept for backward compatibility. */ @@ -140,13 +120,6 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { } /** - * @return Parameters. - */ - public Object[] parameters() { - return params; - } - - /** * @return Parameter indexes. */ public int[] parameterIndexes() { @@ -154,57 +127,16 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { } /** - * @param params Parameters. * @param paramIdxs Parameter indexes. - * @return {@code this} For chaining. + * @return {@code this}. */ - public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) { - this.params = F.isEmpty(params) ? 
EMPTY_PARAMS : params; - - paramsSize = this.params.length; - + public GridCacheSqlQuery parameterIndexes(int[] paramIdxs) { this.paramIdxs = paramIdxs; return this; } /** {@inheritDoc} */ - @Override public void marshall(Marshaller m) { - if (paramsBytes != null) - return; - - assert params != null; - - try { - paramsBytes = U.marshal(m, params); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ - @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { - if (params != null) - return; - - assert paramsBytes != null; - - try { - final ClassLoader ldr = U.resolveClassLoader(ctx.config()); - - if (m instanceof BinaryMarshaller) - // To avoid deserializing of enum types. - params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr); - else - params = U.unmarshal(m, paramsBytes, ldr); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. } @@ -239,7 +171,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 2: - if (!writer.writeByteArray("paramsBytes", paramsBytes)) + if (!writer.writeIntArray("paramIdxs", paramIdxs)) return false; writer.incrementState(); @@ -280,7 +212,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 2: - paramsBytes = reader.readByteArray("paramsBytes"); + paramIdxs = reader.readIntArray("paramIdxs"); if (!reader.isLastRead()) return false; @@ -311,28 +243,17 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { } /** - * @param args Arguments. * @return Copy. 
*/ - public GridCacheSqlQuery copy(Object[] args) { + public GridCacheSqlQuery copy() { GridCacheSqlQuery cp = new GridCacheSqlQuery(); cp.qry = qry; cp.cols = cols; cp.paramIdxs = paramIdxs; - cp.paramsSize = paramsSize; cp.sort = sort; cp.partitioned = partitioned; - if (F.isEmpty(args)) - cp.params = EMPTY_PARAMS; - else { - cp.params = new Object[paramsSize]; - - for (int paramIdx : paramIdxs) - cp.params[paramIdx] = args[paramIdx]; - } - return cp; } @@ -380,4 +301,27 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { return this; } + + /** + * @param allParams All parameters. + * @return Parameters only for this query. + */ + public Object[] parameters(Object[] allParams) { + if (F.isEmpty(paramIdxs)) + return EMPTY_PARAMS; + + assert !F.isEmpty(allParams); + + int maxIdx = paramIdxs[paramIdxs.length - 1]; + + Object[] res = new Object[maxIdx + 1]; + + for (int i = 0; i < paramIdxs.length; i++) { + int idx = paramIdxs[i]; + + res[idx] = allParams[idx]; + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index c127eeb..0e31dc0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -95,7 +95,7 @@ public class GridCacheTwoStepQuery { /** * Check if distributed joins are enabled for this query. * - * @return {@code true} If distributed joind enabled. + * @return {@code true} If distributed joins enabled. 
*/ public boolean distributedJoins() { return distributedJoins; @@ -146,12 +146,23 @@ public class GridCacheTwoStepQuery { /** * @param qry SQL Query. - * @return {@code this}. */ - public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) { + public void addMapQuery(GridCacheSqlQuery qry) { mapQrys.add(qry); + } + + /** + * @return {@code true} If all the map queries contain only replicated tables. + */ + public boolean isReplicatedOnly() { + assert !mapQrys.isEmpty(); + + for (int i = 0; i < mapQrys.size(); i++) { + if (mapQrys.get(i).isPartitioned()) + return false; + } - return this; + return true; } /** @@ -246,10 +257,9 @@ public class GridCacheTwoStepQuery { } /** - * @param args New arguments to copy with. * @return Copy. */ - public GridCacheTwoStepQuery copy(Object[] args) { + public GridCacheTwoStepQuery copy() { assert !explain; GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls); @@ -257,13 +267,13 @@ public class GridCacheTwoStepQuery { cp.caches = caches; cp.extraCaches = extraCaches; cp.spaces = spaces; - cp.rdc = rdc.copy(args); + cp.rdc = rdc.copy(); cp.skipMergeTbl = skipMergeTbl; cp.pageSize = pageSize; cp.distributedJoins = distributedJoins; for (int i = 0; i < mapQrys.size(); i++) - cp.mapQrys.add(mapQrys.get(i).copy(args)); + cp.mapQrys.add(mapQrys.get(i).copy()); return cp; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5f2d8c0..531b760 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -78,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -91,8 +90,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -101,8 +98,8 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import 
org.apache.ignite.internal.processors.query.h2.database.H2PkHashIndex; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; @@ -110,6 +107,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerI import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; +import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; @@ -1299,13 +1297,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param enforceJoinOrder Enforce join order of tables. * @return Iterable result. 
*/ - private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry, - final boolean keepCacheObj, final boolean enforceJoinOrder, + private Iterable<List<?>> runQueryTwoStep( + final GridCacheContext<?,?> cctx, + final GridCacheTwoStepQuery qry, + final boolean keepCacheObj, + final boolean enforceJoinOrder, final int timeoutMillis, - final GridQueryCancel cancel) { + final GridQueryCancel cancel, + final Object[] params + ) { return new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { - return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel); + return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params); } }; } @@ -1403,7 +1406,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey); if (cachedQry != null) { - twoStepQry = cachedQry.twoStepQry.copy(qry.getArgs()); + twoStepQry = cachedQry.twoStepQry.copy(); meta = cachedQry.meta; } else { @@ -1539,12 +1542,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { cancel = new GridQueryCancel(); QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( - runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel); + runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()), + cancel); cursor.fieldsMeta(meta); if (cachedQry == null && !twoStepQry.explain()) { - cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy(null)); + cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy()); twoStepCache.putIfAbsent(cachedQryKey, cachedQry); } @@ -1556,7 +1560,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { */ private void checkCacheIndexSegmentation(List<Integer> caches) { if (caches.isEmpty()) - return; //Nnothing to check + return; // Nothing to check GridCacheSharedContext sharedContext 
= ctx.cache().context(); @@ -1567,12 +1571,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { assert cctx != null; - if(!cctx.isPartitioned()) + if (!cctx.isPartitioned()) continue; - if(expectedParallelism == 0) + if (expectedParallelism == 0) expectedParallelism = cctx.config().getQueryParallelism(); - else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism) + else if (cctx.config().getQueryParallelism() != expectedParallelism) throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden."); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index aec0b36..b3d54e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -43,7 +44,6 @@ import org.h2.command.Prepared; import org.h2.command.dml.Query; import org.h2.command.dml.SelectUnion; import org.h2.jdbc.JdbcPreparedStatement; -import org.h2.util.IntArray; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection; @@ -67,7 +67,6 @@ import static 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.W import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.childIndexForColumn; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.LEFT_CHILD; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.RIGHT_CHILD; -import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.toArray; /** * Splits a single SQL query into two step map-reduce query. @@ -211,7 +210,7 @@ public class GridSqlQuerySplitter { boolean allCollocated = true; for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) { - Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(), + Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params), true, enforceJoinOrder); allCollocated &= isCollocated((Query)prepared); @@ -1344,11 +1343,18 @@ public class GridSqlQuerySplitter { * @param params All parameters. */ private static void setupParameters(GridCacheSqlQuery sqlQry, GridSqlQuery qryAst, Object[] params) { - IntArray paramIdxs = new IntArray(params.length); + TreeSet<Integer> paramIdxs = new TreeSet<>(); - params = findParams(qryAst, params, new ArrayList<>(params.length), paramIdxs).toArray(); + findParamsQuery(qryAst, params, paramIdxs); - sqlQry.parameters(params, toArray(paramIdxs)); + int[] paramIdxsArr = new int[paramIdxs.size()]; + + int i = 0; + + for (Integer paramIdx : paramIdxs) + paramIdxsArr[i++] = paramIdx; + + sqlQry.parameterIndexes(paramIdxsArr); } /** @@ -1451,9 +1457,8 @@ public class GridSqlQuerySplitter { /** * @param prnt Table parent element. * @param childIdx Child index for the table or alias containing the table. - * @return Generated alias. 
*/ - private GridSqlAlias generateUniqueAlias(GridSqlAst prnt, int childIdx) { + private void generateUniqueAlias(GridSqlAst prnt, int childIdx) { GridSqlAst child = prnt.child(childIdx); GridSqlAst tbl = GridSqlAlias.unwrap(child); @@ -1468,8 +1473,6 @@ public class GridSqlQuerySplitter { // Replace the child in the parent. prnt.child(childIdx, uniqueAliasAst); - - return uniqueAliasAst; } /** @@ -1586,64 +1589,54 @@ public class GridSqlQuerySplitter { /** * @param qry Select. * @param params Parameters. - * @param target Extracted parameters. * @param paramIdxs Parameter indexes. - * @return Extracted parameters list. */ - private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target, - IntArray paramIdxs) { + private static void findParamsQuery(GridSqlQuery qry, Object[] params, TreeSet<Integer> paramIdxs) { if (qry instanceof GridSqlSelect) - return findParams((GridSqlSelect)qry, params, target, paramIdxs); - - GridSqlUnion union = (GridSqlUnion)qry; - - findParams(union.left(), params, target, paramIdxs); - findParams(union.right(), params, target, paramIdxs); + findParamsSelect((GridSqlSelect)qry, params, paramIdxs); + else { + GridSqlUnion union = (GridSqlUnion)qry; - findParams(qry.limit(), params, target, paramIdxs); - findParams(qry.offset(), params, target, paramIdxs); + findParamsQuery(union.left(), params, paramIdxs); + findParamsQuery(union.right(), params, paramIdxs); - return target; + findParams(qry.limit(), params, paramIdxs); + findParams(qry.offset(), params, paramIdxs); + } } /** * @param select Select. * @param params Parameters. - * @param target Extracted parameters. * @param paramIdxs Parameter indexes. - * @return Extracted parameters list. 
*/ - private static List<Object> findParams( + private static void findParamsSelect( GridSqlSelect select, Object[] params, - ArrayList<Object> target, - IntArray paramIdxs + TreeSet<Integer> paramIdxs ) { if (params.length == 0) - return target; + return; for (GridSqlAst el : select.columns(false)) - findParams(el, params, target, paramIdxs); + findParams(el, params, paramIdxs); - findParams(select.from(), params, target, paramIdxs); - findParams(select.where(), params, target, paramIdxs); + findParams(select.from(), params, paramIdxs); + findParams(select.where(), params, paramIdxs); // Don't search in GROUP BY and HAVING since they expected to be in select list. - findParams(select.limit(), params, target, paramIdxs); - findParams(select.offset(), params, target, paramIdxs); - - return target; + findParams(select.limit(), params, paramIdxs); + findParams(select.offset(), params, paramIdxs); } /** * @param el Element. * @param params Parameters. - * @param target Extracted parameters. * @param paramIdxs Parameter indexes. */ - private static void findParams(@Nullable GridSqlAst el, Object[] params, ArrayList<Object> target, - IntArray paramIdxs) { + private static void findParams(@Nullable GridSqlAst el, Object[] params, + TreeSet<Integer> paramIdxs) { if (el == null) return; @@ -1652,27 +1645,17 @@ public class GridSqlQuerySplitter { // Here we will set them to NULL. final int idx = ((GridSqlParameter)el).index(); - while (target.size() < idx) - target.add(null); - if (params.length <= idx) throw new IgniteException("Invalid number of query parameters. 
" + "Cannot find " + idx + " parameter."); - Object param = params[idx]; - - if (idx == target.size()) - target.add(param); - else - target.set(idx, param); - paramIdxs.add(idx); } else if (el instanceof GridSqlSubquery) - findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs); + findParamsQuery(((GridSqlSubquery)el).subquery(), params, paramIdxs); else { for (int i = 0; i < el.size(); i++) - findParams((GridSqlAst)el.child(i), params, target, paramIdxs); + findParams(el.child(i), params, paramIdxs); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 7cd9f17..e4347b5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; @@ -401,6 +402,26 @@ public class GridMapQueryExecutor 
{ } /** + * @param caches Cache IDs. + * @return The first found partitioned cache. + */ + private GridCacheContext<?,?> findFirstPartitioned(List<Integer> caches) { + GridCacheSharedContext<?,?> sctx = ctx.cache().context(); + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext<?,?> mainCctx = sctx.cacheContext(caches.get(i)); + + if (mainCctx == null) + throw new CacheException("Failed to find cache."); + + if (!mainCctx.isLocal() && !mainCctx.isReplicated()) + return mainCctx; + } + + throw new IllegalStateException("Failed to find a partitioned cache."); + } + + /** * @param node Node. * @param req Query request. */ @@ -408,12 +429,7 @@ public class GridMapQueryExecutor { final Map<UUID,int[]> partsMap = req.partitions(); final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId()); - assert req.caches() != null && !req.caches().isEmpty(); - - GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0)); - - if (mainCctx == null) - throw new CacheException("Failed to find cache."); + assert !F.isEmpty(req.caches()); final DistributedJoinMode joinMode = distributedJoinMode( req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL), @@ -421,8 +437,12 @@ public class GridMapQueryExecutor { final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER); final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN); + final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED); + + int segments = explain || replicated ? 1 : + findFirstPartitioned(req.caches()).config().getQueryParallelism(); - int segments = explain ? 
1 : mainCctx.config().getQueryParallelism(); + final Object[] params = req.parameters(); for (int i = 1; i < segments; i++) { final int segment = i; @@ -442,7 +462,9 @@ public class GridMapQueryExecutor { req.pageSize(), joinMode, enforceJoinOrder, - req.timeout()); + replicated, + req.timeout(), + params); return null; } @@ -462,7 +484,9 @@ public class GridMapQueryExecutor { req.pageSize(), joinMode, enforceJoinOrder, - req.timeout()); + replicated, + req.timeout(), + params); } /** @@ -491,7 +515,9 @@ public class GridMapQueryExecutor { int pageSize, DistributedJoinMode distributedJoinMode, boolean enforceJoinOrder, - int timeout + boolean replicated, + int timeout, + Object[] params ) { // Prepare to run queries. GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0)); @@ -525,7 +551,7 @@ public class GridMapQueryExecutor { node.id(), reqId, segmentId, - mainCctx.isReplicated() ? REPLICATED : MAP) + replicated ? REPLICATED : MAP) .filter(h2.backupFilter(topVer, parts)) .partitionsMap(partsMap) .distributedJoinMode(distributedJoinMode) @@ -579,7 +605,7 @@ public class GridMapQueryExecutor { if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), - F.asList(qry.parameters()), true, + F.asList(qry.parameters(params)), true, timeout, qr.cancels[qryIdx]); @@ -594,7 +620,7 @@ public class GridMapQueryExecutor { qry.query(), null, null, - qry.parameters(), + params, node.id(), null)); } @@ -602,7 +628,7 @@ public class GridMapQueryExecutor { assert rs instanceof JdbcResultSet : rs.getClass(); } - qr.addResult(qryIdx, qry, node.id(), rs); + qr.addResult(qryIdx, qry, node.id(), rs, params); if (qr.canceled) { qr.result(qryIdx).close(); @@ -965,8 +991,8 @@ public class GridMapQueryExecutor { * @param qrySrcNodeId Query source node. * @param rs Result set. 
*/ - void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs) { - if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q))) + void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { + if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q, params))) throw new IllegalStateException(); } @@ -1046,15 +1072,21 @@ public class GridMapQueryExecutor { /** */ private volatile boolean closed; + /** */ + private final Object[] params; + /** * @param rs Result set. * @param cctx Cache context. * @param qrySrcNodeId Query source node. * @param qry Query. + * @param params Query params. */ - private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) { + private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry, + Object[] params) { this.cctx = cctx; this.qry = qry; + this.params = params; this.qrySrcNodeId = qrySrcNodeId; this.cpNeeded = cctx.isLocalNode(qrySrcNodeId); @@ -1139,7 +1171,7 @@ public class GridMapQueryExecutor { qry.query(), null, null, - qry.parameters(), + params, qrySrcNodeId, null, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 7d255b1..0421ca0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -62,10 +62,8 @@ 
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; -import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; @@ -102,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8; import static java.util.Collections.singletonList; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; +import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; @@ -406,12 +405,14 @@ public class GridReduceQueryExecutor { } /** + * @param isReplicatedOnly If we must only have replicated caches. * @param topVer Topology version. * @param cctx Cache context for main space. * @param extraSpaces Extra spaces. * @return Data nodes or {@code null} if repartitioning started and we need to retry. 
*/ private Collection<ClusterNode> stableDataNodes( + boolean isReplicatedOnly, AffinityTopologyVersion topVer, final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces @@ -430,7 +431,7 @@ public class GridReduceQueryExecutor { if (extraCctx.isLocal()) continue; // No consistency guaranties for local caches. - if (cctx.isReplicated() && !extraCctx.isReplicated()) + if (isReplicatedOnly && !extraCctx.isReplicated()) throw new CacheException("Queries running on replicated cache should not contain JOINs " + "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); @@ -439,7 +440,7 @@ public class GridReduceQueryExecutor { if (F.isEmpty(extraNodes)) throw new CacheException("Failed to find data nodes for cache: " + extraSpace); - if (cctx.isReplicated() && extraCctx.isReplicated()) { + if (isReplicatedOnly && extraCctx.isReplicated()) { nodes.retainAll(extraNodes); if (nodes.isEmpty()) { @@ -450,7 +451,7 @@ public class GridReduceQueryExecutor { ", cache2=" + extraSpace + "]"); } } - else if (!cctx.isReplicated() && extraCctx.isReplicated()) { + else if (!isReplicatedOnly && extraCctx.isReplicated()) { if (!extraNodes.containsAll(nodes)) if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. @@ -458,7 +459,7 @@ public class GridReduceQueryExecutor { throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + ", cache2=" + extraSpace + "]"); } - else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { + else if (!isReplicatedOnly && !extraCctx.isReplicated()) { if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. @@ -481,6 +482,7 @@ public class GridReduceQueryExecutor { * @param enforceJoinOrder Enforce join order of tables. * @param timeoutMillis Timeout in milliseconds. * @param cancel Query cancel. + * @param params Query parameters. * @return Rows iterator. 
*/ public Iterator<List<?>> query( @@ -489,8 +491,14 @@ public class GridReduceQueryExecutor { boolean keepPortable, boolean enforceJoinOrder, int timeoutMillis, - GridQueryCancel cancel + GridQueryCancel cancel, + Object[] params ) { + if (F.isEmpty(params)) + params = EMPTY_PARAMS; + + final boolean isReplicatedOnly = qry.isReplicatedOnly(); + for (int attempt = 0;; attempt++) { if (attempt != 0) { try { @@ -524,7 +532,7 @@ public class GridReduceQueryExecutor { nodes = singletonList(ctx.discovery().localNode()); else { if (isPreloadingActive(cctx, extraSpaces)) { - if (cctx.isReplicated()) + if (isReplicatedOnly) nodes = replicatedUnstableDataNodes(cctx, extraSpaces); else { partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); @@ -533,19 +541,24 @@ public class GridReduceQueryExecutor { } } else - nodes = stableDataNodes(topVer, cctx, extraSpaces); + nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces); if (nodes == null) continue; // Retry. assert !nodes.isEmpty(); - if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) : - "We must be on a client node."; + if (isReplicatedOnly || qry.explain()) { + ClusterNode locNode = ctx.discovery().localNode(); - // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. - nodes = singletonList(F.rand(nodes)); + // Always prefer local node if possible. + if (nodes.contains(locNode)) + nodes = singletonList(locNode); + else { + // Select random data node to run query on a replicated data or + // get EXPLAIN PLAN from a single node. + nodes = singletonList(F.rand(nodes)); + } } } @@ -553,7 +566,8 @@ public class GridReduceQueryExecutor { final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable(); - final int segmentsPerIndex = qry.explain() ? 1 : cctx.config().getQueryParallelism(); + final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 
1 : + findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism(); int replicatedQrysCnt = 0; @@ -595,7 +609,7 @@ public class GridReduceQueryExecutor { r.idxs.add(idx); } - r.latch = new CountDownLatch( + r.latch = new CountDownLatch(isReplicatedOnly ? 1 : (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt); runs.put(qryReqId, r); @@ -616,7 +630,7 @@ public class GridReduceQueryExecutor { for (GridCacheSqlQuery mapQry : qry.mapQueries()) mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query()) - .parameters(mapQry.parameters(), mapQry.parameterIndexes())); + .parameterIndexes(mapQry.parameterIndexes())); } final boolean distributedJoins = qry.distributedJoins(); @@ -643,6 +657,9 @@ public class GridReduceQueryExecutor { if (qry.explain()) flags |= GridH2QueryRequest.FLAG_EXPLAIN; + if (isReplicatedOnly) + flags |= GridH2QueryRequest.FLAG_REPLICATED; + if (send(nodes, new GridH2QueryRequest() .requestId(qryReqId) @@ -652,6 +669,7 @@ public class GridReduceQueryExecutor { .tables(distributedJoins ? qry.tables() : null) .partitions(convert(partsMap)) .queries(mapQrys) + .parameters(params) .flags(flags) .timeout(timeoutMillis), null, @@ -723,14 +741,14 @@ public class GridReduceQueryExecutor { try { if (qry.explain()) - return explainPlan(r.conn, space, qry); + return explainPlan(r.conn, space, qry, params); GridCacheSqlQuery rdc = qry.reduceQuery(); ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), - F.asList(rdc.parameters()), + F.asList(rdc.parameters(params)), false, // The statement will cache some extra thread local objects. timeoutMillis, cancel); @@ -790,6 +808,28 @@ public class GridReduceQueryExecutor { } /** + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. + * @return The first partitioned cache context. 
+ */ + private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) { + if (cctx.isLocal()) + throw new CacheException("Cache is LOCAL: " + cctx.name()); + + if (!cctx.isReplicated()) + return cctx; + + for (int i = 0 ; i < extraSpaces.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + + if (!extraCctx.isReplicated() && !extraCctx.isLocal()) + return extraCctx; + } + + throw new IllegalStateException("Failed to find partitioned cache."); + } + + /** * Returns true if the exception is triggered by query cancel. * * @param e Exception. @@ -896,9 +936,19 @@ public class GridReduceQueryExecutor { * @param extraSpaces Extra spaces. * @return Collection of all data nodes owning all the caches or {@code null} for retry. */ - private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx, + private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) { - assert cctx.isReplicated() : cctx.name() + " must be replicated"; + int i = 0; + + // The main cache is allowed to be partitioned. + if (!cctx.isReplicated()) { + assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache"; + + // Just replace the main cache with the first one extra. + cctx = cacheContext(extraSpaces.get(i++)); + + assert cctx.isReplicated(): "all the extra caches must be replicated here"; + } Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx); @@ -906,7 +956,7 @@ public class GridReduceQueryExecutor { return null; // Retry. if (!F.isEmpty(extraSpaces)) { - for (int i = 0; i < extraSpaces.size(); i++) { + for (;i < extraSpaces.size(); i++) { GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); if (extraCctx.isLocal()) @@ -982,9 +1032,12 @@ public class GridReduceQueryExecutor { * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. 
*/ @SuppressWarnings("unchecked") - private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx, + private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) { - assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; + assert !cctx.isLocal() : cctx.name() + " must not be LOCAL"; + + // If the main cache is replicated, just replace it with the first partitioned. + cctx = findFirstPartitioned(cctx, extraSpaces); final int partsCnt = cctx.affinity().partitions(); @@ -1025,6 +1078,10 @@ public class GridReduceQueryExecutor { for (int i = 0; i < extraSpaces.size(); i++) { GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); + // This is possible if we have replaced a replicated cache with a partitioned one earlier. + if (cctx == extraCctx) + continue; + if (extraCctx.isReplicated() || extraCctx.isLocal()) continue; @@ -1093,32 +1150,14 @@ public class GridReduceQueryExecutor { } /** - * @param mainSpace Main space. - * @param allSpaces All spaces. - * @return List of all extra spaces or {@code null} if none. - */ - private List<String> extraSpaces(String mainSpace, Collection<String> allSpaces) { - if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace))) - return null; - - ArrayList<String> res = new ArrayList<>(allSpaces.size()); - - for (String space : allSpaces) { - if (!F.eq(space, mainSpace)) - res.add(space); - } - - return res; - } - - /** * @param c Connection. * @param space Space. * @param qry Query. + * @param params Query parameters. * @return Cursor for plans. * @throws IgniteCheckedException if failed. 
*/ - private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry) + private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry, Object[] params) throws IgniteCheckedException { List<List<?>> lists = new ArrayList<>(); @@ -1142,7 +1181,7 @@ public class GridReduceQueryExecutor { ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "EXPLAIN " + rdc.query(), - F.asList(rdc.parameters()), + F.asList(rdc.parameters(params)), false, 0, null); @@ -1419,29 +1458,4 @@ public class GridReduceQueryExecutor { state(e, null); } } - - /** - * - */ - private static class Iter extends GridH2ResultSetIterator<List<?>> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param data Data array. - * @throws IgniteCheckedException If failed. - */ - protected Iter(ResultSet data) throws IgniteCheckedException { - super(data, true, false); - } - - /** {@inheritDoc} */ - @Override protected List<?> createRow() { - ArrayList<Object> res = new ArrayList<>(row.length); - - Collections.addAll(res, row); - - return res; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index f2f9a31..9e7dcbf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -22,21 +22,27 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; +import 
org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; + /** * Query request. */ @@ -65,6 +71,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { */ public static final int FLAG_EXPLAIN = 1 << 3; + /** + * If it is a REPLICATED query. + */ + public static final int FLAG_REPLICATED = 1 << 4; + /** */ private long reqId; @@ -100,6 +111,34 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** */ private int timeout; + /** */ + @GridToStringInclude(sensitive = true) + @GridDirectTransient + private Object[] params; + + /** */ + private byte[] paramsBytes; + + /** + * @return Parameters. + */ + public Object[] parameters() { + return params; + } + + /** + * @param params Parameters. 
+ * @return {@code this}. + */ + public GridH2QueryRequest parameters(Object[] params) { + if (params == null) + params = EMPTY_PARAMS; + + this.params = params; + + return this; + } + /** * @param tbls Tables. * @return {@code this}. @@ -258,20 +297,38 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public void marshall(Marshaller m) { - if (F.isEmpty(qrys)) + if (paramsBytes != null) return; - for (GridCacheSqlQuery qry : qrys) - qry.marshall(m); + assert params != null; + + try { + paramsBytes = U.marshal(m, params); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } /** {@inheritDoc} */ @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { - if (F.isEmpty(qrys)) + if (params != null) return; - for (GridCacheSqlQuery qry : qrys) - qry.unmarshall(m, ctx); + assert paramsBytes != null; + + try { + final ClassLoader ldr = U.resolveClassLoader(ctx.config()); + + if (m instanceof BinaryMarshaller) + // To avoid deserializing of enum types. 
+ params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr); + else + params = U.unmarshal(m, paramsBytes, ldr); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } /** {@inheritDoc} */ @@ -305,31 +362,31 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 3: - if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) + if (!writer.writeByteArray("paramsBytes", paramsBytes)) return false; writer.incrementState(); case 4: - if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) + if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) return false; writer.incrementState(); case 5: - if (!writer.writeLong("reqId", reqId)) + if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING)) + if (!writer.writeLong("reqId", reqId)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING)) return false; writer.incrementState(); @@ -339,6 +396,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { return false; writer.incrementState(); + + case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } return true; @@ -377,7 +441,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 3: - parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); + paramsBytes = reader.readByteArray("paramsBytes"); if (!reader.isLastRead()) return false; @@ -385,7 +449,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable { reader.incrementState(); case 4: - qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); + parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); if (!reader.isLastRead()) return false; @@ -393,7 +457,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 5: - reqId = reader.readLong("reqId"); + qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -401,7 +465,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 6: - tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING); + reqId = reader.readLong("reqId"); if (!reader.isLastRead()) return false; @@ -409,7 +473,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 7: - topVer = reader.readMessage("topVer"); + tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING); if (!reader.isLastRead()) return false; @@ -423,6 +487,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { return false; reader.incrementState(); + + case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridH2QueryRequest.class); @@ -435,7 +508,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index 1740fe9..5cb86b1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -361,7 +361,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA * */ public void testExplain() { - List<List<?>> res = grid(0).cache(personCache.getName()).query(new SqlFieldsQuery( + List<List<?>> res = grid(0).cache(personCache.getName()).query(sqlFieldsQuery( String.format("explain select p.age, p.name, o.name " + "from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id", personCache.getName(), orgCache.getName()))).getAll(); @@ -369,7 +369,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA for (List<?> row : res) X.println("____ : " + row); - if (cacheMode() == PARTITIONED) { + if (cacheMode() == PARTITIONED || !isReplicatedOnly()) { assertEquals(2, res.size()); assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.mergeTableIdentifier(0))); @@ -380,7 +380,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. 
*/ public void testExecuteWithMetaData() throws Exception { - QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(new SqlFieldsQuery( + QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(sqlFieldsQuery( String.format("select p._KEY, p.name, p.age, o.name " + "from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id", personCache.getName(), orgCache.getName()))); @@ -480,7 +480,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. */ public void testExecute() throws Exception { - QueryCursor<List<?>> qry = personCache.query(new SqlFieldsQuery("select _KEY, name, age from Person")); + QueryCursor<List<?>> qry = personCache.query(sqlFieldsQuery("select _KEY, name, age from Person")); List<List<?>> res = new ArrayList<>(qry.getAll()); @@ -526,7 +526,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. */ public void testExecuteWithArguments() throws Exception { QueryCursor<List<?>> qry = personCache - .query(new SqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30)); + .query(sqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30)); List<List<?>> res = new ArrayList<>(qry.getAll()); @@ -564,10 +564,23 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA assert cnt == 2; } + protected boolean isReplicatedOnly() { + return false; + } + + private SqlFieldsQuery sqlFieldsQuery(String sql) { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + if (isReplicatedOnly()) + qry.setReplicatedOnly(true); + + return qry; + } + /** @throws Exception If failed. 
*/ public void testSelectAllJoined() throws Exception { QueryCursor<List<?>> qry = - personCache.query(new SqlFieldsQuery( + personCache.query(sqlFieldsQuery( String.format("select * from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id", personCache.getName(), orgCache.getName()))); @@ -631,7 +644,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. */ public void testEmptyResult() throws Exception { QueryCursor<List<?>> qry = - personCache.query(new SqlFieldsQuery("select name from Person where age = 0")); + personCache.query(sqlFieldsQuery("select name from Person where age = 0")); Collection<List<?>> res = qry.getAll(); @@ -641,7 +654,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. */ public void testQueryString() throws Exception { - QueryCursor<List<?>> qry = strCache.query(new SqlFieldsQuery("select * from String")); + QueryCursor<List<?>> qry = strCache.query(sqlFieldsQuery("select * from String")); Collection<List<?>> res = qry.getAll(); @@ -658,7 +671,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. */ public void testQueryIntegersWithJoin() throws Exception { - QueryCursor<List<?>> qry = intCache.query(new SqlFieldsQuery( + QueryCursor<List<?>> qry = intCache.query(sqlFieldsQuery( "select i._KEY, i._VAL, j._KEY, j._VAL from Integer i join Integer j where i._VAL >= 100")); Collection<List<?>> res = qry.getAll(); @@ -682,7 +695,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA public void testPagination() throws Exception { // Query with page size 20. 
QueryCursor<List<?>> qry = - intCache.query(new SqlFieldsQuery("select * from Integer").setPageSize(20)); + intCache.query(sqlFieldsQuery("select * from Integer").setPageSize(20)); List<List<?>> res = new ArrayList<>(qry.getAll()); @@ -708,7 +721,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA for (int i = 0; i < 200; i++) cache.put(i, i); - QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery("select * from Integer")); + QueryCursor<List<?>> qry = cache.query(sqlFieldsQuery("select * from Integer")); Collection<List<?>> res = qry.getAll(); @@ -729,7 +742,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { - return cache.query(new SqlFieldsQuery("select * from String")); + return cache.query(sqlFieldsQuery("select * from String")); } }, CacheException.class, null); } @@ -749,7 +762,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA cache.put(key, val); - Collection<List<?>> res = cache.query(new SqlFieldsQuery("select * from Person")).getAll(); + Collection<List<?>> res = cache.query(sqlFieldsQuery("select * from Person")).getAll(); assertEquals(1, res.size()); @@ -782,7 +795,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA */ public void testPaginationIterator() throws Exception { QueryCursor<List<?>> qry = - intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); + intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); int cnt = 0; @@ -802,7 +815,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. 
*/ public void testPaginationIteratorKeepAll() throws Exception { QueryCursor<List<?>> qry = - intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); + intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); int cnt = 0; @@ -818,7 +831,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA assertEquals(size, cnt); - qry = intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); + qry = intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); List<List<?>> list = new ArrayList<>(qry.getAll()); @@ -844,7 +857,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA public void testMethodAnnotationWithoutGet() throws Exception { if (!binaryMarshaller) { QueryCursor<List<?>> qry = - orgCache.query(new SqlFieldsQuery("select methodField from Organization where methodField='name-A'") + orgCache.query(sqlFieldsQuery("select methodField from Organization where methodField='name-A'") .setPageSize(10)); List<List<?>> flds = qry.getAll(); @@ -860,7 +873,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA */ public void testPaginationGet() throws Exception { QueryCursor<List<?>> qry = - intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); + intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10)); List<List<?>> list = new ArrayList<>(qry.getAll()); @@ -883,7 +896,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** @throws Exception If failed. 
*/ public void testEmptyGrid() throws Exception { QueryCursor<List<?>> qry = personCache - .query(new SqlFieldsQuery("select name, age from Person where age = 25")); + .query(sqlFieldsQuery("select name, age from Person where age = 25")); List<?> res = F.first(qry.getAll()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java index aa31f33..d4772c1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java @@ -208,6 +208,16 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr "from \"orgRepl\".Organization o left join \"person\".Person p " + "on (p.orgId = o.id)", orgCacheRepl, 2); + // Left join from replicated to partitioned cache is not supported: + // returns duplicates in result and must fail. 
+ checkQueryFails("select o.name, p._key, p.name " + + "from \"person\".Person p left join \"org\".Organization o " + + "on (p.orgId = o.id)", orgCache); + + checkQueryFails("select o.name, p._key, p.name " + + "from \"org\".Organization o right join \"person\".Person p " + + "on (p.orgId = o.id)", orgCache); + checkQueryFails("select o.name, p._key, p.name " + "from \"person\".Person p left join \"org\".Organization o " + "on (p.orgId = o.id)", personCache); http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java new file mode 100644 index 0000000..44a68e2 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.replicated; + +/** + */ +public class IgniteCacheReplicatedFieldsQueryROSelfTest extends IgniteCacheReplicatedFieldsQuerySelfTest { + /** */ + @Override protected boolean isReplicatedOnly() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index f093ac7..b180eba 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.CacheMode; @@ -61,6 +62,9 @@ import org.springframework.util.StringUtils; @SuppressWarnings("unchecked") public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { /** */ + private static final int CLIENT = 7; + + /** */ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ @@ -82,14 +86,16 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { return cfg; } - @Override - protected long getTestTimeout() { - return 100_000_000; - } - /** {@inheritDoc} */ 
@Override protected void beforeTestsStarted() throws Exception { startGridsMultiThreaded(3, false); + Ignition.setClientMode(true); + try { + startGrid(CLIENT); + } + finally { + Ignition.setClientMode(false); + } } /** {@inheritDoc} */ @@ -156,22 +162,69 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { /** */ - public void testReplicatedOnlyTables() { - doTestReplicatedOnlyTables(1); + public void testReplicatedTablesUsingPartitionedCache() { + doTestReplicatedTablesUsingPartitionedCache(1, false, false); + } + + /** + */ + public void testReplicatedTablesUsingPartitionedCacheSegmented() { + doTestReplicatedTablesUsingPartitionedCache(5, false, false); + } + + /** + */ + public void testReplicatedTablesUsingPartitionedCacheClient() { + doTestReplicatedTablesUsingPartitionedCache(1, true, false); + } + + /** + */ + public void testReplicatedTablesUsingPartitionedCacheSegmentedClient() { + doTestReplicatedTablesUsingPartitionedCache(5, true, false); } /** */ - public void testReplicatedOnlyTablesSegmented() { - doTestReplicatedOnlyTables(5); + public void testReplicatedTablesUsingPartitionedCacheRO() { + doTestReplicatedTablesUsingPartitionedCache(1, false, true); } /** */ - private void doTestReplicatedOnlyTables(int segments) { - IgniteCache<Integer,Value> p = ignite(0).getOrCreateCache(cacheConfig("p", true, + public void testReplicatedTablesUsingPartitionedCacheSegmentedRO() { + doTestReplicatedTablesUsingPartitionedCache(5, false, true); + } + + /** + */ + public void testReplicatedTablesUsingPartitionedCacheClientRO() { + doTestReplicatedTablesUsingPartitionedCache(1, true, true); + } + + /** + */ + public void testReplicatedTablesUsingPartitionedCacheSegmentedClientRO() { + doTestReplicatedTablesUsingPartitionedCache(5, true, true); + } + + /** + */ + private SqlFieldsQuery query(String sql, boolean replicatedOnly) { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + if (replicatedOnly) + qry.setReplicatedOnly(true); + + return qry; 
+ } + + /** + */ + private void doTestReplicatedTablesUsingPartitionedCache(int segments, boolean client, boolean replicatedOnlyFlag) { + IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true, Integer.class, Value.class).setQueryParallelism(segments)); - IgniteCache<Integer,Value> r = ignite(0).getOrCreateCache(cacheConfig("r", false, + IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false, Integer.class, Value.class)); try { @@ -181,9 +234,53 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { r.put(i, new Value(i, -i)); // Query data from replicated table using partitioned cache. - assertEquals(cnt, p.query(new SqlFieldsQuery("select 1 from \"r\".Value")).getAll().size()); + assertEquals(cnt, p.query(query("select 1 from \"r\".Value", replicatedOnlyFlag)) + .getAll().size()); + + List<List<?>> res = p.query(query("select count(1) from \"r\".Value", replicatedOnlyFlag)).getAll(); + assertEquals(1, res.size()); + assertEquals(cnt, ((Number)res.get(0).get(0)).intValue()); + } + finally { + p.destroy(); + r.destroy(); + } + } + + public void testPartitionedTablesUsingReplicatedCache() { + doTestPartitionedTablesUsingReplicatedCache(1, false); + } + + public void testPartitionedTablesUsingReplicatedCacheSegmented() { + doTestPartitionedTablesUsingReplicatedCache(7, false); + } + + public void testPartitionedTablesUsingReplicatedCacheClient() { + doTestPartitionedTablesUsingReplicatedCache(1, true); + } + + public void testPartitionedTablesUsingReplicatedCacheSegmentedClient() { + doTestPartitionedTablesUsingReplicatedCache(7, true); + } + + /** + */ + private void doTestPartitionedTablesUsingReplicatedCache(int segments, boolean client) { + IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true, + Integer.class, Value.class).setQueryParallelism(segments)); + IgniteCache<Integer,Value> r = ignite(client ? 
CLIENT : 0).getOrCreateCache(cacheConfig("r", false, + Integer.class, Value.class)); + + try { + int cnt = 1000; + + for (int i = 0; i < cnt; i++) + p.put(i, new Value(i, -i)); + + // Query data from partitioned table using replicated cache. + assertEquals(cnt, r.query(new SqlFieldsQuery("select 1 from \"p\".Value")).getAll().size()); - List<List<?>> res = p.query(new SqlFieldsQuery("select count(1) from \"r\".Value")).getAll(); + List<List<?>> res = r.query(new SqlFieldsQuery("select count(1) from \"p\".Value")).getAll(); assertEquals(1, res.size()); assertEquals(cnt, ((Number)res.get(0).get(0)).intValue()); }
