This is an automated email from the ASF dual-hosted git repository. vozerov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b03a970 IGNITE-10307: SQL: Partition pruning for joins. This closes #5774. b03a970 is described below commit b03a9705b144196bfde631328562008eb9fa48a6 Author: devozerov <voze...@gridgain.com> AuthorDate: Thu Jan 17 18:47:59 2019 +0300 IGNITE-10307: SQL: Partition pruning for joins. This closes #5774. --- .../processors/query/h2/IgniteH2Indexing.java | 110 +- ...ode.java => PartitionAffinityFunctionType.java} | 35 +- .../query/h2/affinity/PartitionAllNode.java | 5 + .../query/h2/affinity/PartitionCompositeNode.java | 114 +- .../query/h2/affinity/PartitionConstantNode.java | 6 +- .../query/h2/affinity/PartitionExtractor.java | 401 ++++-- .../query/h2/affinity/PartitionGroupNode.java | 23 +- .../query/h2/affinity/PartitionJoinCondition.java | 132 ++ .../query/h2/affinity/PartitionJoinGroup.java | 81 ++ .../query/h2/affinity/PartitionNode.java | 5 + .../query/h2/affinity/PartitionNoneNode.java | 5 + .../query/h2/affinity/PartitionParameterNode.java | 2 +- .../query/h2/affinity/PartitionResult.java | 25 +- .../query/h2/affinity/PartitionSingleNode.java | 21 +- .../query/h2/affinity/PartitionTable.java | 113 ++ .../affinity/PartitionTableAffinityDescriptor.java | 97 ++ .../h2/affinity/PartitionTableDescriptor.java | 73 -- .../query/h2/affinity/PartitionTableModel.java | 157 +++ .../processors/query/h2/opt/GridH2Table.java | 161 ++- .../query/h2/sql/GridSqlQuerySplitter.java | 20 +- .../query/h2/twostep/GridReduceQueryExecutor.java | 5 +- .../BetweenOperationExtractPartitionSelfTest.java | 18 - .../h2/twostep/JoinPartitionPruningSelfTest.java | 1303 ++++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite.java | 2 + 24 files changed, 2591 insertions(+), 323 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 0c1b1d6..5e00aea 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -92,7 +92,8 @@ import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.RunningQueryManager; import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; -import org.apache.ignite.internal.processors.query.h2.affinity.PartitionNode; +import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor; +import org.apache.ignite.internal.processors.query.h2.affinity.PartitionResult; import org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO; @@ -139,6 +140,7 @@ import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; @@ -248,6 +250,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private DdlStatementsProcessor ddlProc; + /** Partition extractor. */ + private PartitionExtractor partExtractor; + /** */ private final RunningQueryManager runningQueryMgr = new RunningQueryManager(); @@ -2006,46 +2011,27 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean cursorCreated = false; try { - // TODO: Use intersection (https://issues.apache.org/jira/browse/IGNITE-10567) - int partitions[] = qry.getPartitions(); - - if (partitions == null && twoStepQry.derivedPartitions() != null) { - try { - PartitionNode partTree = twoStepQry.derivedPartitions().tree(); - - Collection<Integer> partitions0 = partTree.apply(qry.getArgs()); - - if (F.isEmpty(partitions0)) - partitions = new int[0]; - else { - partitions = new int[partitions0.size()]; - - int i = 0; - - for (Integer part : partitions0) - partitions[i++] = part; - } - - if (partitions.length == 0) { //here we know that result of requested query is empty - return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() { - @Override public Iterator<List<?>> iterator() { - return new Iterator<List<?>>() { - @Override public boolean hasNext() { - return false; - } + // When explicit partitions are set, there must be an owning cache they should be applied to. + int explicitParts[] = qry.getPartitions(); + PartitionResult derivedParts = twoStepQry.derivedPartitions(); + + int parts[] = calculatePartitions(explicitParts, derivedParts, qry.getArgs()); + + if (parts != null && parts.length == 0) { + return new QueryCursorImpl<>(new Iterable<List<?>>() { + @Override public Iterator<List<?>> iterator() { + return new Iterator<List<?>>() { + @Override public boolean hasNext() { + return false; + } - @Override public List<?> next() { - return null; - } - }; + @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") + @Override public List<?> next() { + return null; } - }); + }; } - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to calculate derived partitions: [qry=" + qry.getSql() + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); - } + }); } Iterable<List<?>> iter = runQueryTwoStep( @@ -2057,7 +2043,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { qry.getTimeout(), cancel, qry.getArgs(), - partitions, + parts, qry.isLazy(), mvccTracker ); @@ -2079,6 +2065,43 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Calculate partitions for the query. + * + * @param explicitParts Explicit partitions provided in SqlFieldsQuery.partitions property. + * @param derivedParts Derived partitions found during partition pruning. + * @param args Arguments. + * @return Calculated partitions or {@code null} if failed to calculate and there should be a broadcast. + */ + @SuppressWarnings("ZeroLengthArrayAllocation") + private int[] calculatePartitions(int[] explicitParts, PartitionResult derivedParts, Object[] args) { + if (!F.isEmpty(explicitParts)) + return explicitParts; + else if (derivedParts != null) { + try { + Collection<Integer> realParts = derivedParts.tree().apply(args); + + if (F.isEmpty(realParts)) + return IgniteUtils.EMPTY_INTS; + else { + int[] realParts0 = new int[realParts.size()]; + + int i = 0; + + for (Integer realPart : realParts) + realParts0[i++] = realPart; + + return realParts0; + } + } + catch (IgniteCheckedException e) { + throw new CacheException("Failed to calculate derived partitions for query.", e); + } + } + + return null; + } + + /** * Do initial parsing of the statement and create query caches, if needed. * @param c Connection. * @param sqlQry Query. @@ -2375,6 +2398,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { dmlProc = new DmlStatementsProcessor(ctx, this); ddlProc = new DdlStatementsProcessor(ctx, schemaMgr); + partExtractor = new PartitionExtractor(this); + if (JdbcUtils.serializer != null) U.warn(log, "Custom H2 serialization is already configured, will override."); @@ -2670,6 +2695,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @return Partition extractor. + */ + public PartitionExtractor partitionExtractor() { + return partExtractor; + } + + /** * Collect cache identifiers from two-step query. * * @param mainCacheId Id of main cache. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java similarity index 62% copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java index 238739c..4c88fcb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java @@ -17,29 +17,32 @@ package org.apache.ignite.internal.processors.query.h2.affinity; -import org.apache.ignite.IgniteCheckedException; - -import java.util.Collection; - /** - * Common node of partition tree. + * Affinity function type. */ -public interface PartitionNode { +public enum PartitionAffinityFunctionType { + /** Custom affintiy function. */ + CUSTOM(0), + + /** Rendezvous affinity function. */ + RENDEZVOUS(1); + + /** Value. */ + private final int val; + /** - * Get partitions. + * Constructor. * - * @param args Query arguments. - * @return Partitions. - * @throws IgniteCheckedException If failed. + * @param val Value. */ - Collection<Integer> apply(Object... args) throws IgniteCheckedException; + PartitionAffinityFunctionType(int val) { + this.val = val; + } /** - * Try optimizing partition nodes into a simpler form. - * - * @return Optimized node or {@code this} if optimization failed. + * @return Value. */ - default PartitionNode optimize() { - return this; + public int value() { + return val; } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java index 842d82c..30860f5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java @@ -41,6 +41,11 @@ public class PartitionAllNode implements PartitionNode { } /** {@inheritDoc} */ + @Override public int joinGroup() { + return PartitionTableModel.GRP_NONE; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PartitionAllNode.class, this); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java index 2cb330f..45ceaaf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.affinity; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import java.util.Collection; @@ -65,6 +66,8 @@ public class PartitionCompositeNode implements PartitionNode { return null; // (A, B) and (B, C) -> (B) + leftParts = new HashSet<>(leftParts); + leftParts.retainAll(rightParts); } else { @@ -77,6 +80,8 @@ public class PartitionCompositeNode implements PartitionNode { return leftParts; // (A, B) or (B, C) -> (A, B, C) + leftParts = new HashSet<>(leftParts); + leftParts.addAll(rightParts); } @@ -84,6 +89,12 @@ public class PartitionCompositeNode implements PartitionNode { } /** {@inheritDoc} */ + @Override public int joinGroup() { + // Similar to group node, we cannot cache join group value here as it may be changed dynamically. + return left.joinGroup(); + } + + /** {@inheritDoc} */ @Override public PartitionNode optimize() { PartitionNode left = this.left; PartitionNode right = this.right; @@ -103,9 +114,15 @@ public class PartitionCompositeNode implements PartitionNode { return optimizeSpecial(right, left); // If one of child nodes cannot be optimized, nothing can be done further. - // Note that we cannot return "this" here because left or right parts might have been optimized. - if (left instanceof PartitionCompositeNode || right instanceof PartitionCompositeNode) + // Note that we cannot return "this" here because left or right parts might have been changed. + if (left instanceof PartitionCompositeNode || right instanceof PartitionCompositeNode) { + // Should be "NONE" for AND in fact, but this would violate current non-collocated join semantics as + // explained in "optimizeSimpleAnd" method below. + if (left.joinGroup() != right.joinGroup()) + return PartitionAllNode.INSTANCE; + return new PartitionCompositeNode(left, right, op); + } // Try optimizing composite nodes. if (left instanceof PartitionGroupNode) @@ -182,6 +199,11 @@ public class PartitionCompositeNode implements PartitionNode { private PartitionNode optimizeGroupAnd(PartitionGroupNode left, PartitionNode right) { assert op == PartitionCompositeNodeOperator.AND; + // Should be "NONE" for AND in fact, but this would violate current non-collocated join semantics as + // explained in "optimizeSimpleAnd" method below. + if (left.joinGroup() != right.joinGroup()) + return PartitionAllNode.INSTANCE; + // Optimistic check whether both sides are equal. if (right instanceof PartitionGroupNode) { PartitionGroupNode right0 = (PartitionGroupNode)right; @@ -206,22 +228,50 @@ public class PartitionCompositeNode implements PartitionNode { } if (rightConsts != null) { - // {A, B) and (B, C) -> (B). - consts.retainAll(rightConsts); - - if (consts.isEmpty()) - // {A, B) and (C, D) -> NONE. - return PartitionNoneNode.INSTANCE; - else if (consts.size() == 1) + // Try to merge nodes if they belong to the same table. + boolean sameTbl = true; + String curTblAlias = null; + + for (PartitionSingleNode curConst : consts) { + if (curTblAlias == null) + curTblAlias = curConst.table().alias(); + else if (!F.eq(curTblAlias, curConst.table().alias())) { + sameTbl = false; + + break; + } + } + + if (sameTbl) { + for (PartitionSingleNode curConst : rightConsts) { + if (curTblAlias == null) + curTblAlias = curConst.table().alias(); + else if (!F.eq(curTblAlias, curConst.table().alias())) { + sameTbl = false; + + break; + } + } + } + + if (sameTbl) { // {A, B) and (B, C) -> (B). - return consts.iterator().next(); - else - // {A, B, C) and (B, C, D) -> (B, C). - return new PartitionGroupNode(consts); + consts.retainAll(rightConsts); + + if (consts.isEmpty()) + // {A, B) and (C, D) -> NONE. + return PartitionNoneNode.INSTANCE; + else if (consts.size() == 1) + // {A, B) and (B, C) -> (B). + return consts.iterator().next(); + else + // {A, B, C) and (B, C, D) -> (B, C). + return new PartitionGroupNode(consts); + } } } - // Otherwise it is a mixed set of concrete partitions and arguments. Cancel optimization. + // Otherwise it is a mixed set of concrete partitions and arguments possibly from different caches. // Note that in fact we can optimize expression to certain extent (e.g. (A) and (B, :C) -> (A) and (:C)), // but resulting expression is always composite node still, which cannot be optimized on upper levels. // So we skip any fine-grained optimization in favor of simplicity. @@ -238,6 +288,10 @@ public class PartitionCompositeNode implements PartitionNode { private PartitionNode optimizeGroupOr(PartitionGroupNode left, PartitionNode right) { assert op == PartitionCompositeNodeOperator.OR; + // Cannot merge disjunctive nodes if they belong to different join groups. + if (left.joinGroup() != right.joinGroup()) + return PartitionAllNode.INSTANCE; + HashSet<PartitionSingleNode> siblings = new HashSet<>(left.siblings()); if (right instanceof PartitionSingleNode) @@ -278,18 +332,28 @@ public class PartitionCompositeNode implements PartitionNode { private PartitionNode optimizeSimpleAnd(PartitionSingleNode left, PartitionSingleNode right) { assert op == PartitionCompositeNodeOperator.AND; + // Currently we do not merge such nodes because it may violate existing broken (!!!) join semantics. + // Normally, if we have two non-collocated partition sets, then this should be an empty set for collocated + // query mode. Unfortunately, current semantics of collocated query mode assume that even though both sides + // of expression are located on random nodes, there is a slight chance that they may accidentally reside on + // a single node and hence return some rows. We return "ALL" here to keep this broken semantics consistent + // irrespective of whether partition pruning is used or not. Once non-collocated joins are fixed, this + // condition will be changed to "NONE". + if (left.joinGroup() != right.joinGroup()) + return PartitionAllNode.INSTANCE; + // Check if both sides are equal. if (left.equals(right)) // (X) and (X) -> X // (:X) and (:X) -> :X return left; - // If both sides are constants, and they are not equal, this is empty set. - if (left.constant() && right.constant()) + // If both sides are constants from the same table and they are not equal, this is empty set. + if (left.constant() && right.constant() && F.eq(left.table().alias(), right.tbl.alias())) // X and Y -> NONE return PartitionNoneNode.INSTANCE; - // Otherwise it is a mixed set, cannot reduce. + // Otherwise this is a mixed set, cannot reduce. // X and :Y -> (X) AND (:Y) return new PartitionCompositeNode(left, right, PartitionCompositeNodeOperator.AND); } @@ -304,7 +368,21 @@ public class PartitionCompositeNode implements PartitionNode { private PartitionNode optimizeSimpleOr(PartitionSingleNode left, PartitionSingleNode right) { assert op == PartitionCompositeNodeOperator.OR; - return left.equals(right) ? left : PartitionGroupNode.merge(left, right); + // Cannot merge disjunctive nodes if they belong to different join groups. + if (left.joinGroup() != right.joinGroup()) + return PartitionAllNode.INSTANCE; + + // (A) or (A) -> (A) + if (left.equals(right)) + return left; + + // (A) or (B) -> (A, B) + HashSet<PartitionSingleNode> nodes = new HashSet<>(); + + nodes.add(left); + nodes.add(right); + + return new PartitionGroupNode(nodes); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java index 9efafe4..9e258ae 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java @@ -29,11 +29,11 @@ public class PartitionConstantNode extends PartitionSingleNode { /** * Constructor. * - * @param resolver Resolver. + * @param tbl Table. * @param part Partition. */ - public PartitionConstantNode(PartitionTableDescriptor resolver, int part) { - super(resolver); + public PartitionConstantNode(PartitionTable tbl, int part) { + super(tbl); this.part = part; } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java index 12549ce..0898e63 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java @@ -21,8 +21,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias; @@ -30,6 +34,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter; @@ -41,6 +46,9 @@ import org.h2.table.Column; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collections; + /** * Partition tree extractor. */ @@ -84,14 +92,11 @@ public class PartitionExtractor { GridSqlSelect select = (GridSqlSelect)qry; - // Currently we can extract data only from a single table. - GridSqlTable tbl = unwrapTable(select.from()); - - if (tbl == null) - return null; + // Prepare table model. + PartitionTableModel tblModel = prepareTableModel(select.from()); // Do extract. - PartitionNode tree = extractFromExpression(select.where()); + PartitionNode tree = extractFromExpression(select.where(), tblModel, false); assert tree != null; @@ -101,10 +106,8 @@ public class PartitionExtractor { if (tree instanceof PartitionAllNode) return null; - // Return. - PartitionTableDescriptor desc = descriptor(tbl.dataTable()); - - return new PartitionResult(desc, tree); + // Done. + return new PartitionResult(tree, tblModel.joinGroupAffinity(tree.joinGroup())); } /** @@ -114,21 +117,25 @@ public class PartitionExtractor { * @return Partition result or {@code null} if nothing is resolved. */ @SuppressWarnings("IfMayBeConditional") - public PartitionResult merge(List<GridCacheSqlQuery> qrys) { + public PartitionResult mergeMapQueries(List<GridCacheSqlQuery> qrys) { // Check if merge is possible. - PartitionTableDescriptor desc = null; + PartitionTableAffinityDescriptor aff = null; for (GridCacheSqlQuery qry : qrys) { PartitionResult qryRes = (PartitionResult)qry.derivedPartitions(); + // Failed to get results for one query -> broadcast. if (qryRes == null) - // Failed to get results for one query -> broadcast. return null; - if (desc == null) - desc = qryRes.descriptor(); - else if (!F.eq(desc, qryRes.descriptor())) - // Queries refer to different tables, cannot merge -> broadcast. + // This only possible if query is resolved to "NONE". Will be skipped later during map request prepare. + if (qryRes.affinity() == null) + continue; + + if (aff == null) + aff = qryRes.affinity(); + else if (!aff.isCompatible(qryRes.affinity())) + // Queries refer to incompatible affinity groups, cannot merge -> broadcast. return null; } @@ -152,33 +159,232 @@ public class PartitionExtractor { if (tree instanceof PartitionAllNode) return null; - return new PartitionResult(desc, tree); + // If there is no affinity, then we assume "NONE" result. + assert aff != null || tree == PartitionNoneNode.INSTANCE; + + return new PartitionResult(tree, aff); } /** - * Try unwrapping the table. + * Prepare table model. * - * @param from From. - * @return Table or {@code null} if not a table. + * @param from FROM clause. + * @return Join model. */ - @Nullable private static GridSqlTable unwrapTable(GridSqlAst from) { - if (from instanceof GridSqlAlias) - from = from.child(); + private PartitionTableModel prepareTableModel(GridSqlAst from) { + PartitionTableModel res = new PartitionTableModel(); + + prepareTableModel0(from, res); - if (from instanceof GridSqlTable) - return (GridSqlTable)from; + return res; + } + + /** + * Prepare tables which will be used in join model. + * + * @param from From flag. + * @param model Table model. + * @return {@code True} if extracted tables successfully, {@code false} if failed to extract. + */ + private List<PartitionTable> prepareTableModel0(GridSqlAst from, PartitionTableModel model) { + if (from instanceof GridSqlJoin) { + // Process JOIN recursively. + GridSqlJoin join = (GridSqlJoin)from; + + List<PartitionTable> leftTbls = prepareTableModel0(join.leftTable(), model); + List<PartitionTable> rightTbls = prepareTableModel0(join.rightTable(), model); + + if (join.isLeftOuter()) { + // "a LEFT JOIN b" is transformed into "a", and "b" is put into special stop-list. + // If a condition is met on "b" afterwards, we will ignore it. + for (PartitionTable rightTbl : rightTbls) + model.addExcludedTable(rightTbl.alias()); + + return leftTbls; + } + + // Extract equi-join or cross-join from condition. For normal INNER JOINs most likely we will have "1=1" + // cross join here, real join condition will be found in WHERE clause later. + PartitionJoinCondition cond = parseJoinCondition(join.on()); + + if (cond != null && !cond.cross()) + model.addJoin(cond); + + ArrayList<PartitionTable> res = new ArrayList<>(leftTbls.size() + rightTbls.size()); + + res.addAll(leftTbls); + res.addAll(rightTbls); + + return res; + } + + PartitionTable tbl = prepareTable(from, model); + + return tbl != null ? Collections.singletonList(tbl) : Collections.emptyList(); + } + + /** + * Try parsing condition as simple JOIN codition. Only equijoins are supported for now, so anything more complex + * than "A.a = B.b" are not processed. + * + * @param on Initial AST. + * @return Join condition or {@code null} if not simple equijoin. + */ + private static PartitionJoinCondition parseJoinCondition(GridSqlElement on) { + if (on instanceof GridSqlOperation) { + GridSqlOperation on0 = (GridSqlOperation)on; + + if (on0.operationType() == GridSqlOperationType.EQUAL) { + // Check for cross-join first. + GridSqlConst leftConst = unwrapConst(on0.child(0)); + GridSqlConst rightConst = unwrapConst(on0.child(1)); + + if (leftConst != null && rightConst != null) { + try { + int leftConstval = leftConst.value().getInt(); + int rightConstVal = rightConst.value().getInt(); + + if (leftConstval == rightConstVal) + return PartitionJoinCondition.CROSS; + } + catch (Exception ignore) { + // No-op. + } + } + + // This is not cross-join, neither normal join between columns. + if (leftConst != null || rightConst != null) + return null; + + // Check for normal equi-join. + GridSqlColumn left = unwrapColumn(on0.child(0)); + GridSqlColumn right = unwrapColumn(on0.child(1)); + + if (left != null && right != null) { + String leftAlias = left.tableAlias(); + String rightAlias = right.tableAlias(); + + String leftCol = left.columnName(); + String rightCol = right.columnName(); + + return new PartitionJoinCondition(leftAlias, rightAlias, leftCol, rightCol); + } + } + } return null; } /** + * Prepare single table. + * + * @param from Expression. + * @param tblModel Table model. + * @return Added table or {@code null} if table is exlcuded from the model. + */ + private static PartitionTable prepareTable(GridSqlAst from, PartitionTableModel tblModel) { + // Unwrap alias. We assume that every table must be aliased. + assert from instanceof GridSqlAlias; + + String alias = ((GridSqlAlias)from).alias(); + + from = from.child(); + + if (from instanceof GridSqlTable) { + // Normal table. + GridSqlTable from0 = (GridSqlTable)from; + + GridH2Table tbl0 = from0.dataTable(); + + // Unknown table type, e.g. temp table. + if (tbl0 == null) { + tblModel.addExcludedTable(alias); + + return null; + } + + String cacheName = tbl0.cacheName(); + + String affColName = null; + String secondAffColName = null; + + for (Column col : tbl0.getColumns()) { + if (tbl0.isColumnForPartitionPruningStrict(col)) { + if (affColName == null) + affColName = col.getName(); + else { + secondAffColName = col.getName(); + + // Break as we cannot have more than two affinity key columns. + break; + } + } + } + + PartitionTable tbl = new PartitionTable(alias, cacheName, affColName, secondAffColName); + + PartitionTableAffinityDescriptor aff = affinityForCache(tbl0.cacheInfo().config()); + + if (aff == null) { + // Non-standard affinity, exclude table. + tblModel.addExcludedTable(alias); + + return null; + } + + tblModel.addTable(tbl, aff); + + return tbl; + } + else { + // Subquery/union/view, etc. + assert alias != null; + + tblModel.addExcludedTable(alias); + + return null; + } + } + + /** + * Prepare affinity identifier for cache. + * + * @param ccfg Cache configuration. + * @return Affinity identifier. + */ + private static PartitionTableAffinityDescriptor affinityForCache(CacheConfiguration ccfg) { + // Partition could be extracted only from PARTITIONED caches. + if (ccfg.getCacheMode() != CacheMode.PARTITIONED) + return null; + + PartitionAffinityFunctionType aff = ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class) ? + PartitionAffinityFunctionType.RENDEZVOUS : PartitionAffinityFunctionType.CUSTOM; + + boolean hasNodeFilter = ccfg.getNodeFilter() != null && + !(ccfg.getNodeFilter() instanceof CacheConfiguration.IgniteAllNodesPredicate); + + return new PartitionTableAffinityDescriptor( + aff, + ccfg.getAffinity().partitions(), + hasNodeFilter, + ccfg.getDataRegionName() + ); + } + + /** * Extract partitions from expression. * * @param expr Expression. + * @param tblModel Table model. + * @param disjunct Whether current processing frame is located under disjunction ("OR"). In this case we cannot + * rely on join expressions like (A.a = B.b) to build co-location model because another conflicting + * join expression on the same tables migth be located on the other side of the "OR". + * Example: "JOIN ON A.a = B.b OR A.a > B.b". * @return Partition tree. */ @SuppressWarnings("EnumSwitchStatementWhichMissesCases") - private PartitionNode extractFromExpression(GridSqlAst expr) throws IgniteCheckedException { + private PartitionNode extractFromExpression(GridSqlAst expr, PartitionTableModel tblModel, boolean disjunct) + throws IgniteCheckedException { PartitionNode res = PartitionAllNode.INSTANCE; if (expr instanceof GridSqlOperation) { @@ -186,22 +392,22 @@ public class PartitionExtractor { switch (op.operationType()) { case AND: - res = extractFromAnd(op); + res = extractFromAnd(op, tblModel, disjunct); break; case OR: - res = extractFromOr(op); + res = extractFromOr(op, tblModel); break; case IN: - res = extractFromIn(op); + res = extractFromIn(op, tblModel); break; case EQUAL: - res = extractFromEqual(op); + res = extractFromEqual(op, tblModel, disjunct); } } @@ -213,18 +419,21 @@ public class PartitionExtractor { * Extract partition information from AND. * * @param op Operation. + * @param tblModel Table model. + * @param disjunct Disjunction marker. * @return Partition. */ - private PartitionNode extractFromAnd(GridSqlOperation op) throws IgniteCheckedException { + private PartitionNode extractFromAnd(GridSqlOperation op, PartitionTableModel tblModel, boolean disjunct) + throws IgniteCheckedException { assert op.size() == 2; - PartitionNode betweenNodes = tryExtractBetween(op); + PartitionNode betweenNodes = tryExtractBetween(op, tblModel); if (betweenNodes != null) return betweenNodes; - PartitionNode part1 = extractFromExpression(op.child(0)); - PartitionNode part2 = extractFromExpression(op.child(1)); + PartitionNode part1 = extractFromExpression(op.child(0), tblModel, disjunct); + PartitionNode part2 = extractFromExpression(op.child(1), tblModel, disjunct); return new PartitionCompositeNode(part1, part2, PartitionCompositeNodeOperator.AND); } @@ -233,13 +442,16 @@ public class PartitionExtractor { * Extract partition information from OR. * * @param op Operation. + * @param tblModel Table model. * @return Partition. */ - private PartitionNode extractFromOr(GridSqlOperation op) throws IgniteCheckedException { + private PartitionNode extractFromOr(GridSqlOperation op, PartitionTableModel tblModel) + throws IgniteCheckedException { assert op.size() == 2; - PartitionNode part1 = extractFromExpression(op.child(0)); - PartitionNode part2 = extractFromExpression(op.child(1)); + // Parse inner expressions recursively with disjuncion flag set. + PartitionNode part1 = extractFromExpression(op.child(0), tblModel, true); + PartitionNode part2 = extractFromExpression(op.child(1), tblModel, true); return new PartitionCompositeNode(part1, part2, PartitionCompositeNodeOperator.OR); } @@ -248,9 +460,11 @@ public class PartitionExtractor { * Extract partition information from IN. * * @param op Operation. + * @param tblModel Table model. * @return Partition. */ - private PartitionNode extractFromIn(GridSqlOperation op) throws IgniteCheckedException { + private PartitionNode extractFromIn(GridSqlOperation op, PartitionTableModel tblModel) + throws IgniteCheckedException { // Operation should contain at least two children: left (column) and right (const or column). if (op.size() < 2) return PartitionAllNode.INSTANCE; @@ -258,11 +472,9 @@ public class PartitionExtractor { // Left operand should be column. GridSqlAst left = op.child(); - GridSqlColumn leftCol; + GridSqlColumn leftCol = unwrapColumn(left); - if (left instanceof GridSqlColumn) - leftCol = (GridSqlColumn)left; - else + if (leftCol == null) return PartitionAllNode.INSTANCE; // Can work only with Ignite tables. @@ -291,8 +503,8 @@ public class PartitionExtractor { // set globally. Hence, returning null. return PartitionAllNode.INSTANCE; - // Do extract. - PartitionSingleNode part = extractSingle(leftCol.column(), rightConst, rightParam); + // Extract. + PartitionSingleNode part = extractSingle(leftCol, rightConst, rightParam, tblModel); // Same thing as above: single unknown partition in disjunction defeats optimization. if (part == null) @@ -308,19 +520,20 @@ public class PartitionExtractor { * Extract partition information from equality. * * @param op Operation. + * @param tblModel Table model. + * @param disjunct Disjunction flag. When set possible join expression will not be processed. * @return Partition. */ - private PartitionNode extractFromEqual(GridSqlOperation op) throws IgniteCheckedException { + private PartitionNode extractFromEqual(GridSqlOperation op, PartitionTableModel tblModel, boolean disjunct) + throws IgniteCheckedException { assert op.operationType() == GridSqlOperationType.EQUAL; GridSqlElement left = op.child(0); GridSqlElement right = op.child(1); - GridSqlColumn leftCol; + GridSqlColumn leftCol = unwrapColumn(left); - if (left instanceof GridSqlColumn) - leftCol = (GridSqlColumn)left; - else + if (leftCol == null) return PartitionAllNode.INSTANCE; if (!(leftCol.column().getTable() instanceof GridH2Table)) @@ -337,10 +550,20 @@ public class PartitionExtractor { rightConst = null; rightParam = (GridSqlParameter)right; } - else + else { + if (right instanceof GridSqlColumn) { + if (!disjunct) { + PartitionJoinCondition cond = parseJoinCondition(op); + + if (cond != null && !cond.cross()) + tblModel.addJoin(cond); + } + + } return PartitionAllNode.INSTANCE; + } - PartitionSingleNode part = extractSingle(leftCol.column(), rightConst, rightParam); + PartitionSingleNode part = extractSingle(leftCol, rightConst, rightParam, tblModel); return part != null ? part : PartitionAllNode.INSTANCE; } @@ -351,52 +574,81 @@ public class PartitionExtractor { * @param leftCol Left column. * @param rightConst Right constant. * @param rightParam Right parameter. + * @param tblModel Table model. * @return Partition or {@code null} if failed to extract. */ - @Nullable private PartitionSingleNode extractSingle(Column leftCol, GridSqlConst rightConst, - GridSqlParameter rightParam) throws IgniteCheckedException { + @Nullable private PartitionSingleNode extractSingle( + GridSqlColumn leftCol, + GridSqlConst rightConst, + GridSqlParameter rightParam, + PartitionTableModel tblModel + ) throws IgniteCheckedException { assert leftCol != null; - assert leftCol.getTable() != null; - assert leftCol.getTable() instanceof GridH2Table; - GridH2Table tbl = (GridH2Table)leftCol.getTable(); + Column leftCol0 = leftCol.column(); - if (!tbl.isColumnForPartitionPruning(leftCol)) + assert leftCol0.getTable() != null; + assert leftCol0.getTable() instanceof GridH2Table; + + GridH2Table tbl = (GridH2Table)leftCol0.getTable(); + + if (!tbl.isColumnForPartitionPruning(leftCol0)) return null; - PartitionTableDescriptor tblDesc = descriptor(tbl); + PartitionTable tbl0 = tblModel.table(leftCol.tableAlias()); + + // If table is in ignored set, then we cannot use it for partition extraction. + if (tbl0 == null) + return null; if (rightConst != null) { - int part = idx.kernalContext().affinity().partition(tbl.cacheName(), rightConst.value().getObject()); + Object constVal = H2Utils.convert(rightConst.value().getObject(), idx, leftCol0.getType()); + + int part = idx.kernalContext().affinity().partition(tbl.cacheName(), constVal); - return new PartitionConstantNode(tblDesc, part); + return new PartitionConstantNode(tbl0, part); } else if (rightParam != null) - return new PartitionParameterNode(tblDesc, idx, rightParam.index(), leftCol.getType()); + return new PartitionParameterNode(tbl0, idx, rightParam.index(), leftCol0.getType()); else return null; } /** - * Get descriptor from table. + * Unwrap constant if possible. + * + * @param ast AST. + * @return Constant or {@code null} if not a constant. + */ + @Nullable public static GridSqlConst unwrapConst(GridSqlAst ast) { + return ast instanceof GridSqlConst ? (GridSqlConst)ast : null; + } + + /** + * Unwrap column if possible. * - * @param tbl Table. - * @return Descriptor. + * @param ast AST. + * @return Column or {@code null} if not a column. */ - private static PartitionTableDescriptor descriptor(GridH2Table tbl) { - return new PartitionTableDescriptor(tbl.cacheName(), tbl.getName()); + @Nullable public static GridSqlColumn unwrapColumn(GridSqlAst ast) { + if (ast instanceof GridSqlAlias) + ast = ast.child(); + + return ast instanceof GridSqlColumn ? (GridSqlColumn)ast : null; } /** * Try to extract partitions from {@code op} assuming that it's between operation or simple range. * * @param op Sql operation. + * @param tblModel Table model. * @return {@code PartitionSingleNode} if operation reduced to one partition, * {@code PartitionGroupNode} if operation reduced to multiple partitions or null if operation is neither * between nor simple range. Null also returns if it's not possible to extract partitions from given operation. * @throws IgniteCheckedException If failed. */ - private PartitionNode tryExtractBetween(GridSqlOperation op) throws IgniteCheckedException { + private PartitionNode tryExtractBetween(GridSqlOperation op, PartitionTableModel tblModel) + throws IgniteCheckedException { // Between operation (or similar range) should contain exact two children. assert op.size() == 2; @@ -487,11 +739,18 @@ public class PartitionExtractor { Set<PartitionSingleNode> parts = new HashSet<>(); - PartitionTableDescriptor desc = descriptor(tbl); + PartitionTable tbl0 = tblModel.table(leftCol.tableAlias()); + + // If table is in ignored set, then we cannot use it for partition extraction. + if (tbl0 == null) + return null; for (long i = leftLongVal; i <= rightLongVal; i++) { - parts.add(new PartitionConstantNode(desc, - idx.kernalContext().affinity().partition((tbl).cacheName(), i))); + Object constVal = H2Utils.convert(i, idx, leftCol.column().getType()); + + int part = idx.kernalContext().affinity().partition(tbl0.cacheName(), constVal); + + parts.add(new PartitionConstantNode(tbl0, part)); if (parts.size() > maxPartsCntBetween) return null; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java index ef3a154..3d66439 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java @@ -35,22 +35,6 @@ public class PartitionGroupNode implements PartitionNode { private final Set<PartitionSingleNode> siblings; /** - * Merge two simple nodes. - * - * @param node1 Node 1. - * @param node2 Node 2. - * @return Group node. - */ - public static PartitionGroupNode merge(PartitionSingleNode node1, PartitionSingleNode node2) { - HashSet<PartitionSingleNode> nodes = new HashSet<>(); - - nodes.add(node1); - nodes.add(node2); - - return new PartitionGroupNode(nodes); - } - - /** * Constructor. * * @param siblings Partitions. @@ -72,6 +56,13 @@ public class PartitionGroupNode implements PartitionNode { return res; } + /** {@inheritDoc} */ + @Override public int joinGroup() { + // Note that we cannot cache join group in constructor. We have strong invariant that all siblings always + // belongs to the same group. However, number of this group may be changed during expression tree traversing. + return siblings.iterator().next().joinGroup(); + } + /** * @return Siblings */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java new file mode 100644 index 0000000..244c301 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.affinity; + +import org.apache.ignite.internal.util.typedef.F; + +/** + * Join condition. + */ +public class PartitionJoinCondition { + /** Cross JOIN. */ + public static final PartitionJoinCondition CROSS = new PartitionJoinCondition(null, null, null, null, true); + + /** Left alias. */ + private final String leftAlias; + + /** Right alias. */ + private final String rightAlias; + + /** Left column name. */ + private final String leftCol; + + /** Right column name. */ + private final String rightCol; + + /** Whether this is a cross-join. */ + private final boolean cross; + + /** + * Constructor. + * + * @param leftAlias Left alias. + * @param rightAlias Right alias. + * @param leftCol Left column name. + * @param rightCol Right column name. + */ + public PartitionJoinCondition(String leftAlias, String rightAlias, String leftCol, String rightCol) { + this(leftAlias, rightAlias, leftCol, rightCol, false); + } + + /** + * Constructor. + * + * @param leftAlias Left alias. + * @param rightAlias Right alias. + * @param leftCol Left column name. + * @param rightCol Right column name. + * @param cross Whether this is a cross-join. + */ + private PartitionJoinCondition(String leftAlias, String rightAlias, String leftCol, String rightCol, + boolean cross) { + this.leftAlias = leftAlias; + this.rightAlias = rightAlias; + this.leftCol = leftCol; + this.rightCol = rightCol; + this.cross = cross; + } + + /** + * Left alias. + */ + public String leftAlias() { + return leftAlias; + } + + /** + * Right alias. + */ + public String rightAlias() { + return rightAlias; + } + + /** + * @return Left column. + */ + public String leftColumn() { + return leftCol; + } + + /** + * @return Right column. + */ + public String rightColumn() { + return rightCol; + } + + /** + * @return Wheter this is a cross-join. + */ + public boolean cross() { + return cross; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = leftAlias.hashCode(); + + res = 31 * res + rightAlias.hashCode(); + res = 31 * res + leftCol.hashCode(); + res = 31 * res + rightCol.hashCode(); + res = 31 * res + Boolean.hashCode(cross); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj instanceof PartitionJoinCondition) { + PartitionJoinCondition other = (PartitionJoinCondition)obj; + + return F.eq(leftAlias, other.leftAlias) && F.eq(rightAlias, other.rightAlias) && + F.eq(leftCol, other.leftCol) && F.eq(rightCol, other.rightCol) && F.eq(cross, other.cross); + } + + return false; + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java new file mode 100644 index 0000000..641d013 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.affinity; + +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; + +/** + * Group of joined tables whose affinity function could be "merged". + */ +@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") +public class PartitionJoinGroup { + /** Tables within a group. */ + private final Collection<PartitionTable> tbls = Collections.newSetFromMap(new IdentityHashMap<>()); + + /** Affinity function descriptor. */ + private final PartitionTableAffinityDescriptor affDesc; + + /** + * Constructor. + * + * @param affDesc Affinity function descriptor. + */ + public PartitionJoinGroup(PartitionTableAffinityDescriptor affDesc) { + this.affDesc = affDesc; + } + + /** + * @return Tables in a group. + */ + public Collection<PartitionTable> tables() { + return tbls; + } + + /** + * Add table to the group. + * + * @param tbl Table. + * @return This for chaining. + */ + public PartitionJoinGroup addTable(PartitionTable tbl) { + tbls.add(tbl); + + return this; + } + + /** + * Remove table from the group. + * + * @param tbl Table. + * @return If group is empty after removal. + */ + public boolean removeTable(PartitionTable tbl) { + tbls.remove(tbl); + + return tbls.isEmpty(); + } + + /** + * @return Affinity descriptor. + */ + public PartitionTableAffinityDescriptor affinityDescriptor() { + return affDesc; + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java index 238739c..7372fc2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java @@ -35,6 +35,11 @@ public interface PartitionNode { Collection<Integer> apply(Object... args) throws IgniteCheckedException; /** + * @return Join group for the given node. + */ + int joinGroup(); + + /** * Try optimizing partition nodes into a simpler form. * * @return Optimized node or {@code this} if optimization failed. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java index b3a1358..5d4b324 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java @@ -42,6 +42,11 @@ public class PartitionNoneNode implements PartitionNode { } /** {@inheritDoc} */ + @Override public int joinGroup() { + return PartitionTableModel.GRP_NONE; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PartitionNoneNode.class, this); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java index 0624f2c..e9f4880 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java @@ -45,7 +45,7 @@ public class PartitionParameterNode extends PartitionSingleNode { * @param idx Parameter index. * @param dataType Parameter data type. */ - public PartitionParameterNode(PartitionTableDescriptor tbl, IgniteH2Indexing indexing, int idx, + public PartitionParameterNode(PartitionTable tbl, IgniteH2Indexing indexing, int idx, int dataType) { super(tbl); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java index 13e7f87..daa14d3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java @@ -24,37 +24,36 @@ import org.apache.ignite.internal.util.typedef.internal.S; * Partition extraction result. */ public class PartitionResult { - /** Descriptor. */ - @GridToStringInclude - private final PartitionTableDescriptor desc; - /** Tree. */ @GridToStringInclude private final PartitionNode tree; + /** Affinity function. */ + private final PartitionTableAffinityDescriptor aff; + /** * Constructor. * - * @param desc Descriptor. * @param tree Tree. + * @param aff Affinity function. */ - public PartitionResult(PartitionTableDescriptor desc, PartitionNode tree) { - this.desc = desc; + public PartitionResult(PartitionNode tree, PartitionTableAffinityDescriptor aff) { this.tree = tree; + this.aff = aff; } /** - * Descriptor. + * Tree. */ - public PartitionTableDescriptor descriptor() { - return desc; + public PartitionNode tree() { + return tree; } /** - * Tree. + * @return Affinity function. */ - public PartitionNode tree() { - return tree; + public PartitionTableAffinityDescriptor affinity() { + return aff; } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java index caf966c..35e7d30 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java @@ -30,14 +30,14 @@ import java.util.Collections; public abstract class PartitionSingleNode implements PartitionNode { /** Table descriptor. */ @GridToStringExclude - protected final PartitionTableDescriptor tbl; + protected final PartitionTable tbl; /** * Constructor. * * @param tbl Table descriptor. */ - protected PartitionSingleNode(PartitionTableDescriptor tbl) { + protected PartitionSingleNode(PartitionTable tbl) { this.tbl = tbl; } @@ -59,17 +59,29 @@ public abstract class PartitionSingleNode implements PartitionNode { */ public abstract boolean constant(); + /** {@inheritDoc} */ + @Override public int joinGroup() { + return tbl.joinGroup(); + } + /** * @return Partition for constant node, index for argument node. */ public abstract int value(); + /** + * @return Underlying table. + */ + public PartitionTable table() { + return tbl; + } + /** {@inheritDoc} */ @Override public int hashCode() { int hash = (constant() ? 1 : 0); hash = 31 * hash + value(); - hash = 31 * hash + tbl.hashCode(); + hash = 31 * hash + tbl.alias().hashCode(); return hash; } @@ -84,6 +96,7 @@ public abstract class PartitionSingleNode implements PartitionNode { PartitionSingleNode other = (PartitionSingleNode)obj; - return F.eq(constant(), other.constant()) && F.eq(value(), other.value()) && F.eq(tbl, other.tbl); + return F.eq(constant(), other.constant()) && F.eq(value(), other.value()) && + F.eq(tbl.alias(), other.tbl.alias()); } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java new file mode 100644 index 0000000..1b996c1 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.affinity; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Single table with affinity info. + */ +public class PartitionTable { + /** Alias used in the query. */ + private final String alias; + + /** Cache name. */ + private final String cacheName; + + /** Affinity column name (if can be resolved). */ + private final String affColName; + + /** Second affinity column name (possible when _KEY is affinity column and an alias for this column exists. */ + private final String secondAffColName; + + /** Join group index. */ + private int joinGrp; + + /** + * Constructor. + * + * @param alias Unique alias. + * @param cacheName Cache name. + * @param affColName Affinity column name. + * @param secondAffColName Second affinity column name. + */ + public PartitionTable( + String alias, + String cacheName, + @Nullable String affColName, + @Nullable String secondAffColName + ) { + this.alias = alias; + this.cacheName = cacheName; + + if (affColName == null && secondAffColName != null) { + this.affColName = secondAffColName; + this.secondAffColName = null; + } + else { + this.affColName = affColName; + this.secondAffColName = secondAffColName; + } + } + + /** + * @return Alias. + */ + public String alias() { + return alias; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * Check whether passed column is affinity column. + * + * @param colName Column name. + * @return {@code True} if affinity column. + */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public boolean isAffinityColumn(String colName) { + return F.eq(colName, affColName) || F.eq(colName, secondAffColName); + } + + /** + * @return Join group index. + */ + public int joinGroup() { + return joinGrp; + } + + /** + * @param joinGrp Join group index. + */ + public void joinGroup(int joinGrp) { + this.joinGrp = joinGrp; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PartitionTable.class, this); + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java new file mode 100644 index 0000000..21dab9c --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.affinity; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.io.Serializable; + +/** + * Affinity function descriptor. Used to compare affinity functions of two tables. + */ +public class PartitionTableAffinityDescriptor implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Affinity function type. */ + private final PartitionAffinityFunctionType affFunc; + + /** Number of partitions. */ + private final int parts; + + /** Whether node filter is set. */ + private final boolean hasNodeFilter; + + /** Data region name. */ + private final String dataRegion; + + /** + * Constructor. + * + * @param affFunc Affinity function type. + * @param parts Number of partitions. + * @param hasNodeFilter Whether node filter is set. + * @param dataRegion Data region. + */ + public PartitionTableAffinityDescriptor( + PartitionAffinityFunctionType affFunc, + int parts, + boolean hasNodeFilter, + String dataRegion + ) { + this.affFunc = affFunc; + this.parts = parts; + this.hasNodeFilter = hasNodeFilter; + this.dataRegion = dataRegion; + } + + /** + * Check is provided descriptor is compatible with this instance (i.e. can be used in the same co-location group). + * + * @param other Other descriptor. + * @return {@code True} if compatible. + */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public boolean isCompatible(PartitionTableAffinityDescriptor other) { + if (other == null) + return false; + + // Rendezvous affinity function is deterministic and doesn't depend on previous cluster view changes. + // In future other user affinity functions would be applicable as well if explicityl marked deterministic. + if (affFunc == PartitionAffinityFunctionType.RENDEZVOUS) { + // We cannot be sure that two caches are co-located if custom node filter is present. + // Nota that technically we may try to compare two filters. However, this adds unnecessary complexity + // and potential deserialization issues when SQL is called from client nodes or thin clients. + if (!hasNodeFilter) { + return + other.affFunc == PartitionAffinityFunctionType.RENDEZVOUS && + !other.hasNodeFilter && + other.parts == parts && + F.eq(other.dataRegion, dataRegion); + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PartitionTableAffinityDescriptor.class, this); + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java deleted file mode 100644 index b11e07e..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.affinity; - -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Partition resolver. - */ -public class PartitionTableDescriptor { - /** Cache name. */ - private final String cacheName; - - /** Table name. */ - private final String tblName; - - /** - * Constructor. - * - * @param cacheName Cache name. - * @param tblName Table name. - */ - public PartitionTableDescriptor(String cacheName, String tblName) { - this.cacheName = cacheName; - this.tblName = tblName; - } - - /** - * @return Cache name. - */ - public String cacheName() { - return cacheName; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return 31 * cacheName.hashCode() + tblName.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o.getClass() != getClass()) - return false; - - PartitionTableDescriptor other = (PartitionTableDescriptor)o; - - return F.eq(cacheName, other.cacheName) && F.eq(tblName, other.tblName); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PartitionTableDescriptor.class, this); - } -} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java new file mode 100644 index 0000000..6393941 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.affinity; + +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Partition join model. Describes how tables are joined with each other. + */ +public class PartitionTableModel { + /** Join group which could not be applied (e.g. for "ALL" case). */ + public static final int GRP_NONE = -1; + + /** All tables observed during parsing excluding outer. */ + private final Map<String, PartitionTable> tbls = new HashMap<>(); + + /** Join groups. */ + private final Map<Integer, PartitionJoinGroup> grps = new HashMap<>(); + + /** Talbes which are excluded from partition pruning calculation. */ + private Set<String> excludedTblNames; + + /** Group index generator */ + private int grpIdxGen; + + /** + * Add table. + * + * @param tbl Table. + * @param aff Affinity descriptor. + */ + public void addTable(PartitionTable tbl, PartitionTableAffinityDescriptor aff) { + int grpIdx = grpIdxGen++; + + tbl.joinGroup(grpIdx); + + tbls.put(tbl.alias(), tbl); + grps.put(grpIdx, new PartitionJoinGroup(aff).addTable(tbl)); + } + + /** + * Get table by alias. + * + * @param alias Alias. + * @return Table or {@code null} if it cannot be used for partition pruning. + */ + @Nullable public PartitionTable table(String alias) { + PartitionTable res = tbls.get(alias); + + assert res != null || (excludedTblNames != null && excludedTblNames.contains(alias)); + + return res; + } + + /** + * Add excluded table + * + * @param alias Alias. + */ + public void addExcludedTable(String alias) { + PartitionTable tbl = tbls.remove(alias); + + if (tbl != null) { + PartitionJoinGroup grp = grps.get(tbl.joinGroup()); + + assert grp != null; + + if (grp.removeTable(tbl)) + grps.remove(tbl.joinGroup()); + } + + if (excludedTblNames == null) + excludedTblNames = new HashSet<>(); + + excludedTblNames.add(alias); + } + + /** + * Add equi-join condition. Two joined tables may possibly be merged into a single group. + * + * @param cond Condition. + */ + public void addJoin(PartitionJoinCondition cond) { + PartitionTable leftTbl = tbls.get(cond.leftAlias()); + PartitionTable rightTbl = tbls.get(cond.rightAlias()); + + assert leftTbl != null || (excludedTblNames != null && excludedTblNames.contains(cond.leftAlias())); + assert rightTbl != null || (excludedTblNames != null && excludedTblNames.contains(cond.rightAlias())); + + // At least one tables is excluded, return. + if (leftTbl == null || rightTbl == null) + return; + + // At least one column in condition is not affinity column, return. + if (!leftTbl.isAffinityColumn(cond.leftColumn()) || !rightTbl.isAffinityColumn(cond.rightColumn())) + return; + + // Remember join group of the right table as it will be changed below. + int rightGrpId = rightTbl.joinGroup(); + + PartitionJoinGroup leftGrp = grps.get(leftTbl.joinGroup()); + PartitionJoinGroup rightGrp = grps.get(rightGrpId); + + assert leftGrp != null; + assert rightGrp != null; + + // Groups are not compatible, return. + if (!leftGrp.affinityDescriptor().isCompatible(rightGrp.affinityDescriptor())) + return; + + // Safe to merge groups. + for (PartitionTable tbl : rightGrp.tables()) { + tbl.joinGroup(leftTbl.joinGroup()); + + leftGrp.addTable(tbl); + } + + grps.remove(rightGrpId); + } + + /** + * Get affinity descriptor for the group. + * + * @param grpId Group ID. + * @return Affinity descriptor or {@code null} if there is no affinity descriptor (e.g. for "NONE" result). + */ + @Nullable public PartitionTableAffinityDescriptor joinGroupAffinity(int grpId) { + if (grpId == GRP_NONE) + return null; + + PartitionJoinGroup grp = grps.get(grpId); + + assert grp != null; + + return grp.affinityDescriptor(); + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index b69a011..fff12e3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -62,7 +62,6 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS; /** * H2 Table implementation. @@ -101,8 +100,8 @@ public class GridH2Table extends TableBase { /** */ private final IndexColumn affKeyCol; - /** */ - private final int affKeyColId; + /** Whether affinity key column is the whole cache key. */ + private final boolean affKeyColIsKey; /** */ private final LongAdder size = new LongAdder(); @@ -122,6 +121,9 @@ public class GridH2Table extends TableBase { /** Flag remove index or not when table will be destroyed. */ private volatile boolean rmIndex; + /** Columns with thread-safe access. */ + private volatile Column[] safeColumns; + /** * Creates table. * @@ -141,36 +143,8 @@ public class GridH2Table extends TableBase { this.desc = desc; this.cacheInfo = cacheInfo; - if (!desc.type().customAffinityKeyMapper()) { - String affKeyFieldName = desc.type().affinityKey(); - - if (affKeyFieldName != null) { - if (doesColumnExist(affKeyFieldName)) { - int colId = getColumn(affKeyFieldName).getColumnId(); - - if (desc.isKeyColumn(colId)) { - affKeyCol = indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING); - affKeyColId = GridH2KeyValueRowOnheap.KEY_COL; - } - else { - affKeyCol = indexColumn(colId, SortOrder.ASCENDING); - affKeyColId = colId; - } - } - else { - affKeyCol = null; - affKeyColId = COL_NOT_EXISTS; - } - } - else { - affKeyCol = indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING); - affKeyColId = GridH2KeyValueRowOnheap.KEY_COL; - } - } - else { - affKeyCol = null; - affKeyColId = COL_NOT_EXISTS; - } + affKeyCol = calculateAffinityKeyColumn(); + affKeyColIsKey = affKeyCol != null && desc.isKeyColumn(affKeyCol.column.getColumnId()); this.rowFactory = rowFactory; @@ -210,6 +184,36 @@ public class GridH2Table extends TableBase { } /** + * Calculate affinity key column which will be used for partition pruning and distributed joins. + * + * @return Affinity column or {@code null} if none can be used. + */ + private IndexColumn calculateAffinityKeyColumn() { + // If custome affinity key mapper is set, we do not know how to convert _KEY to partition, return null. + if (desc.type().customAffinityKeyMapper()) + return null; + + String affKeyFieldName = desc.type().affinityKey(); + + // If explicit affinity key field is not set, then use _KEY. + if (affKeyFieldName == null) + return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING); + + // If explicit affinity key field is set, but is not found in the table, do not use anything. + if (!doesColumnExist(affKeyFieldName)) + return null; + + int colId = getColumn(affKeyFieldName).getColumnId(); + + // If affinity key column is either _KEY or it's alias (QueryEntity.keyFieldName), normalize it to _KEY. + if (desc.isKeyColumn(colId)) + return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING); + + // Otherwise use column as is. + return indexColumn(colId, SortOrder.ASCENDING); + } + + /** * @return {@code true} If this is a partitioned table. */ public boolean isPartitioned() { @@ -230,9 +234,65 @@ public class GridH2Table extends TableBase { * @return {@code True} if affinity key column. */ public boolean isColumnForPartitionPruning(Column col) { + return isColumnForPartitionPruning0(col, false); + } + + /** + * Check whether passed column could be used for partition transfer during partition pruning on joined tables and + * for external affinity calculation (e.g. on thin clients). + * <p> + * Note that it is different from {@link #isColumnForPartitionPruning(Column)} method in that not every column + * which qualifies for partition pruning can be used by thin clients or join partinion prunining logic. + * <p> + * Consider the following schema: + * <pre> + * CREATE TABLE dept (id PRIMARY KEY); + * CREATE TABLE emp (id, dept_id AFFINITY KEY, PRIMARY KEY(id, dept_id)); + * </pre> + * For expression-based partition pruning on "emp" table on the <b>server side</b> we may use both "_KEY" and + * "dept_id" columns, as passing them through standard affinity workflow will yield the same result: + * dept_id -> part + * _KEY -> dept_id -> part + * <p> + * But we cannot use "_KEY" on thin client side, as it doesn't know how to extract affinity key field properly. + * Neither we can perform partition transfer in JOINs when "_KEY" is used. + * <p> + * This is OK as data is collocated, so we can merge partitions extracted from both tables: + * <pre> + * SELECT * FROM dept d INNER JOIN emp e ON d.id = e.dept_id WHERE e.dept_id=? AND d.id=? + * </pre> + * But this is not OK as joined data is not collocated, and tables form distinct collocation groups: + * <pre> + * SELECT * FROM dept d INNER JOIN emp e ON d.id = e._KEY WHERE e.dept_id=? AND d.id=? + * </pre> + * NB: The last query is not logically correct and will produce empty result. However, it is correct from SQL + * perspective, so we should make incorrect assumptions about partitions as it may make situation even worse. + * + * @param col Column. + * @return {@code True} if column could be used for partition extraction on both server and client sides and for + * partition transfer in joins. + */ + public boolean isColumnForPartitionPruningStrict(Column col) { + return isColumnForPartitionPruning0(col, true); + } + + /** + * Internal logic to check whether column qualifies for partition extraction or not. + * + * @param col Column. + * @param strict Strict flag. + * @return {@code True} if column could be used for partition. + */ + private boolean isColumnForPartitionPruning0(Column col, boolean strict) { + if (affKeyCol == null) + return false; + int colId = col.getColumnId(); - return colId == affKeyColId || desc.isKeyColumn(colId); + if (colId == affKeyCol.column.getColumnId()) + return true; + + return (affKeyColIsKey || !strict) && desc.isKeyColumn(colId); } /** @@ -333,6 +393,7 @@ public class GridH2Table extends TableBase { * * @param exclusive Exclusive flag. */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "CallToThreadYield"}) private void lock(boolean exclusive) { Lock l = exclusive ? lock.writeLock() : lock.readLock(); @@ -980,12 +1041,14 @@ public class GridH2Table extends TableBase { lock(true); try { - int pos = columns.length; + Column[] safeColumns0 = safeColumns; + + int pos = safeColumns0.length; - Column[] newCols = new Column[columns.length + cols.size()]; + Column[] newCols = new Column[safeColumns0.length + cols.size()]; // First, let's copy existing columns to new array - System.arraycopy(columns, 0, newCols, 0, columns.length); + System.arraycopy(safeColumns0, 0, newCols, 0, safeColumns0.length); // And now, let's add new columns for (QueryField col : cols) { @@ -1026,13 +1089,16 @@ public class GridH2Table extends TableBase { * @param cols Columns. * @param ifExists If EXISTS flag. */ + @SuppressWarnings("ForLoopReplaceableByForEach") public void dropColumns(List<String> cols, boolean ifExists) { assert !ifExists || cols.size() == 1; lock(true); try { - int size = columns.length; + Column[] safeColumns0 = safeColumns; + + int size = safeColumns0.length; for (String name : cols) { if (!doesColumnExist(name)) { @@ -1052,8 +1118,8 @@ public class GridH2Table extends TableBase { int dst = 0; - for (int i = 0; i < columns.length; i++) { - Column column = columns[i]; + for (int i = 0; i < safeColumns0.length; i++) { + Column column = safeColumns0[i]; for (String name : cols) { if (F.eq(name, column.getName())) { @@ -1084,7 +1150,16 @@ public class GridH2Table extends TableBase { } /** {@inheritDoc} */ + @Override protected void setColumns(Column[] columns) { + this.safeColumns = columns; + + super.setColumns(columns); + } + + /** {@inheritDoc} */ @Override public Column[] getColumns() { + Column[] safeColumns0 = safeColumns; + Boolean insertHack = INSERT_HACK.get(); if (insertHack != null && insertHack) { @@ -1093,15 +1168,15 @@ public class GridH2Table extends TableBase { StackTraceElement elem = elems[2]; if (F.eq(elem.getClassName(), Insert.class.getName()) && F.eq(elem.getMethodName(), "prepare")) { - Column[] columns0 = new Column[columns.length - 3]; + Column[] columns0 = new Column[safeColumns0.length - 3]; - System.arraycopy(columns, 3, columns0, 0, columns0.length); + System.arraycopy(safeColumns0, 3, columns0, 0, columns0.length); return columns0; } } - return columns; + return safeColumns0; } /** diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index fcb3424..818777c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -129,6 +129,9 @@ public class GridSqlQuerySplitter { private boolean collocatedGrpBy; /** */ + private boolean distributedJoins; + + /** */ private IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new IdentityHashMap<>(); /** Partition extractor. */ @@ -137,13 +140,15 @@ public class GridSqlQuerySplitter { /** * @param params Query parameters. * @param collocatedGrpBy If it is a collocated GROUP BY query. - * @param idx Indexing. + * @param distributedJoins Distributed joins flag. + * @param extractor Partition extractor. */ - public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, IgniteH2Indexing idx) { + public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, boolean distributedJoins, + PartitionExtractor extractor) { this.params = params; this.collocatedGrpBy = collocatedGrpBy; - - extractor = new PartitionExtractor(idx); + this.distributedJoins = distributedJoins; + this.extractor = extractor; } /** @@ -207,7 +212,8 @@ public class GridSqlQuerySplitter { qry.explain(false); - GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, h2); + GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, distributedJoins, + h2.partitionExtractor()); // Normalization will generate unique aliases for all the table filters in FROM. // Also it will collect all tables and schemas from the query. @@ -262,7 +268,7 @@ public class GridSqlQuerySplitter { twoStepQry.distributedJoins(distributedJoins); // all map queries must have non-empty derivedPartitions to use this feature. - twoStepQry.derivedPartitions(splitter.extractor.merge(twoStepQry.mapQueries())); + twoStepQry.derivedPartitions(splitter.extractor.mergeMapQueries(twoStepQry.mapQueries())); twoStepQry.forUpdate(forUpdate); @@ -1549,7 +1555,7 @@ public class GridSqlQuerySplitter { map.partitioned(hasPartitionedTables(mapQry)); map.hasSubQueries(hasSubQueries); - if (map.isPartitioned()) + if (map.isPartitioned() && !distributedJoins) map.derivedPartitions(extractor.extract(mapQry)); mapSqlQrys.add(map); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 62953ec..15788f2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -393,9 +393,12 @@ public class GridReduceQueryExecutor { int timeoutMillis, GridQueryCancel cancel, Object[] params, - final int[] parts, + int[] parts, boolean lazy, MvccQueryTracker mvccTracker) { + if (qry.isLocal() && parts != null) + parts = null; + assert !qry.mvccEnabled() || mvccTracker != null; if (F.isEmpty(params)) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java index fbdbfb0..f6e73bc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.query.FieldsQueryCursor; @@ -41,7 +40,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import org.junit.runner.RunWith; @@ -481,22 +479,6 @@ public class BetweenOperationExtractPartitionSelfTest extends GridCommonAbstract } /** - * Check custom partitions limit exceeding. - */ - @Test - public void testBetweenPartitionsCustomLimitExceeding() { - try (GridTestUtils.SystemProperty ignored = new GridTestUtils. - SystemProperty(IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN, "4")){ - - // Default limit (16) not exceeded. - testBetweenConstOperator(BETWEEN_QRY, 1, 4, 4); - - // Default limit (16) exceeded. - testBetweenConstOperator(BETWEEN_QRY, 1, 5, 5, EMPTY_PARTITIONS_ARRAY); - } - } - - /** * Check range expression with constant values. */ @Test diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java new file mode 100644 index 0000000..1429f3f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * Tests for join partition pruning. + */ +@SuppressWarnings("deprecation") +@RunWith(JUnit4.class) +public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest { + /** Number of intercepted requests. */ + private static final AtomicInteger INTERCEPTED_REQS = new AtomicInteger(); + + /** Parititions tracked during query execution. */ + private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS = new ConcurrentSkipListSet<>(); + + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder().setShared(true); + + /** Client node name. */ + private static final String CLI_NAME = "cli"; + + /** Memory. */ + private static final String REGION_MEM = "mem"; + + /** Disk. */ + private static final String REGION_DISK = "disk"; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + cleanPersistenceDir(); + + startGrid(getConfiguration("srv1")); + startGrid(getConfiguration("srv2")); + startGrid(getConfiguration("srv3")); + + startGrid(getConfiguration(CLI_NAME).setClientMode(true)); + + client().cluster().active(true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + clearIoState(); + + Ignite cli = client(); + + cli.destroyCaches(cli.cacheNames()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration res = super.getConfiguration(name); + + res.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + res.setCommunicationSpi(new TrackingTcpCommunicationSpi()); + + res.setLocalHost("127.0.0.1"); + + DataRegionConfiguration memRegion = + new DataRegionConfiguration().setName(REGION_MEM).setPersistenceEnabled(false); + + DataRegionConfiguration diskRegion = + new DataRegionConfiguration().setName(REGION_DISK).setPersistenceEnabled(true); + + res.setDataStorageConfiguration(new DataStorageConfiguration().setDataRegionConfigurations(diskRegion) + .setDefaultDataRegionConfiguration(memRegion)); + + return res; + } + + /** + * Test simple join. + */ + @Test + public void testSimpleJoin() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + executeSingle("INSERT INTO t1 VALUES ('1', '1')"); + executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')"); + + executeSingle("INSERT INTO t1 VALUES ('2', '2')"); + executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')"); + + executeSingle("INSERT INTO t1 VALUES ('3', '3')"); + executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')"); + + executeSingle("INSERT INTO t1 VALUES ('4', '4')"); + executeSingle("INSERT INTO t2 VALUES ('4', '4', '4')"); + + executeSingle("INSERT INTO t1 VALUES ('5', '5')"); + executeSingle("INSERT INTO t2 VALUES ('5', '5', '5')"); + + // Key (not alias). + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ?", + (res) -> { + assertPartitions( + partition("t1", "1") + ); + assertEquals(1, res.size()); + assertEquals("1", res.get(0).get(0)); + }, + "1" + ); + + // Key (alias). + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1._KEY = ?", + (res) -> { + assertPartitions( + partition("t1", "2") + ); + assertEquals(1, res.size()); + assertEquals("2", res.get(0).get(0)); + }, + "2" + ); + + // Non-affinity key. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.k1 = ?", + (res) -> { + assertNoPartitions(); + assertEquals(1, res.size()); + assertEquals("3", res.get(0).get(0)); + }, + "3" + ); + + // Affinity key. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 = ?", + (res) -> { + assertPartitions( + partition("t2", "4") + ); + assertEquals(1, res.size()); + assertEquals("4", res.get(0).get(0)); + }, + "4" + ); + + // Complex key. + BinaryObject key = client().binary().builder("t2_key").setField("k1", "5").setField("ak2", "5").build(); + + List<List<?>> res = executeSingle("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2._KEY = ?", key); + assertPartitions( + partition("t2", "5") + ); + assertEquals(1, res.size()); + assertEquals("5", res.get(0).get(0)); + } + + /** + * Test how partition ownership is transferred in various cases. + */ + @Test + public void testPartitionTransfer() { + // First co-located table. + createPartitionedTable("t1", + pkColumn("k1"), + "v2" + ); + + // Second co-located table. + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3" + ); + + // Third co-located table. + createPartitionedTable("t3", + pkColumn("k1"), + affinityColumn("ak2"), + "v3", + "v4" + ); + + // Transfer through "AND". + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertNoRequests(), + "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 IN (?, ?)", + (res) -> assertNoRequests(), + "1", "2", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) AND t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "2") + ), + "1", "2", "2", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) AND t2.ak2 IN (?, ?)", + (res) -> assertNoRequests(), + "1", "2", "3", "4" + ); + + // Transfer through "OR". + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t2", "2") + ), + "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t2", "2") + ), + "1", "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t2", "2"), + partition("t2", "3") + ), + "1", "2", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) OR t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t1", "2"), + partition("t2", "3") + ), + "1", "2", "2", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) OR t2.ak2 IN (?, ?)", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t1", "2"), + partition("t2", "3"), + partition("t2", "4") + ), + "1", "2", "3", "4" + ); + + // Multi-way co-located JOIN. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN t3 ON t1.k1 = t3.ak2 " + + "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN t3 ON t1.k1 = t3.ak2 " + + "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?", + (res) -> assertNoRequests(), + "1", "2", "3" + ); + + // No transfer through intermediate table. + execute("SELECT * FROM t1 INNER JOIN t3 ON t1.k1 = t3.v3 INNER JOIN t2 ON t3.v4 = t2.ak2 " + + "WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "1" + ); + + // No transfer through disjunction. + execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ? OR t1.k1 = t2.ak2", + (res) -> assertNoPartitions(), + "1" + ); + } + + /** + * Test cross-joins. They cannot "transfer" partitions between joined tables. + */ + @Test + public void testCrossJoin() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + executeSingle("INSERT INTO t1 VALUES ('1', '1')"); + executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')"); + + executeSingle("INSERT INTO t1 VALUES ('2', '2')"); + executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')"); + + executeSingle("INSERT INTO t1 VALUES ('3', '3')"); + executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')"); + + // Left table, should work. + execute("SELECT * FROM t1, t2 WHERE t1.k1 = ?", + (res) -> { + assertPartitions( + partition("t1", "1") + ); + assertEquals(1, res.size()); + assertEquals("1", res.get(0).get(0)); + }, + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ?", + (res) -> { + assertPartitions( + partition("t1", "1") + ); + assertEquals(1, res.size()); + assertEquals("1", res.get(0).get(0)); + }, + "1" + ); + + // Right table, should work. + execute("SELECT * FROM t1, t2 WHERE t2.ak2 = ?", + (res) -> { + assertPartitions( + partition("t2", "2") + ); + assertEquals(1, res.size()); + assertEquals("2", res.get(0).get(0)); + }, + "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t2.ak2 = ?", + (res) -> { + assertPartitions( + partition("t2", "2") + ); + assertEquals(1, res.size()); + assertEquals("2", res.get(0).get(0)); + }, + "2" + ); + + execute("SELECT * FROM t1, t2 WHERE t1.k1=? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "3", "3" + ); + + // Two tables, should not work. + execute("SELECT * FROM t1, t2 WHERE t1.k1=? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "3", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1=? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "3", "3" + ); + } + + /** + * Test non-equijoins. + */ + @Test + public void testThetaJoin() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + // Greater than. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "2" + ); + + // Less than. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "2" + ); + + // Non-equal. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "2" + ); + } + + /** + * Test joins with REPLICATED cache. + */ + @Test + public void testJoinWithReplicated() { + // First co-located table. + createPartitionedTable("t1", + pkColumn("k1"), + "v2" + ); + + // Replicated table. + createReplicatedTable("t2", + pkColumn("k1"), + "v2", + "v3" + ); + + // Only partition from PARTITIONED cache should be used. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 = ? AND t2.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 IN (?, ?) AND t2.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t1", "2") + ), + "1", "2", "3" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 = ? OR t2.k1 = ?", + (res) -> assertNoPartitions(), + "1", "2" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t2.k1 = ?", + (res) -> assertNoPartitions(), + "1" + ); + } + + /** + * Test joins with different affinity functions. + */ + @Test + public void testJoinWithDifferentAffinityFunctions() { + // Partition count. + checkAffinityFunctions( + cacheConfiguration(256, 1, false, false, false), + cacheConfiguration(256, 1, false, false, false), + true + ); + + checkAffinityFunctions( + cacheConfiguration(1024, 1, false, false, false), + cacheConfiguration(256, 1, false, false, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 1, false, false, false), + cacheConfiguration(1024, 1, false, false, false), + false + ); + + // Backups. + checkAffinityFunctions( + cacheConfiguration(256, 1, false, false, false), + cacheConfiguration(256, 2, false, false, false), + true + ); + + // Different affinity functions. + checkAffinityFunctions( + cacheConfiguration(256, 2, true, false, false), + cacheConfiguration(256, 2, false, false, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, false, false, false), + cacheConfiguration(256, 2, true, false, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, true, false, false), + cacheConfiguration(256, 2, true, false, false), + false + ); + + // Node filters. + checkAffinityFunctions( + cacheConfiguration(256, 2, false, true, false), + cacheConfiguration(256, 2, false, false, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, false, false, false), + cacheConfiguration(256, 2, false, true, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, false, true, false), + cacheConfiguration(256, 2, false, true, false), + false + ); + + // With and without persistence. + checkAffinityFunctions( + cacheConfiguration(256, 2, false, false, true), + cacheConfiguration(256, 2, false, false, false), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, false, false, false), + cacheConfiguration(256, 2, false, false, true), + false + ); + + checkAffinityFunctions( + cacheConfiguration(256, 2, false, false, true), + cacheConfiguration(256, 2, false, false, true), + true + ); + } + + @SuppressWarnings("unchecked") + private void checkAffinityFunctions(CacheConfiguration ccfg1, CacheConfiguration ccfg2, boolean compatible) { + // Destroy old caches. + Ignite cli = client(); + + cli.destroyCaches(cli.cacheNames()); + + // Start new caches. + ccfg1.setName("t1"); + ccfg2.setName("t2"); + + QueryEntity entity1 = new QueryEntity(KeyClass1.class, ValueClass.class).setTableName("t1"); + QueryEntity entity2 = new QueryEntity(KeyClass2.class, ValueClass.class).setTableName("t2"); + + ccfg1.setQueryEntities(Collections.singletonList(entity1)); + ccfg2.setQueryEntities(Collections.singletonList(entity2)); + + ccfg1.setKeyConfiguration(new CacheKeyConfiguration(entity1.getKeyType(), "k1")); + ccfg2.setKeyConfiguration(new CacheKeyConfiguration(entity2.getKeyType(), "ak2")); + + ccfg1.setSqlSchema(QueryUtils.DFLT_SCHEMA); + ccfg2.setSqlSchema(QueryUtils.DFLT_SCHEMA); + + client().createCache(ccfg1); + client().createCache(ccfg2); + + // Conduct tests. + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 = ?", + (res) -> assertPartitions( + partition("t2", "2") + ), + "2" + ); + + if (compatible) { + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t2", "2") + ), + "1", "2" + ); + } + else { + execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1", "2" + ); + } + } + + /** + * Create custom cache configuration. + * + * @param parts Partitions. + * @param backups Backups. + * @param customAffinity Custom affinity function flag. + * @param nodeFilter Whether to set node filter. + * @param persistent Whether to enable persistence. + * @return Cache configuration. + */ + private static CacheConfiguration cacheConfiguration( + int parts, + int backups, + boolean customAffinity, + boolean nodeFilter, + boolean persistent + ) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(backups); + + RendezvousAffinityFunction affFunc; + + if (customAffinity) + affFunc = new CustomRendezvousAffinityFunction(); + else + affFunc = new RendezvousAffinityFunction(); + + affFunc.setPartitions(parts); + + ccfg.setAffinity(affFunc); + + if (nodeFilter) + ccfg.setNodeFilter(new CustomNodeFilter()); + + if (persistent) + ccfg.setDataRegionName(REGION_DISK); + + return ccfg; + } + + /** + * Test joins with subqueries. + */ + @Test + public void testJoinWithSubquery() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + execute("SELECT * FROM t1 INNER JOIN (SELECT * FROM t2) T2_SUB ON t1.k1 = T2_SUB.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 INNER JOIN (SELECT * FROM t2) T2_SUB ON t1.k1 = T2_SUB.ak2 WHERE T2_SUB.ak2 = ?", + (res) -> assertNoPartitions(), + "1" + ); + } + + /** + * Test joins when explicit partitions are set. + */ + @Test + public void testExplicitPartitions() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + executeSqlFieldsQuery(new SqlFieldsQuery("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 " + + "WHERE t1.k1=? OR t2.ak2=?").setArgs("1", "2").setPartitions(1)); + + assertPartitions(1); + } + + /** + * Test outer joins. + */ + @Test + public void testOuterJoin() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + createPartitionedTable("t2", + pkColumn("k1"), + affinityColumn("ak2"), + "v3"); + + execute("SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 = ?", + (res) -> assertNoPartitions(), + "1" + ); + + execute("SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 INNER JOIN t2 T2_2 ON T2_1.k1 = T2_2.k1 " + + "WHERE T2_2.ak2 = ?", + (res) -> assertPartitions( + partition("t2", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 INNER JOIN t2 T2_2 ON t1.k1 = T2_2.ak2 " + + "WHERE T2_1.ak2 = ? AND T2_2.ak2=?", + (res) -> assertPartitions( + partition("t2", "2") + ), + "1", "2" + ); + } + + /** + * Test JOINs on a single table. + */ + @Test + public void testSelfJoin() { + createPartitionedTable("t1", + pkColumn("k1"), + "v2"); + + execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1" + ); + + execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 = ? AND B.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1") + ), + "1", "1" + ); + + execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 = ? AND B.k1 = ?", + (res) -> assertNoRequests(), + "1", "2" + ); + + execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 = ? OR B.k1 = ?", + (res) -> assertPartitions( + partition("t1", "1"), + partition("t1", "2") + ), + "1", "2" + ); + } + + /** + * Create PARTITIONED table. + * + * @param name Name. + * @param cols Columns. + */ + private void createPartitionedTable(String name, Object... cols) { + createTable0(name, false, cols); + } + + /** + * Create REPLICATED table. + * + * @param name Name. + * @param cols Columns. + */ + @SuppressWarnings("SameParameterValue") + private void createReplicatedTable(String name, Object... cols) { + createTable0(name, true, cols); + } + + /** + * Internal CREATE TABLE routine. + * + * @param name Name. + * @param replicated Replicated table flag. + * @param cols Columns. + */ + @SuppressWarnings("StringConcatenationInsideStringBufferAppend") + private void createTable0(String name, boolean replicated, Object... cols) { + List<String> pkCols = new ArrayList<>(); + + String affCol = null; + + StringBuilder sql = new StringBuilder("CREATE TABLE ").append(name).append("("); + for (Object col : cols) { + Column col0 = col instanceof Column ? (Column)col : new Column((String)col, false, false); + + sql.append(col0.name()).append(" VARCHAR, "); + + if (col0.pk()) + pkCols.add(col0.name()); + + if (col0.affinity()) { + if (affCol != null) + throw new IllegalStateException("Only one affinity column is allowed: " + col0.name()); + + affCol = col0.name(); + } + } + + if (pkCols.isEmpty()) + throw new IllegalStateException("No PKs!"); + + sql.append("PRIMARY KEY ("); + + boolean firstPkCol = true; + + for (String pkCol : pkCols) { + if (firstPkCol) + firstPkCol = false; + else + sql.append(", "); + + sql.append(pkCol); + } + + sql.append(")"); + + sql.append(") WITH \"template=" + (replicated ? "replicated" : "partitioned")); + sql.append(", CACHE_NAME=" + name); + + if (affCol != null) { + sql.append(", AFFINITY_KEY=" + affCol); + sql.append(", KEY_TYPE=" + name + "_key"); + } + + sql.append("\""); + + executeSingle(sql.toString()); + } + + /** + * Execute query with all possible combinations of argument placeholders. + * + * @param sql SQL. + * @param resConsumer Result consumer. + * @param args Arguments. + */ + public void execute(String sql, Consumer<List<List<?>>> resConsumer, Object... args) { + System.out.println(">>> TEST COMBINATION: " + sql); + + // Execute query as is. + List<List<?>> res = executeSingle(sql, args); + + resConsumer.accept(res); + + // Start filling arguments recursively. + if (args != null && args.length > 0) + executeCombinations0(sql, resConsumer, new HashSet<>(), args); + + System.out.println(); + } + + /** + * Execute query with all possible combinations of argument placeholders. + * + * @param sql SQL. + * @param resConsumer Result consumer. + * @param executedSqls Already executed SQLs. + * @param args Arguments. + */ + public void executeCombinations0( + String sql, + Consumer<List<List<?>>> resConsumer, + Set<String> executedSqls, + Object... args + ) { + assert args != null && args.length > 0; + + // Get argument positions. + List<Integer> paramPoss = new ArrayList<>(); + + int pos = 0; + + while (true) { + int paramPos = sql.indexOf('?', pos); + + if (paramPos == -1) + break; + + paramPoss.add(paramPos); + + pos = paramPos + 1; + } + + for (int i = 0; i < args.length; i++) { + // Prepare new SQL and arguments. + int paramPos = paramPoss.get(i); + + String newSql = sql.substring(0, paramPos) + args[i] + sql.substring(paramPos + 1); + + Object[] newArgs = new Object[args.length - 1]; + + int newArgsPos = 0; + + for (int j = 0; j < args.length; j++) { + if (j != i) + newArgs[newArgsPos++] = args[j]; + } + + // Execute if this combination was never executed before. + if (executedSqls.add(newSql)) { + List<List<?>> res = executeSingle(newSql, newArgs); + + resConsumer.accept(res); + } + + // Continue recursively. + if (newArgs.length > 0) + executeCombinations0(newSql, resConsumer, executedSqls, newArgs); + } + } + + /** + * Execute SQL query. + * + * @param sql SQL. + */ + private List<List<?>> executeSingle(String sql, Object... args) { + clearIoState(); + + if (args == null || args.length == 0) + System.out.println(">>> " + sql); + else + System.out.println(">>> " + sql + " " + Arrays.toString(args)); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + if (args != null && args.length > 0) + qry.setArgs(args); + + return executeSqlFieldsQuery(qry); + } + + /** + * Execute prepared SQL fields query. + * + * @param qry Query. + * @return Result. + */ + private List<List<?>> executeSqlFieldsQuery(SqlFieldsQuery qry) { + return client().context().query().querySqlFields(qry, false).getAll(); + } + + /** + * @return Client node. + */ + private IgniteEx client() { + return grid(CLI_NAME); + } + + /** + * Clear partitions. + */ + private static void clearIoState() { + INTERCEPTED_REQS.set(0); + INTERCEPTED_PARTS.clear(); + } + + /** + * Make sure that expected partitions are logged. + * + * @param expParts Expected partitions. + */ + private static void assertPartitions(int... expParts) { + Collection<Integer> expParts0 = new TreeSet<>(); + + for (int expPart : expParts) + expParts0.add(expPart); + + assertPartitions(expParts0); + } + + /** + * Make sure that expected partitions are logged. + * + * @param expParts Expected partitions. + */ + private static void assertPartitions(Collection<Integer> expParts) { + TreeSet<Integer> expParts0 = new TreeSet<>(expParts); + TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS); + + assertEquals("Unexpected partitions [exp=" + expParts + ", actual=" + actualParts + ']', + expParts0, actualParts); + } + + /** + * Make sure that no partitions were extracted. + */ + private static void assertNoPartitions() { + assertTrue("No requests were sent.", INTERCEPTED_REQS.get() > 0); + assertTrue("Partitions are not empty: " + INTERCEPTED_PARTS, INTERCEPTED_PARTS.isEmpty()); + } + + /** + * Make sure there were no requests sent because we determined empty partition set. + */ + private static void assertNoRequests() { + assertEquals("Requests were sent: " + INTERCEPTED_REQS.get(), 0, INTERCEPTED_REQS.get()); + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @return Partition. + */ + private int partition(String cacheName, Object key) { + return client().affinity(cacheName).partition(key); + } + + /** + * TCP communication SPI which will track outgoing query requests. + */ + private static class TrackingTcpCommunicationSpi extends TcpCommunicationSpi { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + if (msg instanceof GridIoMessage) { + GridIoMessage msg0 = (GridIoMessage)msg; + + if (msg0.message() instanceof GridH2QueryRequest) { + INTERCEPTED_REQS.incrementAndGet(); + + GridH2QueryRequest req = (GridH2QueryRequest)msg0.message(); + + int[] parts = req.queryPartitions(); + + if (!F.isEmpty(parts)) { + for (int part : parts) + INTERCEPTED_PARTS.add(part); + } + } + } + + super.sendMessage(node, msg, ackC); + } + } + + /** + * @param name Name. + * @return PK column. + */ + public Column pkColumn(String name) { + return new Column(name, true, false); + } + + /** + * @param name Name. + * @return Affintiy column. + */ + public Column affinityColumn(String name) { + return new Column(name, true, true); + } + + /** + * Column. + */ + private static class Column { + /** Name. */ + private final String name; + + /** PK. */ + private final boolean pk; + + /** Affinity key. */ + private final boolean aff; + + /** + * Constructor. + * + * @param name Name. + * @param pk PK flag. + * @param aff Affinity flag. + */ + public Column(String name, boolean pk, boolean aff) { + this.name = name; + this.pk = pk; + this.aff = aff; + } + + /** + * @return Name. + */ + public String name() { + return name; + } + + /** + * @return PK flag. + */ + public boolean pk() { + return pk; + } + + /** + * @return Affintiy flag. + */ + public boolean affinity() { + return aff; + } + } + + /** + * Custom affinity function. + */ + private static class CustomRendezvousAffinityFunction extends RendezvousAffinityFunction { + // No-op. + } + + /** + * Custom node filter. + */ + private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> { + @Override public boolean apply(ClusterNode clusterNode) { + return true; + } + } + + /** + * Key class 1. + */ + @SuppressWarnings("unused") + private static class KeyClass1 { + /** Key. */ + @QuerySqlField + private String k1; + } + + /** + * Key class 2. + */ + @SuppressWarnings("unused") + private static class KeyClass2 { + /** Key. */ + @QuerySqlField + private String k1; + + /** Affinity key. */ + @QuerySqlField + private String ak2; + } + + /** + * Value class. + */ + @SuppressWarnings("unused") + private static class ValueClass { + /** Value. */ + @QuerySqlField + private String v; + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index ce8f6cc..5ffd7fa 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -215,6 +215,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest; import org.apache.ignite.internal.processors.query.h2.twostep.AndOperationExtractPartitionSelfTest; import org.apache.ignite.internal.processors.query.h2.twostep.BetweenOperationExtractPartitionSelfTest; import org.apache.ignite.internal.processors.query.h2.twostep.InOperationExtractPartitionSelfTest; +import org.apache.ignite.internal.processors.query.h2.twostep.JoinPartitionPruningSelfTest; import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedAtomicColumnConstraintsTest; import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalColumnConstraintsTest; import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalSnapshotColumnConstraintTest; @@ -527,6 +528,7 @@ import org.junit.runners.Suite; InOperationExtractPartitionSelfTest.class, AndOperationExtractPartitionSelfTest.class, BetweenOperationExtractPartitionSelfTest.class, + JoinPartitionPruningSelfTest.class, GridCacheDynamicLoadOnClientTest.class, GridCacheDynamicLoadOnClientPersistentTest.class,