http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java index 38e3839..8d31651 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java @@ -119,7 +119,7 @@ public enum GridSqlOperationType { /** {@inheritDoc} */ @Override public String getSql(GridSqlOperation operation) { - assert operation.opType().childrenCnt == 2; + assert operation.operationType().childrenCnt == 2; return '(' + operation.child(0).getSQL() + " " + delim + " " + operation.child(1).getSQL() + ')'; } @@ -132,7 +132,7 @@ public enum GridSqlOperationType { /** {@inheritDoc} */ @Override public String getSql(GridSqlOperation operation) { - assert operation.opType().childrenCnt == 2; + assert operation.operationType().childrenCnt == 2; return "(INTERSECTS(" + operation.child(0).getSQL() + ", " + operation.child(1).getSQL() + "))"; } @@ -154,7 +154,7 @@ public enum GridSqlOperationType { /** {@inheritDoc} */ @Override public String getSql(GridSqlOperation operation) { - assert operation.opType().childrenCnt == 1; + assert operation.operationType().childrenCnt == 1; return '(' + text + ' ' + operation.child().getSQL() + ')'; } @@ -176,7 +176,7 @@ public enum GridSqlOperationType { /** {@inheritDoc} */ @Override public String getSql(GridSqlOperation operation) { - assert operation.opType().childrenCnt == 1; + assert operation.operationType().childrenCnt == 1; return '(' + operation.child().getSQL() + ' ' + text + ')'; }
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 7001717..a7451c1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -21,8 +21,10 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.List; +import javax.cache.CacheException; import org.apache.ignite.IgniteException; import org.h2.command.Command; +import org.h2.command.CommandContainer; import org.h2.command.Prepared; import org.h2.command.dml.Explain; import org.h2.command.dml.Query; @@ -48,6 +50,8 @@ import org.h2.expression.Parameter; import org.h2.expression.Subquery; import org.h2.expression.TableFunction; import org.h2.expression.ValueExpression; +import org.h2.index.Index; +import org.h2.index.ViewIndex; import org.h2.jdbc.JdbcPreparedStatement; import org.h2.result.SortOrder; import org.h2.table.Column; @@ -91,14 +95,16 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlType.fro @SuppressWarnings("TypeMayBeWeakened") public class GridSqlQueryParser { /** */ - private static final GridSqlOperationType[] OPERATION_OP_TYPES = new GridSqlOperationType[] {CONCAT, PLUS, MINUS, MULTIPLY, DIVIDE, null, MODULUS}; + private static final GridSqlOperationType[] OPERATION_OP_TYPES = + {CONCAT, PLUS, MINUS, MULTIPLY, DIVIDE, null, MODULUS}; /** */ - private static final GridSqlOperationType[] COMPARISON_TYPES = new GridSqlOperationType[] { - EQUAL, BIGGER_EQUAL, BIGGER, SMALLER_EQUAL, + private static final GridSqlOperationType[] COMPARISON_TYPES = + {EQUAL, BIGGER_EQUAL, BIGGER, SMALLER_EQUAL, SMALLER, NOT_EQUAL, IS_NULL, IS_NOT_NULL, - null, null, null, SPATIAL_INTERSECTS /* 11 */, null, null, null, null, EQUAL_NULL_SAFE /* 16 */, null, null, null, null, - NOT_EQUAL_NULL_SAFE /* 21 */}; + null, null, null, SPATIAL_INTERSECTS /* 11 */, + null, null, null, null, EQUAL_NULL_SAFE /* 16 */, + null, null, null, null, NOT_EQUAL_NULL_SAFE /* 21 */}; /** */ private static final Getter<Select, Expression> CONDITION = getter(Select.class, "condition"); @@ -134,7 +140,7 @@ public class GridSqlQueryParser { private static final Getter<ConditionAndOr, Expression> ANDOR_RIGHT = getter(ConditionAndOr.class, "right"); /** */ - private static final Getter<TableView, Query> VIEW_QUERY = getter(TableView.class, "viewQuery"); + public static final Getter<TableView, Query> VIEW_QUERY = getter(TableView.class, "viewQuery"); /** */ private static final Getter<TableFilter, String> ALIAS = getter(TableFilter.class, "alias"); @@ -218,7 +224,8 @@ public class GridSqlQueryParser { private static final Getter<Explain, Prepared> EXPLAIN_COMMAND = getter(Explain.class, "command"); /** */ - private static volatile Getter<Command, Prepared> prepared; + private static final Getter<Command, Prepared> PREPARED = + GridSqlQueryParser.<Command, Prepared>getter(CommandContainer.class, "prepared"); /** */ private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>(); @@ -227,22 +234,12 @@ public class GridSqlQueryParser { * @param stmt Prepared statement. * @return Parsed select. */ - public static GridSqlQuery parse(JdbcPreparedStatement stmt) { + public static Prepared prepared(JdbcPreparedStatement stmt) { Command cmd = COMMAND.get(stmt); - Getter<Command, Prepared> p = prepared; + assert cmd instanceof CommandContainer; - if (p == null) { - Class<? extends Command> cls = cmd.getClass(); - - assert "CommandContainer".equals(cls.getSimpleName()); - - prepared = p = getter(cls, "prepared"); - } - - Prepared statement = p.get(cmd); - - return new GridSqlQueryParser().parse(statement); + return PREPARED.get(cmd); } /** @@ -255,11 +252,15 @@ public class GridSqlQueryParser { Table tbl = filter.getTable(); if (tbl instanceof TableBase) - res = new GridSqlTable(tbl.getSchema().getName(), tbl.getName()); + res = new GridSqlTable(tbl); else if (tbl instanceof TableView) { Query qry = VIEW_QUERY.get((TableView)tbl); - res = new GridSqlSubquery(parse(qry)); + Index idx = filter.getIndex(); + + Query idxQry = idx instanceof ViewIndex ? ((ViewIndex)idx).getQuery() : null; + + res = new GridSqlSubquery(parse(qry, idxQry)); } else if (tbl instanceof FunctionTable) res = parseExpression(FUNC_EXPR.get((FunctionTable)tbl), false); @@ -286,7 +287,7 @@ public class GridSqlQueryParser { /** * @param select Select. */ - public GridSqlSelect parse(Select select) { + public GridSqlSelect parse(Select select, @Nullable Query idxQry) { GridSqlSelect res = (GridSqlSelect)h2ObjToGridObj.get(select); if (res != null) @@ -305,6 +306,9 @@ public class GridSqlQueryParser { TableFilter filter = select.getTopTableFilter(); + if (idxQry instanceof Select) + filter = ((Select)idxQry).getTopTableFilter(); + do { assert0(filter != null, select); assert0(filter.getNestedJoin() == null, select); @@ -366,11 +370,35 @@ public class GridSqlQueryParser { } /** - * @param qry Select. + * @param qry Prepared. + * @return Query. + */ + public static Query query(Prepared qry) { + if (qry instanceof Query) + return (Query)qry; + + if (qry instanceof Explain) + return query(EXPLAIN_COMMAND.get((Explain)qry)); + + throw new CacheException("Unsupported query: " + qry); + } + + /** + * @param qry Prepared. + * @return Query. */ public GridSqlQuery parse(Prepared qry) { + return parse(qry, null); + } + + /** + * @param qry Select. + */ + public GridSqlQuery parse(Prepared qry, @Nullable Query idxQry) { + assert qry != null; + if (qry instanceof Select) - return parse((Select)qry); + return parse((Select)qry, idxQry); if (qry instanceof SelectUnion) return parse((SelectUnion)qry); @@ -378,7 +406,7 @@ public class GridSqlQueryParser { if (qry instanceof Explain) return parse(EXPLAIN_COMMAND.get((Explain)qry)).explain(true); - throw new UnsupportedOperationException("Unknown query type: " + qry); + throw new CacheException("Unsupported query: " + qry); } /** @@ -437,11 +465,12 @@ public class GridSqlQueryParser { */ private GridSqlElement parseExpression0(Expression expression, boolean calcTypes) { if (expression instanceof ExpressionColumn) { - TableFilter tblFilter = ((ExpressionColumn)expression).getTableFilter(); - - GridSqlElement gridTblFilter = parseTable(tblFilter); + ExpressionColumn expCol = (ExpressionColumn)expression; - return new GridSqlColumn(gridTblFilter, expression.getColumnName(), expression.getSQL()); + return new GridSqlColumn(expCol.getColumn(), + parseTable(expCol.getTableFilter()), + expression.getColumnName(), + expression.getSQL()); } if (expression instanceof Alias) @@ -475,12 +504,14 @@ public class GridSqlQueryParser { assert opType != null : COMPARISON_TYPE.get(cmp); - GridSqlElement left = parseExpression(COMPARISON_LEFT.get(cmp), calcTypes); + Expression leftExp = COMPARISON_LEFT.get(cmp); + GridSqlElement left = parseExpression(leftExp, calcTypes); if (opType.childrenCount() == 1) return new GridSqlOperation(opType, left); - GridSqlElement right = parseExpression(COMPARISON_RIGHT.get(cmp), calcTypes); + Expression rightExp = COMPARISON_RIGHT.get(cmp); + GridSqlElement right = parseExpression(rightExp, calcTypes); return new GridSqlOperation(opType, left, right); } @@ -685,7 +716,7 @@ public class GridSqlQueryParser { * Field getter. */ @SuppressWarnings("unchecked") - private static class Getter<T, R> { + public static class Getter<T, R> { /** */ private final Field fld; http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 4d34ce8..7205a18 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -23,18 +23,27 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Set; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; -import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.lang.IgnitePredicate; +import org.h2.command.Prepared; import org.h2.jdbc.JdbcPreparedStatement; +import org.h2.table.Column; +import org.h2.table.IndexColumn; import org.h2.util.IntArray; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.isCollocated; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.COUNT; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.SUM; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlaceholder.EMPTY; +import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.prepared; +import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.query; +import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.toArray; /** * Splits a single SQL query into two step map-reduce query. @@ -136,26 +145,146 @@ public class GridSqlQuerySplitter { /** * @param stmt Prepared statement. * @param params Parameters. - * @param collocated Collocated query. - * @param igniteH2Indexing Indexing implementation. + * @param collocatedGrpBy Whether the query has collocated GROUP BY keys. + * @param distributedJoins If distributed joins enabled. * @return Two step query. */ - public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params, boolean collocated, IgniteH2Indexing igniteH2Indexing) { + public static GridCacheTwoStepQuery split( + JdbcPreparedStatement stmt, + Object[] params, + final boolean collocatedGrpBy, + final boolean distributedJoins + ) { if (params == null) params = GridCacheSqlQuery.EMPTY_PARAMS; + Set<String> tbls = new HashSet<>(); Set<String> schemas = new HashSet<>(); + final Prepared prepared = prepared(stmt); + + GridSqlQuery qry = new GridSqlQueryParser().parse(prepared); + + qry = collectAllTables(qry, schemas, tbls); + + // Build resulting two step query. + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls); + // Map query will be direct reference to the original query AST. // Thus all the modifications will be performed on the original AST, so we should be careful when // nullifying or updating things, have to make sure that we will not need them in the original form later. - final GridSqlSelect mapQry = wrapUnion(collectAllSpaces(GridSqlQueryParser.parse(stmt), schemas)); + final GridSqlSelect mapQry = wrapUnion(qry); + + GridCacheSqlQuery rdc = split(res, 0, mapQry, params, collocatedGrpBy); + + res.reduceQuery(rdc); + + // We do not have to look at each map query separately here, because if + // the whole initial query is collocated, then all the map sub-queries + // will be collocated as well. + res.distributedJoins(distributedJoins && !isCollocated(query(prepared))); + + return res; + } + + /** + * @param el Either {@link GridSqlSelect#from()} or {@link GridSqlSelect#where()} elements. + */ + private static void findAffinityColumnConditions(GridSqlElement el) { + if (el == null) + return; + + el = GridSqlAlias.unwrap(el); + + if (el instanceof GridSqlJoin) { + GridSqlJoin join = (GridSqlJoin)el; + + findAffinityColumnConditions(join.leftTable()); + findAffinityColumnConditions(join.rightTable()); + findAffinityColumnConditions(join.on()); + } + else if (el instanceof GridSqlOperation) { + GridSqlOperationType type = ((GridSqlOperation)el).operationType(); + + switch(type) { + case AND: + findAffinityColumnConditions(el.child(0)); + findAffinityColumnConditions(el.child(1)); + + break; + + case EQUAL: + findAffinityColumn(el.child(0)); + findAffinityColumn(el.child(1)); + } + } + } + + /** + * @param exp Possible affinity column expression. + */ + private static void findAffinityColumn(GridSqlElement exp) { + if (exp instanceof GridSqlColumn) { + GridSqlColumn col = (GridSqlColumn)exp; + + GridSqlElement from = col.expressionInFrom(); + + if (from instanceof GridSqlTable) { + GridSqlTable fromTbl = (GridSqlTable)from; + + GridH2Table tbl = fromTbl.dataTable(); + + if (tbl != null) { + IndexColumn affKeyCol = tbl.getAffinityKeyColumn(); + Column expCol = col.column(); + + if (affKeyCol != null && expCol != null && + affKeyCol.column.getColumnId() == expCol.getColumnId()) { + // Mark that table lookup will use affinity key. + fromTbl.affinityKeyCondition(true); + } + } + } + } + } + + /** + * @param qry Select. + * @return {@code true} If there is at least one partitioned table in FROM clause. + */ + private static boolean hasPartitionedTableInFrom(GridSqlSelect qry) { + return findTablesInFrom(qry.from(), new IgnitePredicate<GridSqlElement>() { + @Override public boolean apply(GridSqlElement el) { + if (el instanceof GridSqlTable) { + GridH2Table tbl = ((GridSqlTable)el).dataTable(); + + assert tbl != null : el; + + GridCacheContext<?,?> cctx = tbl.rowDescriptor().context(); + return !cctx.isLocal() && !cctx.isReplicated(); + } + + return false; + } + }); + } + + /** + * @param res Resulting two step query. + * @param splitIdx Split index. + * @param mapQry Map query to be split. + * @param params Query parameters. + * @param collocatedGroupBy Whether the query has collocated GROUP BY keys. + * @return Reduce query for the given map query. + */ + private static GridCacheSqlQuery split(GridCacheTwoStepQuery res, int splitIdx, final GridSqlSelect mapQry, + Object[] params, boolean collocatedGroupBy) { final boolean explain = mapQry.explain(); mapQry.explain(false); - GridSqlSelect rdcQry = new GridSqlSelect().from(table(0)); + GridSqlSelect rdcQry = new GridSqlSelect().from(table(splitIdx)); // Split all select expressions into map-reduce parts. List<GridSqlElement> mapExps = new ArrayList<>(mapQry.allColumns()); @@ -172,9 +301,9 @@ public class GridSqlQuerySplitter { boolean aggregateFound = false; for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len because mapExps list can grow. - aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocated, i == havingCol); + aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocatedGroupBy, i == havingCol); - // Fill select expressions. + // -- SELECT mapQry.clearColumns(); for (GridSqlElement exp : mapExps) // Add all map expressions as visible. @@ -189,12 +318,18 @@ public class GridSqlQuerySplitter { for (int i = rdcExps.size(); i < mapExps.size(); i++) // Add all extra map columns as invisible reduce columns. rdcQry.addColumn(column(((GridSqlAlias)mapExps.get(i)).alias()), false); + // -- FROM + findAffinityColumnConditions(mapQry.from()); + + // -- WHERE + findAffinityColumnConditions(mapQry.where()); + // -- GROUP BY - if (mapQry.groupColumns() != null && !collocated) + if (mapQry.groupColumns() != null && !collocatedGroupBy) rdcQry.groupColumns(mapQry.groupColumns()); // -- HAVING - if (havingCol >= 0 && !collocated) { + if (havingCol >= 0 && !collocatedGroupBy) { // TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause. // We need to find HAVING column in reduce query. for (int i = visibleCols; i < rdcQry.allColumns(); i++) { @@ -246,40 +381,25 @@ public class GridSqlQuerySplitter { IntArray paramIdxs = new IntArray(params.length); - GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(), - findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray()); - - rdc.parameterIndexes(toIntArray(paramIdxs)); - - paramIdxs = new IntArray(params.length); - GridCacheSqlQuery map = new GridCacheSqlQuery(mapQry.getSQL(), - findParams(mapQry, params, new ArrayList<>(params.length), paramIdxs).toArray()) - .columns(collectColumns(mapExps)); + findParams(mapQry, params, new ArrayList<>(params.length), paramIdxs).toArray()); - map.parameterIndexes(toIntArray(paramIdxs)); + map.columns(collectColumns(mapExps)); + map.parameterIndexes(toArray(paramIdxs)); - Set<String> spaces = new HashSet<>(schemas.size()); + res.addMapQuery(map); - for (String schema : schemas) - spaces.add(igniteH2Indexing.space(schema)); + res.explain(explain); - // Build resulting two step query. - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, rdc, rdcQry.simpleQuery()).addMapQuery(map); + paramIdxs = new IntArray(params.length); - res.explain(explain); + GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(), + findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray()); - return res; - } + rdc.parameterIndexes(toArray(paramIdxs)); + res.skipMergeTable(rdcQry.simpleQuery()); - /** - * @param arr Integer array. - * @return Primitive int array. - */ - private static int[] toIntArray(IntArray arr) { - int[] res = new int[arr.size()]; - arr.toArray(res); - return res; + return rdc; } /** @@ -315,25 +435,26 @@ public class GridSqlQuerySplitter { /** * @param qry Query. - * @param schemas Shemas' names. + * @param schemas Schema names. + * @param tbls Tables. * @return Query. */ - private static GridSqlQuery collectAllSpaces(GridSqlQuery qry, Set<String> schemas) { + private static GridSqlQuery collectAllTables(GridSqlQuery qry, Set<String> schemas, Set<String> tbls) { if (qry instanceof GridSqlUnion) { GridSqlUnion union = (GridSqlUnion)qry; - collectAllSpaces(union.left(), schemas); - collectAllSpaces(union.right(), schemas); + collectAllTables(union.left(), schemas, tbls); + collectAllTables(union.right(), schemas, tbls); } else { GridSqlSelect select = (GridSqlSelect)qry; - collectAllSpacesInFrom(select.from(), schemas); + collectAllTablesInFrom(select.from(), schemas, tbls); for (GridSqlElement el : select.columns(false)) - collectAllSpacesInSubqueries(el, schemas); + collectAllTablesInSubqueries(el, schemas, tbls); - collectAllSpacesInSubqueries(select.where(), schemas); + collectAllTablesInSubqueries(select.where(), schemas, tbls); } return qry; @@ -341,45 +462,85 @@ public class GridSqlQuerySplitter { /** * @param from From element. - * @param schemas Shemas' names. + * @param schemas Schema names. + * @param tbls Tables. */ - private static void collectAllSpacesInFrom(GridSqlElement from, Set<String> schemas) { - assert from != null; + private static void collectAllTablesInFrom(GridSqlElement from, final Set<String> schemas, final Set<String> tbls) { + findTablesInFrom(from, new IgnitePredicate<GridSqlElement>() { + @Override public boolean apply(GridSqlElement el) { + if (el instanceof GridSqlTable) { + GridSqlTable tbl = (GridSqlTable)el; + + String schema = tbl.schema(); + + boolean addSchema = tbls == null; + + if (tbls != null) + addSchema = tbls.add(tbl.dataTable().identifier()); + + if (addSchema && schema != null && schemas != null) + schemas.add(schema); + } + else if (el instanceof GridSqlSubquery) + collectAllTables(((GridSqlSubquery)el).select(), schemas, tbls); + + return false; + } + }); + } + + /** + * Processes all the tables and subqueries using the given closure. + * + * @param from FROM element. + * @param c Closure each found table and subquery will be passed to. If returns {@code true} the we need to stop. + * @return {@code true} If we have found. + */ + private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) { + if (from == null) + return false; + + if (from instanceof GridSqlTable || from instanceof GridSqlSubquery) + return c.apply(from); if (from instanceof GridSqlJoin) { // Left and right. - collectAllSpacesInFrom(from.child(0), schemas); - collectAllSpacesInFrom(from.child(1), schemas); - } - else if (from instanceof GridSqlTable) { - String schema = ((GridSqlTable)from).schema(); + if (findTablesInFrom(from.child(0), c)) + return true; - if (schema != null) - schemas.add(schema); + if (findTablesInFrom(from.child(1), c)) + return true; + + // We don't process ON condition because it is not a joining part of from here. + return false; } - else if (from instanceof GridSqlSubquery) - collectAllSpaces(((GridSqlSubquery)from).select(), schemas); else if (from instanceof GridSqlAlias) - collectAllSpacesInFrom(from.child(), schemas); - else if (!(from instanceof GridSqlFunction)) - throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL()); + return findTablesInFrom(from.child(), c); + else if (from instanceof GridSqlFunction) + return false; + + throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL()); } /** - * Searches spaces in subqueries in SELECT and WHERE clauses. + * Searches schema names and tables in subqueries in SELECT and WHERE clauses. + * * @param el Element. - * @param schemas Schemas' names. + * @param schemas Schema names. + * @param tbls Tables. */ - private static void collectAllSpacesInSubqueries(GridSqlElement el, Set<String> schemas) { - if (el instanceof GridSqlAlias) - el = el.child(); + private static void collectAllTablesInSubqueries(GridSqlElement el, Set<String> schemas, Set<String> tbls) { + if (el == null) + return; + + el = GridSqlAlias.unwrap(el); if (el instanceof GridSqlOperation || el instanceof GridSqlFunction) { for (GridSqlElement child : el) - collectAllSpacesInSubqueries(child, schemas); + collectAllTablesInSubqueries(child, schemas, tbls); } else if (el instanceof GridSqlSubquery) - collectAllSpaces(((GridSqlSubquery)el).select(), schemas); + collectAllTables(((GridSqlSubquery)el).select(), schemas, tbls); } /** @@ -691,7 +852,7 @@ public class GridSqlQuerySplitter { * @return Column. */ private static GridSqlColumn column(String name) { - return new GridSqlColumn(null, name, name); + return new GridSqlColumn(null, null, name, name); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java index 07ef1fa..a38ae68 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java @@ -131,13 +131,12 @@ public class GridSqlSelect extends GridSqlQuery { /** * @param buff Statement builder. - * @param expression Alias expression. + * @param exp Alias expression. */ - private static void addAlias(StatementBuilder buff, GridSqlElement expression) { - if (expression instanceof GridSqlAlias) - expression = expression.child(); + private static void addAlias(StatementBuilder buff, GridSqlElement exp) { + exp = GridSqlAlias.unwrap(exp); - buff.append(StringUtils.unEnclose(expression.getSQL())); + buff.append(StringUtils.unEnclose(exp.getSQL())); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java index 0bcdf1c..49c679d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.query.h2.sql; import java.util.Collections; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.h2.command.Parser; +import org.h2.table.Table; import org.jetbrains.annotations.Nullable; /** @@ -31,15 +33,56 @@ public class GridSqlTable extends GridSqlElement { /** */ private final String tblName; + /** */ + private final GridH2Table tbl; + + /** */ + private boolean affKeyCond; + /** * @param schema Schema. * @param tblName Table name. */ public GridSqlTable(@Nullable String schema, String tblName) { + this(schema, tblName, null); + } + + /** + * @param tbl Table. + */ + public GridSqlTable(Table tbl) { + this(tbl.getSchema().getName(), tbl.getName(), tbl); + } + + /** + * @param schema Schema. + * @param tblName Table name. + * @param tbl H2 Table. + */ + private GridSqlTable(@Nullable String schema, String tblName, @Nullable Table tbl) { super(Collections.<GridSqlElement>emptyList()); + assert schema != null : "schema"; + assert tblName != null : "tblName"; + this.schema = schema; this.tblName = tblName; + + this.tbl = tbl instanceof GridH2Table ? (GridH2Table)tbl : null; + } + + /** + * @param affKeyCond If affinity key condition is found. + */ + public void affinityKeyCondition(boolean affKeyCond) { + this.affKeyCond = affKeyCond; + } + + /** + * @return {@code true} If affinity key condition is found. + */ + public boolean affinityKeyCondition() { + return affKeyCond; } /** {@inheritDoc} */ @@ -63,4 +106,31 @@ public class GridSqlTable extends GridSqlElement { public String tableName() { return tblName; } + + /** + * @return Referenced data table. + */ + public GridH2Table dataTable() { + return tbl; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (!super.equals(o)) + return false; + + GridSqlTable that = (GridSqlTable)o; + + return schema.equals(that.schema) && tblName.equals(that.tblName); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = 1; + + result = 31 * result + schema.hashCode(); + result = 31 * result + tblName.hashCode(); + + return result; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 580058c..bb5e419 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -18,15 +18,16 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.lang.reflect.Field; +import java.sql.Connection; import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -45,38 +46,46 @@ import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.jdbc.JdbcResultSet; import org.h2.result.ResultInterface; import org.h2.value.Value; -import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** * Map query executor. @@ -109,7 +118,7 @@ public class GridMapQueryExecutor { private IgniteH2Indexing h2; /** */ - private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>(); /** */ private final GridSpinBusyLock busyLock; @@ -136,16 +145,20 @@ public class GridMapQueryExecutor { log = ctx.log(GridMapQueryExecutor.class); + final UUID locNodeId = ctx.localNodeId(); + ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(final Event evt) { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId); + GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId); + + NodeResults nodeRess = qryRess.remove(nodeId); if (nodeRess == null) return; - for (QueryResults ress : nodeRess.values()) + for (QueryResults ress : nodeRess.results().values()) ress.cancel(); } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); @@ -156,6 +169,9 @@ public class GridMapQueryExecutor { return; try { + if (msg instanceof GridCacheQueryMarshallable) + ((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx); + GridMapQueryExecutor.this.onMessage(nodeId, msg); } finally { @@ -180,12 +196,14 @@ public class GridMapQueryExecutor { boolean processed = true; - if (msg instanceof GridQueryRequest) - onQueryRequest(node, (GridQueryRequest)msg); + if (msg instanceof GridH2QueryRequest) + onQueryRequest(node, (GridH2QueryRequest)msg); else if (msg instanceof GridQueryNextPageRequest) onNextPageRequest(node, (GridQueryNextPageRequest)msg); else if (msg instanceof GridQueryCancelRequest) onCancel(node, (GridQueryCancelRequest)msg); + else if (msg instanceof GridQueryRequest) + onQueryRequest(node, (GridQueryRequest)msg); else processed = false; @@ -202,9 +220,19 @@ public class GridMapQueryExecutor { * @param msg Message. */ private void onCancel(ClusterNode node, GridQueryCancelRequest msg) { - ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); + long qryReqId = msg.queryRequestId(); + + NodeResults nodeRess = resultsForNode(node.id()); + + boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); - QueryResults results = nodeRess.remove(msg.queryRequestId()); + if (!clear) { + nodeRess.onCancel(qryReqId); + + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); + } + + QueryResults results = nodeRess.results().remove(qryReqId); if (results == null) return; @@ -216,13 +244,13 @@ public class GridMapQueryExecutor { * @param nodeId Node ID. * @return Results for node. */ - private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId); + private NodeResults resultsForNode(UUID nodeId) { + NodeResults nodeRess = qryRess.get(nodeId); if (nodeRess == null) { - nodeRess = new ConcurrentHashMap8<>(); + nodeRess = new NodeResults(); - ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(nodeId, nodeRess); + NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess); if (old != null) nodeRess = old; @@ -232,19 +260,6 @@ public class GridMapQueryExecutor { } /** - * @param cacheName Cache name. - * @return Cache context or {@code null} if none. - */ - @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) { - GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName); - - if (cache == null) - return null; - - return cache.context(); - } - - /** * @param cctx Cache context. * @param p Partition ID. * @return Partition. @@ -254,7 +269,7 @@ public class GridMapQueryExecutor { } /** - * @param cacheNames Cache names. + * @param cacheIds Cache IDs. * @param topVer Topology version. * @param explicitParts Explicit partitions list. * @param reserved Reserved list. @@ -262,7 +277,7 @@ public class GridMapQueryExecutor { * @throws IgniteCheckedException If failed. */ private boolean reservePartitions( - Collection<String> cacheNames, + List<Integer> cacheIds, AffinityTopologyVersion topVer, final int[] explicitParts, List<GridReservable> reserved @@ -271,8 +286,8 @@ public class GridMapQueryExecutor { Collection<Integer> partIds = wrap(explicitParts); - for (String cacheName : cacheNames) { - GridCacheContext<?, ?> cctx = cacheContext(cacheName); + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); if (cctx == null) // Cache was not found, probably was not deployed yet. return false; @@ -394,121 +409,226 @@ public class GridMapQueryExecutor { * @param req Query request. */ private void onQueryRequest(ClusterNode node, GridQueryRequest req) { - ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); + List<Integer> cacheIds; - QueryResults qr = null; + if (req.extraSpaces() != null) { + cacheIds = new ArrayList<>(req.extraSpaces().size() + 1); - List<GridReservable> reserved = new ArrayList<>(); + cacheIds.add(CU.cacheId(req.space())); - try { - // Unmarshall query params. - Collection<GridCacheSqlQuery> qrys; + for (String extraSpace : req.extraSpaces()) + cacheIds.add(CU.cacheId(extraSpace)); + } + else + cacheIds = Collections.singletonList(CU.cacheId(req.space())); + + onQueryRequest0(node, + req.requestId(), + req.queries(), + cacheIds, + req.topologyVersion(), + null, + req.partitions(), + null, + req.pageSize(), + false); + } - try { - qrys = req.queries(); + /** + * @param node Node. + * @param req Query request. + */ + private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) { + Map<UUID,int[]> partsMap = req.partitions(); + int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId()); + + onQueryRequest0(node, + req.requestId(), + req.queries(), + req.caches(), + req.topologyVersion(), + partsMap, + parts, + req.tables(), + req.pageSize(), + req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); + } + + /** + * @param node Node authored request. + * @param reqId Request ID. + * @param qrys Queries to execute. + * @param cacheIds Caches which will be affected by these queries. + * @param topVer Topology version. + * @param partsMap Partitions map for unstable topology. + * @param parts Explicit partitions for current node. + * @param tbls Tables. + * @param pageSize Page size. + * @param distributedJoins Can we expect distributed joins to be ran. + */ + private void onQueryRequest0( + ClusterNode node, + long reqId, + Collection<GridCacheSqlQuery> qrys, + List<Integer> cacheIds, + AffinityTopologyVersion topVer, + Map<UUID, int[]> partsMap, + int[] parts, + Collection<String> tbls, + int pageSize, + boolean distributedJoins + ) { + // Prepare to run queries. + GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0)); - if (!node.isLocal()) { - Marshaller m = ctx.config().getMarshaller(); + if (mainCctx == null) + throw new CacheException("Failed to find cache."); - for (GridCacheSqlQuery qry : qrys) - qry.unmarshallParams(m, ctx); - } - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to unmarshall parameters.", e); - } + NodeResults nodeRess = resultsForNode(node.id()); - List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces()); + QueryResults qr = null; - // Topology version can be null in rolling restart with previous version! - final AffinityTopologyVersion topVer = req.topologyVersion(); + List<GridReservable> reserved = new ArrayList<>(); + try { if (topVer != null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(caches, topVer, req.partitions(), reserved)) { - sendRetry(node, req.requestId()); + if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + sendRetry(node, reqId); return; } } - // Prepare to run queries. - GridCacheContext<?,?> mainCctx = cacheContext(req.space()); + qr = new QueryResults(reqId, qrys.size(), mainCctx); - if (mainCctx == null) - throw new CacheException("Failed to find cache: " + req.space()); + if (nodeRess.results().put(reqId, qr) != null) + throw new IllegalStateException(); - qr = new QueryResults(req.requestId(), qrys.size(), mainCctx); + // Prepare query context. + GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(), + node.id(), + reqId, + mainCctx.isReplicated() ? REPLICATED : MAP) + .filter(h2.backupFilter(topVer, parts)) + .partitionsMap(partsMap) + .distributedJoins(distributedJoins) + .pageSize(pageSize) + .topologyVersion(topVer) + .reservations(reserved); - if (nodeRess.put(req.requestId(), qr) != null) - throw new IllegalStateException(); + List<GridH2Table> snapshotedTbls = null; - h2.setFilters(h2.backupFilter(caches, topVer, req.partitions())); + if (!F.isEmpty(tbls)) { + snapshotedTbls = new ArrayList<>(tbls.size()); - // TODO Prepare snapshots for all the needed tables before the run. + for (String identifier : tbls) { + GridH2Table tbl = h2.dataTable(identifier); - // Run queries. - int i = 0; + Objects.requireNonNull(tbl, identifier); - for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = h2.executeSqlQueryWithTimer(req.space(), - h2.connectionForSpace(req.space()), - qry.query(), - F.asList(qry.parameters()), - true); + tbl.snapshotIndexes(qctx); - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - ctx.event().record(new CacheQueryExecutedEvent<>( - node, - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - mainCctx.namex(), - null, - qry.query(), - null, - null, - qry.parameters(), - node.id(), - null)); + snapshotedTbls.add(tbl); } + } + + Connection conn = h2.connectionForSpace(mainCctx.name()); + + // Here we enforce join order to have the same behavior on all the nodes. + h2.setupConnection(conn, distributedJoins, true); + + GridH2QueryContext.set(qctx); - assert rs instanceof JdbcResultSet : rs.getClass(); + // qctx is set, we have to release reservations inside of it. + reserved = null; - qr.addResult(i, qry, node.id(), rs); + try { + if (nodeRess.cancelled(reqId)) { + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); - if (qr.canceled) { - qr.result(i).close(); + nodeRess.results().remove(reqId); return; } - // Send the first page. - sendNextPage(nodeRess, node, qr, i, req.pageSize()); + // Run queries. + int i = 0; + + boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); + + for (GridCacheSqlQuery qry : qrys) { + ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), + F.asList(qry.parameters()), true); + + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.namex(), + null, + qry.query(), + null, + null, + qry.parameters(), + node.id(), + null)); + } + + assert rs instanceof JdbcResultSet : rs.getClass(); + + qr.addResult(i, qry, node.id(), rs); + + if (qr.canceled) { + qr.result(i).close(); + + return; + } + + // Send the first page. + sendNextPage(nodeRess, node, qr, i, pageSize); + + i++; + } + } + finally { + GridH2QueryContext.clearThreadLocal(); + + if (!distributedJoins) + qctx.clearContext(false); - i++; + if (!F.isEmpty(snapshotedTbls)) { + for (GridH2Table dataTbl : snapshotedTbls) + dataTbl.releaseSnapshots(); + } } } catch (Throwable e) { if (qr != null) { - nodeRess.remove(req.requestId(), qr); + nodeRess.results().remove(reqId, qr); qr.cancel(); } - U.error(log, "Failed to execute local query: " + req, e); + if (X.hasCause(e, GridH2RetryException.class)) + sendRetry(node, reqId); + else { + U.error(log, "Failed to execute local query.", e); - sendError(node, req.requestId(), e); + sendError(node, reqId, e); - if (e instanceof Error) - throw (Error)e; + if (e instanceof Error) + throw (Error)e; + } } finally { - h2.setFilters(null); - - // Release reserved partitions. - for (GridReservable r : reserved) - r.release(); + if (reserved != null) { + // Release reserved partitions. + for (int i = 0; i < reserved.size(); i++) + reserved.get(i).release(); + } } } @@ -521,8 +641,11 @@ public class GridMapQueryExecutor { try { GridQueryFailResponse msg = new GridQueryFailResponse(qryReqId, err); - if (node.isLocal()) + if (node.isLocal()) { + U.error(log, "Failed to run map query on local node.", err); + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + } else ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); } @@ -538,9 +661,9 @@ public class GridMapQueryExecutor { * @param req Request. */ private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + NodeResults nodeRess = qryRess.get(node.id()); - QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); + QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId()); if (qr == null || qr.canceled) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); @@ -549,12 +672,13 @@ public class GridMapQueryExecutor { } /** + * @param nodeRess Results. * @param node Node. * @param qr Query results. * @param qry Query. * @param pageSize Page size. */ - private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, ClusterNode node, QueryResults qr, int qry, + private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int pageSize) { QueryResult res = qr.result(qry); @@ -570,14 +694,14 @@ public class GridMapQueryExecutor { res.close(); if (qr.isAllClosed()) - nodeRess.remove(qr.qryReqId, qr); + nodeRess.results().remove(qr.qryReqId, qr); } try { boolean loc = node.isLocal(); GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page, - page == 0 ? res.rowCount : -1 , + page == 0 ? res.rowCnt : -1 , res.cols, loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)), loc ? rows : null); @@ -597,22 +721,26 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param reqId Request ID. - * @throws IgniteCheckedException If failed. */ - private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException { - boolean loc = node.isLocal(); + private void sendRetry(ClusterNode node, long reqId) { + try { + boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, - loc ? null : Collections.<Message>emptyList(), - loc ? Collections.<Value[]>emptyList() : null); + loc ? null : Collections.<Message>emptyList(), + loc ? Collections.<Value[]>emptyList() : null); - msg.retry(h2.readyTopologyVersion()); + msg.retry(h2.readyTopologyVersion()); - if (loc) - h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); - else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + if (loc) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); + } + catch (Exception e) { + U.warn(log, "Failed to send retry message: " + e.getMessage()); + } } /** @@ -626,6 +754,44 @@ public class GridMapQueryExecutor { } } + + /** + * + */ + private static class NodeResults { + /** */ + private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>(); + + /** */ + private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = + new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); + + /** + * @return All results. + */ + ConcurrentMap<Long, QueryResults> results() { + return res; + } + + /** + * @param qryId Query ID. + * @return {@code False} if query was already cancelled. + */ + boolean cancelled(long qryId) { + return qryHist.get(qryId) != null; + } + + /** + * @param qryId Query ID. + * @return {@code True} if cancelled. + */ + boolean onCancel(long qryId) { + Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE); + + return old == null; + } + } + /** * */ @@ -637,7 +803,7 @@ public class GridMapQueryExecutor { private final AtomicReferenceArray<QueryResult> results; /** */ - private final GridCacheContext<?,?> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private volatile boolean canceled; @@ -687,6 +853,9 @@ public class GridMapQueryExecutor { return true; } + /** + * + */ void cancel() { if (canceled) return; @@ -728,7 +897,7 @@ public class GridMapQueryExecutor { private int page; /** */ - private final int rowCount; + private final int rowCnt; /** */ private volatile boolean closed; @@ -752,7 +921,7 @@ public class GridMapQueryExecutor { throw new IllegalStateException(e); // Must not happen. } - rowCount = res.getRowCount(); + rowCnt = res.getRowCount(); cols = res.getVisibleColumnCount(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 3914bd7..796ea66 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -18,18 +18,22 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.ArrayList; +import java.util.Collection; import java.util.ConcurrentModificationException; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; -import org.h2.engine.Constants; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; +import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.index.BaseIndex; import org.h2.index.Cursor; @@ -41,7 +45,6 @@ import org.h2.result.SortOrder; import org.h2.table.IndexColumn; import org.h2.table.TableFilter; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -57,7 +60,7 @@ public abstract class GridMergeIndex extends BaseIndex { private final AtomicInteger expRowsCnt = new AtomicInteger(0); /** Remaining rows per source node ID. */ - private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>(); + private Map<UUID, Counter> remainingRows; /** */ private final AtomicBoolean lastSubmitted = new AtomicBoolean(); @@ -136,11 +139,19 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * @param nodeId Node ID. + * Set source nodes. + * + * @param nodes Nodes. */ - public void addSource(UUID nodeId) { - if (remainingRows.put(nodeId, new Counter()) != null) - throw new IllegalStateException(); + public void setSources(Collection<ClusterNode> nodes) { + assert remainingRows == null; + + remainingRows = U.newHashMap(nodes.size()); + + for (ClusterNode node : nodes) { + if (remainingRows.put(node.id(), new Counter()) != null) + throw new IllegalStateException("Duplicate node id: " + node.id()); + } } /** @@ -283,8 +294,8 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) { - return getRowCountApproximation() + Constants.COST_ROW_OFFSET; + @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) { + return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true); } /** {@inheritDoc} */ @@ -318,51 +329,9 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * Cursor over iterator. - */ - protected class IteratorCursor implements Cursor { - /** */ - protected Iterator<Row> iter; - - /** */ - protected Row cur; - - /** - * @param iter Iterator. - */ - public IteratorCursor(Iterator<Row> iter) { - assert iter != null; - - this.iter = iter; - } - - /** {@inheritDoc} */ - @Override public Row get() { - return cur; - } - - /** {@inheritDoc} */ - @Override public SearchRow getSearchRow() { - return get(); - } - - /** {@inheritDoc} */ - @Override public boolean next() { - cur = iter.hasNext() ? iter.next() : null; - - return cur != null; - } - - /** {@inheritDoc} */ - @Override public boolean previous() { - throw DbException.getUnsupportedException("previous"); - } - } - - /** * Fetching cursor. */ - protected class FetchingCursor extends IteratorCursor { + protected class FetchingCursor extends GridH2Cursor { /** */ private Iterator<Row> stream; http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 5639340..8a8577f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -25,6 +25,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.cache.CacheException; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; import org.h2.index.Cursor; import org.h2.index.IndexType; import org.h2.result.Row; @@ -73,7 +75,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { /** {@inheritDoc} */ @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) { - return new IteratorCursor(fetched.iterator()); + return new GridH2Cursor(fetched.iterator()); } /** {@inheritDoc} */ @@ -112,7 +114,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { } @Override public Row next() { - return new Row(iter.next(), 0); + return GridH2RowFactory.create(iter.next()); } @Override public void remove() { http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java index a86cbcd..1489021 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java @@ -50,8 +50,8 @@ public class GridMergeTable extends TableBase { } /** {@inheritDoc} */ - @Override public void lock(Session session, boolean exclusive, boolean force) { - // No-op. + @Override public boolean lock(Session session, boolean exclusive, boolean force) { + return false; } /** {@inheritDoc} */
