http://git-wip-us.apache.org/repos/asf/ignite/blob/799098c6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index a7c44c9,243d1dc..9e41bfe --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@@ -56,12 -48,10 +48,13 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; - import org.apache.ignite.internal.processors.query.GridQueryProperty; - import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument; +import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; + import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; + import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; + import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; @@@ -352,24 -341,32 +352,34 @@@ public class DmlStatementsProcessor Integer errKeysPos = null; - UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos); + UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos); + + FastUpdate fastUpdate = plan.fastUpdate(); - if (plan.fastUpdateArgs != null) { + if (fastUpdate != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; - return doFastUpdate(plan, fieldsQry.getArgs()); + return fastUpdate.execute(plan.cacheContext().cache(), fieldsQry.getArgs()); } - assert !F.isEmpty(plan.rows) ^ !F.isEmpty(plan.selectQry); + if (plan.distributedPlan() != null) { + UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); + + // null is returned in case not all nodes support distributed DML. + if (result != null) + return result; + } + + assert !F.isEmpty(plan.selectQuery()); - QueryCursorImpl<List<?>> cur; + Iterable<List<?>> cur; // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual - // subquery and not some dummy stuff like "select 1, 2, 3;" - if (!loc && !plan.isLocSubqry) { - assert !F.isEmpty(plan.selectQry); + // sub-query and not some dummy stuff like "select 1, 2, 3;" + if (!loc && !plan.isLocalSubquery()) { ++ assert !F.isEmpty(plan.selectQuery()); + - SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated()) + SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) .setArgs(fieldsQry.getArgs()) .setDistributedJoins(fieldsQry.isDistributedJoins()) .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) @@@ -377,11 -374,11 +387,11 @@@ .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, - mainCacheId); + cur = (QueryCursorImpl<List<?>>)idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, + cancel, mainCacheId, true).get(0); } - else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(), + else if (F.isEmpty(plan.rows)) { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, ++ final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry(), F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/799098c6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java index 0000000,054e708..a2cd553 mode 000000,100644..100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java @@@ -1,0 -1,609 +1,639 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.query.h2.dml; + + import java.util.ArrayList; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import org.apache.ignite.IgniteException; + import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; + import org.apache.ignite.internal.processors.query.IgniteSQLException; + import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; + import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; + import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlArray; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDelete; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunction; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlKeyword; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSubquery; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate; + import org.apache.ignite.internal.util.lang.IgnitePair; + import org.apache.ignite.internal.util.typedef.F; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgnitePredicate; + import org.h2.command.Parser; + import org.h2.expression.Expression; + import org.h2.table.Column; + import org.h2.table.Table; + import org.h2.util.IntArray; + import org.h2.value.DataType; + import org.h2.value.Value; + import org.h2.value.ValueDate; + import org.h2.value.ValueInt; + import org.h2.value.ValueString; + import org.h2.value.ValueTime; + import org.h2.value.ValueTimestamp; + import org.jetbrains.annotations.Nullable; + + /** + * AST utils for DML + */ + public final class DmlAstUtils { + /** + * Empty ctor to prevent initialization. + */ + private DmlAstUtils() { + // No-op. + } + + /** + * Create SELECT on which subsequent INSERT or MERGE will be based. + * + * @param cols Columns to insert values into. + * @param rows Rows to create pseudo-SELECT upon. + * @param subQry Subquery to use rather than rows. - * @return Subquery or pseudo-SELECT to evaluate inserted expressions. ++ * @param desc Row descriptor. ++ * @return Subquery or pseudo-SELECT to evaluate inserted expressions, or {@code null} no query needs to be run. + */ + public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows, + GridSqlQuery subQry) { + if (!F.isEmpty(rows)) { + assert !F.isEmpty(cols); + + GridSqlSelect sel = new GridSqlSelect(); + + GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE); + + sel.from(from); + + GridSqlArray[] args = new GridSqlArray[cols.length]; + ++ boolean noQry = true; ++ + for (int i = 0; i < cols.length; i++) { + GridSqlArray arr = new GridSqlArray(rows.size()); + + String colName = cols[i].columnName(); + + GridSqlAlias alias = new GridSqlAlias(colName, arr); + + alias.resultType(cols[i].resultType()); + + from.addChild(alias); + + args[i] = arr; + + GridSqlColumn newCol = new GridSqlColumn(null, from, null,"TABLE", colName); + + newCol.resultType(cols[i].resultType()); + + sel.addColumn(newCol, true); + } + + for (GridSqlElement[] row : rows) { + assert cols.length == row.length; + - for (int i = 0; i < row.length; i++) ++ for (int i = 0; i < row.length; i++) { ++ GridSqlElement el = row[i]; ++ ++ noQry &= (el instanceof GridSqlConst || el instanceof GridSqlParameter); ++ + args[i].addChild(row[i]); ++ } + } + ++ if (noQry) ++ return null; ++ + return sel; + } + else { + assert subQry != null; + + return subQry; + } + } + + /** + * Generate SQL SELECT based on DELETE's WHERE, LIMIT, etc. + * + * @param del Delete statement. + * @param keysParamIdx Index for . + * @return SELECT statement. + */ + public static GridSqlSelect selectForDelete(GridSqlDelete del, @Nullable Integer keysParamIdx) { + GridSqlSelect mapQry = new GridSqlSelect(); + + mapQry.from(del.from()); + + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(del.from(), tbls); + + assert tbls.size() == 1 : "Failed to determine target table for DELETE"; + + GridSqlTable tbl = tbls.iterator().next(); + + GridH2Table gridTbl = tbl.dataTable(); + + assert gridTbl != null : "Failed to determine target grid table for DELETE"; + + Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL); + + Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL); + + GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName()); + keyCol.resultType(GridSqlType.fromColumn(h2KeyCol)); + + GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName()); + valCol.resultType(GridSqlType.fromColumn(h2ValCol)); + + mapQry.addColumn(keyCol, true); + mapQry.addColumn(valCol, true); + + GridSqlElement where = del.where(); + if (keysParamIdx != null) + where = injectKeysFilterParam(where, keyCol, keysParamIdx); + + mapQry.where(where); + mapQry.limit(del.limit()); + + return mapQry; + } + + /** + * @param update UPDATE statement. + * @return {@code null} if given statement directly updates {@code _val} column with a literal or param value + * and filters by single non expression key (and, optionally, by single non expression value). + */ + public static FastUpdate getFastUpdateArgs(GridSqlUpdate update) { + IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(update.where()); + + if (filter == null) + return null; + + if (update.cols().size() != 1) + return null; + + Table tbl = update.cols().get(0).column().getTable(); + if (!(tbl instanceof GridH2Table)) + return null; + + GridH2RowDescriptor desc = ((GridH2Table)tbl).rowDescriptor(); + if (!desc.isValueColumn(update.cols().get(0).column().getColumnId())) + return null; + + GridSqlElement set = update.set().get(update.cols().get(0).columnName()); + + if (!(set instanceof GridSqlConst || set instanceof GridSqlParameter)) + return null; + - return FastUpdate.create(filter.getKey(), filter.getValue(), set); ++ return new FastUpdateArguments(operandForElement(filter.getKey()), operandForElement(filter.getValue()), ++ operandForElement(set)); ++ } ++ ++ /** ++ * Create operand based on exact type of SQL element. ++ * ++ * @param el element. ++ * @return Operand. ++ */ ++ private static FastUpdateArgument operandForElement(GridSqlElement el) { ++ assert el == null ^ (el instanceof GridSqlConst || el instanceof GridSqlParameter); ++ ++ if (el == null) ++ return FastUpdateArguments.NULL_ARGUMENT; ++ ++ if (el instanceof GridSqlConst) ++ return new FastUpdateArguments.ValueArgument(((GridSqlConst)el).value().getObject()); ++ else ++ return new FastUpdateArguments.ParamArgument(((GridSqlParameter)el).index()); + } + + /** + * @param del DELETE statement. + * @return {@code true} if given statement filters by single non expression key. + */ + public static FastUpdate getFastDeleteArgs(GridSqlDelete del) { + IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(del.where()); + + if (filter == null) + return null; + + return FastUpdate.create(filter.getKey(), filter.getValue(), null); + } + + /** + * @param where Element to test. + * @return Whether given element corresponds to {@code WHERE _key = ?}, and key is a literal expressed + * in query or a query param. + */ + @SuppressWarnings("RedundantCast") + private static IgnitePair<GridSqlElement> findKeyValueEqualityCondition(GridSqlElement where) { + if (where == null || !(where instanceof GridSqlOperation)) + return null; + + GridSqlOperation whereOp = (GridSqlOperation) where; + + // Does this WHERE limit only by _key? + if (isKeyEqualityCondition(whereOp)) + return new IgnitePair<>((GridSqlElement)whereOp.child(1), null); + + // Or maybe it limits both by _key and _val? + if (whereOp.operationType() != GridSqlOperationType.AND) + return null; + + GridSqlElement left = whereOp.child(0); + + GridSqlElement right = whereOp.child(1); + + if (!(left instanceof GridSqlOperation && right instanceof GridSqlOperation)) + return null; + + GridSqlOperation leftOp = (GridSqlOperation) left; + + GridSqlOperation rightOp = (GridSqlOperation) right; + + if (isKeyEqualityCondition(leftOp)) { // _key = ? and _val = ? + if (!isValueEqualityCondition(rightOp)) + return null; + + return new IgnitePair<>((GridSqlElement)leftOp.child(1), (GridSqlElement)rightOp.child(1)); + } + else if (isKeyEqualityCondition(rightOp)) { // _val = ? and _key = ? + if (!isValueEqualityCondition(leftOp)) + return null; + + return new IgnitePair<>((GridSqlElement)rightOp.child(1), (GridSqlElement)leftOp.child(1)); + } + else // Neither + return null; + } + + /** + * @param op Operation. + * @param key true - check for key equality condition, + * otherwise check for value equality condition + * @return Whether this condition is of form {@code colName} = ? + */ + private static boolean isEqualityCondition(GridSqlOperation op, boolean key) { + if (op.operationType() != GridSqlOperationType.EQUAL) + return false; + + GridSqlElement left = op.child(0); + GridSqlElement right = op.child(1); + + if (!(left instanceof GridSqlColumn)) + return false; + + GridSqlColumn column = (GridSqlColumn)left; + if (!(column.column().getTable() instanceof GridH2Table)) + return false; + + GridH2RowDescriptor desc =((GridH2Table) column.column().getTable()).rowDescriptor(); + + return (key ? desc.isKeyColumn(column.column().getColumnId()) : + desc.isValueColumn(column.column().getColumnId())) && + (right instanceof GridSqlConst || right instanceof GridSqlParameter); + } + + /** + * @param op Operation. + * @return Whether this condition is of form _key = ? + */ + private static boolean isKeyEqualityCondition(GridSqlOperation op) { + return isEqualityCondition(op, true); + } + + /** + * @param op Operation. + * @return Whether this condition is of form _val = ? + */ + private static boolean isValueEqualityCondition(GridSqlOperation op) { + return isEqualityCondition(op, false); + } + + + /** + * Generate SQL SELECT based on UPDATE's WHERE, LIMIT, etc. + * + * @param update Update statement. + * @param keysParamIdx Index of new param for the array of keys. + * @return SELECT statement. + */ + public static GridSqlSelect selectForUpdate(GridSqlUpdate update, @Nullable Integer keysParamIdx) { + GridSqlSelect mapQry = new GridSqlSelect(); + + mapQry.from(update.target()); + + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(update.target(), tbls); + + assert tbls.size() == 1 : "Failed to determine target table for UPDATE"; + + GridSqlTable tbl = tbls.iterator().next(); + + GridH2Table gridTbl = tbl.dataTable(); + + assert gridTbl != null : "Failed to determine target grid table for UPDATE"; + + Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL); + + Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL); + + GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName()); + keyCol.resultType(GridSqlType.fromColumn(h2KeyCol)); + + GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName()); + valCol.resultType(GridSqlType.fromColumn(h2ValCol)); + + mapQry.addColumn(keyCol, true); + mapQry.addColumn(valCol, true); + + for (GridSqlColumn c : update.cols()) { + String newColName = Parser.quoteIdentifier("_upd_" + c.columnName()); + // We have to use aliases to cover cases when the user + // wants to update _val field directly (if it's a literal) + GridSqlAlias alias = new GridSqlAlias(newColName, elementOrDefault(update.set().get(c.columnName()), c), true); + alias.resultType(c.resultType()); + mapQry.addColumn(alias, true); + } + + GridSqlElement where = update.where(); + if (keysParamIdx != null) + where = injectKeysFilterParam(where, keyCol, keysParamIdx); + + mapQry.where(where); + mapQry.limit(update.limit()); + + return mapQry; + } + + /** + * Do what we can to compute default value for this column (mimics H2 behavior). + * @see Table#getDefaultValue + * @see Column#validateConvertUpdateSequence + * @param el SQL element. + * @param col Column. + * @return {@link GridSqlConst#NULL}, if {@code el} is null, or {@code el} if + * it's not {@link GridSqlKeyword#DEFAULT}, or computed default value. + */ + private static GridSqlElement elementOrDefault(GridSqlElement el, GridSqlColumn col) { + if (el == null) + return GridSqlConst.NULL; + + if (el != GridSqlKeyword.DEFAULT) + return el; + + Column h2Col = col.column(); + + Expression dfltExpr = h2Col.getDefaultExpression(); + + Value dfltVal; + + try { + dfltVal = dfltExpr != null ? dfltExpr.getValue(null) : null; + } + catch (Exception ignored) { + throw new IgniteSQLException("Failed to evaluate default value for a column " + col.columnName()); + } + + if (dfltVal != null) + return new GridSqlConst(dfltVal); + + int type = h2Col.getType(); + + DataType dt = DataType.getDataType(type); + + if (dt.decimal) + dfltVal = ValueInt.get(0).convertTo(type); + else if (dt.type == Value.TIMESTAMP) + dfltVal = ValueTimestamp.fromMillis(U.currentTimeMillis()); + else if (dt.type == Value.TIME) + dfltVal = ValueTime.fromNanos(0); + else if (dt.type == Value.DATE) + dfltVal = ValueDate.fromMillis(U.currentTimeMillis()); + else + dfltVal = ValueString.get("").convertTo(type); + + return new GridSqlConst(dfltVal); + } + + /** + * Append additional condition to WHERE for it to select only specific keys. + * + * @param where Initial condition. + * @param keyCol Column to base the new condition on. + * @return New condition. + */ + private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) { + // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting. + GridSqlSelect sel = new GridSqlSelect(); + + GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE); + + sel.from(from); + + GridSqlColumn col = new GridSqlColumn(null, from, null, "TABLE", "_IGNITE_ERR_KEYS"); + + sel.addColumn(col, true); + + GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx)); + + alias.resultType(keyCol.resultType()); + + from.addChild(alias); + + GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel)); + + if (where == null) + return e; + else + return new GridSqlOperation(GridSqlOperationType.AND, where, e); + } + + /** + * @param qry Select. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + * @return Extracted parameters list. + */ + @SuppressWarnings("unused") + private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (qry instanceof GridSqlSelect) + return findParams((GridSqlSelect)qry, params, target, paramIdxs); + + GridSqlUnion union = (GridSqlUnion)qry; + + findParams(union.left(), params, target, paramIdxs); + findParams(union.right(), params, target, paramIdxs); + + findParams((GridSqlElement)qry.limit(), params, target, paramIdxs); + findParams((GridSqlElement)qry.offset(), params, target, paramIdxs); + + return target; + } + + /** + * @param qry Select. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + * @return Extracted parameters list. + */ + private static List<Object> findParams(GridSqlSelect qry, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (params.length == 0) + return target; + + for (GridSqlAst el : qry.columns(false)) + findParams((GridSqlElement)el, params, target, paramIdxs); + + findParams((GridSqlElement)qry.from(), params, target, paramIdxs); + findParams((GridSqlElement)qry.where(), params, target, paramIdxs); + + // Don't search in GROUP BY and HAVING since they expected to be in select list. + + findParams((GridSqlElement)qry.limit(), params, target, paramIdxs); + findParams((GridSqlElement)qry.offset(), params, target, paramIdxs); + + return target; + } + + /** + * @param el Element. + * @param params Parameters. + * @param target Extracted parameters. + * @param paramIdxs Parameter indexes. + */ + private static void findParams(@Nullable GridSqlElement el, Object[] params, ArrayList<Object> target, + IntArray paramIdxs) { + if (el == null) + return; + + if (el instanceof GridSqlParameter) { + // H2 Supports queries like "select ?5" but first 4 non-existing parameters are need to be set to any value. + // Here we will set them to NULL. + final int idx = ((GridSqlParameter)el).index(); + + while (target.size() < idx) + target.add(null); + + if (params.length <= idx) + throw new IgniteException("Invalid number of query parameters. " + + "Cannot find " + idx + " parameter."); + + Object param = params[idx]; + + if (idx == target.size()) + target.add(param); + else + target.set(idx, param); + + paramIdxs.add(idx); + } + else if (el instanceof GridSqlSubquery) + findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs); + else + for (int i = 0; i < el.size(); i++) + findParams((GridSqlElement)el.child(i), params, target, paramIdxs); + } + + /** + * Processes all the tables and subqueries using the given closure. + * + * @param from FROM element. + * @param c Closure each found table and subquery will be passed to. If returns {@code true} the we need to stop. + * @return {@code true} If we have found. + */ + @SuppressWarnings("RedundantCast") + private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) { + if (from == null) + return false; + + if (from instanceof GridSqlTable || from instanceof GridSqlSubquery) + return c.apply(from); + + if (from instanceof GridSqlJoin) { + // Left and right. + if (findTablesInFrom((GridSqlElement)from.child(0), c)) + return true; + + if (findTablesInFrom((GridSqlElement)from.child(1), c)) + return true; + + // We don't process ON condition because it is not a joining part of from here. + return false; + } + else if (from instanceof GridSqlAlias) + return findTablesInFrom((GridSqlElement)from.child(), c); + else if (from instanceof GridSqlFunction) + return false; + + throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL()); + } + + /** + * @param from From element. + * @param tbls Tables. + */ + public static void collectAllGridTablesInTarget(GridSqlElement from, final Set<GridSqlTable> tbls) { + findTablesInFrom(from, new IgnitePredicate<GridSqlElement>() { + @Override public boolean apply(GridSqlElement el) { + if (el instanceof GridSqlTable) + tbls.add((GridSqlTable)el); + + return false; + } + }); + } + + /** + * @param target Expression to extract the table from. + * @return Back end table for this element. + */ + public static GridSqlTable gridTableForElement(GridSqlElement target) { + Set<GridSqlTable> tbls = new HashSet<>(); + + collectAllGridTablesInTarget(target, tbls); + + if (tbls.size() != 1) + throw new IgniteSQLException("Failed to determine target table", IgniteQueryErrorCode.TABLE_NOT_FOUND); + + return tbls.iterator().next(); + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/799098c6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 534a164,31dc52d..fa86836 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@@ -17,65 -17,108 +17,119 @@@ package org.apache.ignite.internal.processors.query.h2.dml; +import java.util.List; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.binary.BinaryObject; + import org.apache.ignite.binary.BinaryObjectBuilder; + import org.apache.ignite.internal.processors.cache.GridCacheContext; + import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; + import org.apache.ignite.internal.processors.query.GridQueryProperty; + import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; + import org.apache.ignite.internal.processors.query.IgniteSQLException; + import org.apache.ignite.internal.processors.query.QueryUtils; + import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.typedef.F; + import org.apache.ignite.internal.util.typedef.T3; + import org.apache.ignite.lang.IgniteBiTuple; + import org.h2.table.Column; + import org.jetbrains.annotations.Nullable; + + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; /** * Update plan - where to take data to update cache from and how to construct new keys and values, if needed. */ public final class UpdatePlan { /** Initial statement to drive the rest of the logic. */ - public final UpdateMode mode; + private final UpdateMode mode; /** Target table to be affected by initial DML statement. */ - public final GridH2Table tbl; + private final GridH2Table tbl; /** Column names to set or update. */ - public final String[] colNames; + private final String[] colNames; - /** - * Expected column types to set or insert/merge. - * @see org.h2.value.Value - */ - public final int[] colTypes; + /** Column types to set for insert/merge. */ + private final int[] colTypes; /** Method to create key for INSERT or MERGE, ignored for UPDATE and DELETE. */ - public final KeyValueSupplier keySupplier; + private final KeyValueSupplier keySupplier; /** Method to create value to put to cache, ignored for DELETE. */ - public final KeyValueSupplier valSupplier; + private final KeyValueSupplier valSupplier; - /** Index of key column, if it's explicitly mentioned in column list of MERGE or INSERT, - * ignored for UPDATE and DELETE. */ - public final int keyColIdx; + /** Key column index. */ + private final int keyColIdx; - /** Index of value column, if it's explicitly mentioned in column list. Ignored for UPDATE and DELETE. */ - public final int valColIdx; + /** Value column index. */ + private final int valColIdx; /** SELECT statement built upon initial DML statement. */ - public final String selectQry; + private final String selectQry; /** Subquery flag - {@code true} if {@link #selectQry} is an actual subquery that retrieves data from some cache. */ - public final boolean isLocSubqry; + private final boolean isLocSubqry; + /** */ + public final List<List<FastUpdateArgument>> rows; + /** Number of rows in rows based MERGE or INSERT. */ - public final int rowsNum; + private final int rowsNum; /** Arguments for fast UPDATE or DELETE. */ - public final FastUpdateArguments fastUpdateArgs; + private final FastUpdate fastUpdate; - /** */ + /** Additional info for distributed update. */ + private final DmlDistributedPlanInfo distributed; + ++ /* + private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, - KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - List<List<FastUpdateArgument>> rows, int rowsNum, FastUpdateArguments fastUpdateArgs) { ++ KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, ++ List<List<FastUpdateArgument>> rows, int rowsNum, FastUpdateArguments fastUpdateArgs) { ++ */ ++ + /** + * Constructor. + * + * @param mode Mode. + * @param tbl Table. + * @param colNames Column names. + * @param colTypes Column types. + * @param keySupplier Key supplier. + * @param valSupplier Value supplier. + * @param keyColIdx Key column index. + * @param valColIdx value column index. + * @param selectQry Select query. + * @param isLocSubqry Local subquery flag. + * @param rowsNum Rows number. + * @param fastUpdate Fast update (if any). + * @param distributed Distributed plan (if any) + */ + public UpdatePlan( + UpdateMode mode, + GridH2Table tbl, + String[] colNames, + int[] colTypes, + KeyValueSupplier keySupplier, + KeyValueSupplier valSupplier, + int keyColIdx, + int valColIdx, + String selectQry, + boolean isLocSubqry, + int rowsNum, + @Nullable FastUpdate fastUpdate, + @Nullable DmlDistributedPlanInfo distributed + ) { this.colNames = colNames; this.colTypes = colTypes; + this.rows = rows; this.rowsNum = rowsNum; + assert mode != null; assert tbl != null; @@@ -87,48 -130,260 +141,298 @@@ this.valColIdx = valColIdx; this.selectQry = selectQry; this.isLocSubqry = isLocSubqry; - this.fastUpdateArgs = fastUpdateArgs; + this.fastUpdate = fastUpdate; + this.distributed = distributed; } - /** */ + /** + * Constructor for delete operation or fast update. + * + * @param mode Mode. + * @param tbl Table. + * @param selectQry Select query. + * @param fastUpdate Fast update arguments (if any). + * @param distributed Distributed plan (if any) + */ + public UpdatePlan( + UpdateMode mode, + GridH2Table tbl, + String selectQry, + @Nullable FastUpdate fastUpdate, + @Nullable DmlDistributedPlanInfo distributed + ) { + this( + mode, + tbl, + null, + null, + null, + null, + -1, + -1, + selectQry, + false, + 0, + fastUpdate, + distributed + ); + } + + /** + * Convert a row into key-value pair. + * + * @param row Row to process. + * @throws IgniteCheckedException if failed. + */ + public IgniteBiTuple<?, ?> processRow(List<?> row) throws IgniteCheckedException { + GridH2RowDescriptor rowDesc = tbl.rowDescriptor(); + GridQueryTypeDescriptor desc = rowDesc.type(); + + GridCacheContext cctx = rowDesc.context(); + + Object key = keySupplier.apply(row); + + if (QueryUtils.isSqlType(desc.keyClass())) { + assert keyColIdx != -1; + + key = DmlUtils.convert(key, rowDesc, desc.keyClass(), colTypes[keyColIdx]); + } + + Object val = valSupplier.apply(row); + + if (QueryUtils.isSqlType(desc.valueClass())) { + assert valColIdx != -1; + + val = DmlUtils.convert(val, rowDesc, desc.valueClass(), colTypes[valColIdx]); + } + + if (key == null) { + if (F.isEmpty(desc.keyFieldName())) + throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY); + else + throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'", + IgniteQueryErrorCode.NULL_KEY); + } + + if (val == null) { + if (F.isEmpty(desc.valueFieldName())) + throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null", + IgniteQueryErrorCode.NULL_VALUE); + else + throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'", + IgniteQueryErrorCode.NULL_VALUE); + } + + Map<String, Object> newColVals = new HashMap<>(); + + for (int i = 0; i < colNames.length; i++) { + if (i == keyColIdx || i == valColIdx) + continue; + + String colName = colNames[i]; + + GridQueryProperty prop = desc.property(colName); + + assert prop != null; + + Class<?> expCls = prop.type(); + + newColVals.put(colName, DmlUtils.convert(row.get(i), rowDesc, expCls, colTypes[i])); + } + + // We update columns in the order specified by the table for a reason - table's + // column order preserves their precedence for correct update of nested properties. + Column[] cols = tbl.getColumns(); + + // First 3 columns are _key, _val and _ver. Skip 'em. + for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) { + if (tbl.rowDescriptor().isKeyValueOrVersionColumn(i)) + continue; + + String colName = cols[i].getName(); + + if (!newColVals.containsKey(colName)) + continue; + + Object colVal = newColVals.get(colName); + + desc.setValue(colName, key, val, colVal); + } + + if (cctx.binaryMarshaller()) { + if (key instanceof BinaryObjectBuilder) + key = ((BinaryObjectBuilder) key).build(); + + if (val instanceof BinaryObjectBuilder) + val = ((BinaryObjectBuilder) val).build(); + } + + desc.validateKeyAndValue(key, val); + + return new IgniteBiTuple<>(key, val); + } + + /** + * Convert a row into value. + * + * @param row Row to process. + * @throws IgniteCheckedException if failed. + */ + public T3<Object, Object, Object> processRowForUpdate(List<?> row) throws IgniteCheckedException { + GridH2RowDescriptor rowDesc = tbl.rowDescriptor(); + GridQueryTypeDescriptor desc = rowDesc.type(); + + GridCacheContext cctx = rowDesc.context(); + + boolean hasNewVal = (valColIdx != -1); + + boolean hasProps = !hasNewVal || colNames.length > 1; + + Object key = row.get(0); + + Object oldVal = row.get(1); + + if (cctx.binaryMarshaller() && !(oldVal instanceof BinaryObject)) + oldVal = cctx.grid().binary().toBinary(oldVal); + + Object newVal; + + Map<String, Object> newColVals = new HashMap<>(); + + for (int i = 0; i < colNames.length; i++) { + if (hasNewVal && i == valColIdx - 2) + continue; + + GridQueryProperty prop = tbl.rowDescriptor().type().property(colNames[i]); + + assert prop != null : "Unknown property: " + colNames[i]; + + newColVals.put(colNames[i], DmlUtils.convert(row.get(i + 2), rowDesc, prop.type(), colTypes[i])); + } + + newVal = valSupplier.apply(row); + + if (newVal == null) + throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE); + + // Skip key and value - that's why we start off with 3rd column + for (int i = 0; i < tbl.getColumns().length - DEFAULT_COLUMNS_COUNT; i++) { + Column c = tbl.getColumn(i + DEFAULT_COLUMNS_COUNT); + + if (rowDesc.isKeyValueOrVersionColumn(c.getColumnId())) + continue; + + GridQueryProperty prop = desc.property(c.getName()); + + if (prop.key()) + continue; // Don't get values of key's columns - we won't use them anyway + + boolean hasNewColVal = newColVals.containsKey(c.getName()); + + if (!hasNewColVal) + continue; + + Object colVal = newColVals.get(c.getName()); + + // UPDATE currently does not allow to modify key or its fields, so we must be safe to pass null as key. + rowDesc.setColumnValue(null, newVal, colVal, i); + } + + if (cctx.binaryMarshaller() && hasProps) { + assert newVal instanceof BinaryObjectBuilder; + + newVal = ((BinaryObjectBuilder) newVal).build(); + } + + desc.validateKeyAndValue(key, newVal); + + return new T3<>(key, oldVal, newVal); + } + + /** + * @return Update mode. + */ + public UpdateMode mode() { + return mode; + } + + /** + * @return Cache context. + */ + public GridCacheContext cacheContext() { + return tbl.cache(); + } + + /** + * @return Distributed plan info (for skip-reducer mode). + */ + @Nullable public DmlDistributedPlanInfo distributedPlan() { + return distributed; + } + + /** + * @return Row count. + */ + public int rowCount() { + return rowsNum; + } + + /** + * @return Select query. + */ + public String selectQuery() { + return selectQry; + } + + /** + * @return Local subquery flag. + */ + @Nullable public boolean isLocalSubquery() { + return isLocSubqry; + } + + /** + * @return Fast update. + */ + @Nullable public FastUpdate fastUpdate() { + return fastUpdate; + } ++ ++ /* + public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, - KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - List<List<FastUpdateArgument>> rows, int rowsNum) { ++ KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, ++ List<List<FastUpdateArgument>> rows, int rowsNum) { + assert !F.isEmpty(colNames); + + return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, + selectQry, isLocSubqry, rows, rowsNum, null); + } + - /** */ + public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, - KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - List<List<FastUpdateArgument>> rows, int rowsNum) { ++ KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, ++ List<List<FastUpdateArgument>> rows, int rowsNum) { + assert !F.isEmpty(colNames); + + return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, + selectQry, isLocSubqry, rows, rowsNum, null); + } + - /** */ + public static UpdatePlan forUpdate(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier valSupplier, - int valColIdx, String selectQry) { ++ int valColIdx, String selectQry) { + assert !F.isEmpty(colNames); + + return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, colTypes, null, valSupplier, -1, valColIdx, selectQry, + false, null, 0, null); + } + - /** */ + public static UpdatePlan forDelete(GridH2Table tbl, String selectQry) { + return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, null, 0, null); + } + - /** */ + public static UpdatePlan forFastUpdate(UpdateMode mode, GridH2Table tbl, FastUpdateArguments fastUpdateArgs) { + assert mode == UpdateMode.UPDATE || mode == UpdateMode.DELETE; + + return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, null, 0, fastUpdateArgs); + } - ++ */ } http://git-wip-us.apache.org/repos/asf/ignite/blob/799098c6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index cf6cd88,a551639..52efd6d --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@@ -18,7 -18,9 +18,10 @@@ package org.apache.ignite.internal.processors.query.h2.dml; import java.lang.reflect.Constructor; + import java.sql.Connection; + import java.sql.PreparedStatement; + import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@@ -33,18 -37,17 +38,19 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor; + import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; - import org.apache.ignite.internal.processors.query.h2.sql.DmlAstUtils; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDelete; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlMerge; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; + import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable; @@@ -122,11 -135,7 +142,11 @@@ public final class UpdatePlanBuilder desc = tbl.dataTable().rowDescriptor(); cols = ins.columns(); - sel = DmlAstUtils.selectForInsertOrMerge(cols, ins.rows(), ins.query(), desc); + sel = DmlAstUtils.selectForInsertOrMerge(cols, ins.rows(), ins.query()); + + if (sel == null) + elRows = ins.rows(); + isTwoStepSubqry = (ins.query() != null); rowsNum = isTwoStepSubqry ? 0 : ins.rows().size(); } @@@ -135,53 -144,21 +155,47 @@@ target = merge.into(); - tbl = gridTableForElement(target); + tbl = DmlAstUtils.gridTableForElement(target); desc = tbl.dataTable().rowDescriptor(); - // This check also protects us from attempts to update key or its fields directly - - // when no key except cache key can be used, it will serve only for uniqueness checks, - // not for updates, and hence will allow putting new pairs only. - // We don't quote _key and _val column names on CREATE TABLE, so they are always uppercase here. - GridSqlColumn[] keys = merge.keys(); - if (keys.length != 1 || !desc.isKeyColumn(tbl.dataTable().getColumn(keys[0].columnName()).getColumnId())) - throw new CacheException("SQL MERGE does not support arbitrary keys"); - cols = merge.columns(); - sel = DmlAstUtils.selectForInsertOrMerge(cols, merge.rows(), merge.query(), desc); + sel = DmlAstUtils.selectForInsertOrMerge(cols, merge.rows(), merge.query()); + + if (sel == null) + elRows = merge.rows(); + isTwoStepSubqry = (merge.query() != null); rowsNum = isTwoStepSubqry ? 0 : merge.rows().size(); } - else throw new IgniteSQLException("Unexpected DML operation [cls=" + stmt.getClass().getName() + ']', + else { + throw new IgniteSQLException("Unexpected DML operation [cls=" + stmt.getClass().getName() + ']', IgniteQueryErrorCode.UNEXPECTED_OPERATION); + } + if (elRows != null) { + assert sel == null; + + rows = new ArrayList<>(elRows.size()); + + for (GridSqlElement[] elRow : elRows) { + List<FastUpdateArgument> row = new ArrayList<>(cols.length); + + for (GridSqlElement e : elRow) { + if (e instanceof GridSqlConst) + row.add(new FastUpdateArguments.ValueArgument(((GridSqlConst) e).value().getObject())); + else if (e instanceof GridSqlParameter) + row.add(new FastUpdateArguments.ParamArgument(((GridSqlParameter) e).index())); + else + throw new IgniteSQLException("Unexpected element type: " + e.getClass().getSimpleName(), + IgniteQueryErrorCode.UNEXPECTED_ELEMENT_TYPE); + } + + rows.add(row); + } + } + // Let's set the flag only for subqueries that have their FROM specified. - isTwoStepSubqry = (isTwoStepSubqry && (sel instanceof GridSqlUnion || + isTwoStepSubqry &= (sel != null && (sel instanceof GridSqlUnion || (sel instanceof GridSqlSelect && ((GridSqlSelect) sel).from() != null))); int keyColIdx = -1; @@@ -233,12 -210,28 +247,33 @@@ KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, true, false); KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false); - if (stmt instanceof GridSqlMerge) - return UpdatePlan.forMerge(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel != null ? sel.getSQL() : null, !isTwoStepSubqry, rows, rowsNum); - else - return UpdatePlan.forInsert(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel != null ? sel.getSQL() : null, !isTwoStepSubqry, rows, rowsNum); + String selectSql = sel.getSQL(); + + DmlDistributedPlanInfo distributed = (rowsNum == 0 && !F.isEmpty(selectSql)) ? + checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()) : null; + + UpdateMode mode = stmt instanceof GridSqlMerge ? UpdateMode.MERGE : UpdateMode.INSERT; + + return new UpdatePlan( + mode, + tbl.dataTable(), + colNames, + colTypes, + keySupplier, + valSupplier, + keyColIdx, + valColIdx, + selectSql, + !isTwoStepSubqry, + rowsNum, + null, + distributed + ); ++ ++ /* ++ if (sel == null) ++ elRows = merge.rows(); ++ */ } /**
