http://git-wip-us.apache.org/repos/asf/ignite/blob/336ad01a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 0abe2cb,98d123f..09b4a27 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@@ -58,8 -57,9 +57,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; + import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument; import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; + import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; @@@ -238,72 -350,42 +351,69 @@@ public class DmlStatementsProcessor if (plan.fastUpdateArgs != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; - return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY); + return doFastUpdate(plan, fieldsQry.getArgs()); } - assert !F.isEmpty(plan.selectQry); + assert !F.isEmpty(plan.rows) ^ !F.isEmpty(plan.selectQry); - QueryCursorImpl<List<?>> cur; + Iterable<List<?>> cur; // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual // subquery and not some dummy stuff like "select 1, 2, 3;" if (!loc && !plan.isLocSubqry) { + assert !F.isEmpty(plan.selectQry); + SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated()) - .setArgs(params) + .setArgs(fieldsQry.getArgs()) .setDistributedJoins(fieldsQry.isDistributedJoins()) .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) .setLocal(fieldsQry.isLocal()) .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = indexing.queryTwoStep(cctx, newFieldsQry, cancel); + cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, + mainCacheId); } - else { + else if (F.isEmpty(plan.rows)) { - final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params), - filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); + final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, + F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); - QueryCursorImpl<List<?>> resCur = new QueryCursorImpl<>(new Iterable<List<?>>() { + cur = new QueryCursorImpl<>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { try { - return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary()); + return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true); } catch (IgniteCheckedException e) { throw new IgniteException(e); } } }, cancel); - - resCur.fieldsMeta(res.metaData()); - - cur = resCur; } + else { + assert plan.rowsNum > 0 && !F.isEmpty(plan.colNames); + + List<List<?>> args = new ArrayList<>(plan.rowsNum); + + GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); + + for (List<FastUpdateArgument> argRow : plan.rows) { + List<Object> row = new ArrayList<>(); + + for (int j = 0; j < plan.colNames.length; j++) { + Object colVal = argRow.get(j).apply(fieldsQry.getArgs()); + + if (j == plan.keyColIdx || j == plan.valColIdx) - colVal = convert(colVal, j == plan.keyColIdx ? desc.type().keyClass() : desc.type().valueClass(), - desc); ++ colVal = convert(colVal, desc, j == plan.keyColIdx ? desc.type().keyClass() : ++ desc.type().valueClass(), plan.colTypes[j]); + + row.add(colVal); + } + + args.add(row); + } + + cur = args; + } int pageSize = loc ? 0 : fieldsQry.getPageSize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/336ad01a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/FastUpdateArguments.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/FastUpdateArguments.java index 056dfaa,cb47704..257014c --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/FastUpdateArguments.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/FastUpdateArguments.java @@@ -50,40 -50,4 +50,40 @@@ public final class FastUpdateArguments return null; } }; + + /** Simple constant value based operand. */ + public final static class ValueArgument implements FastUpdateArgument { + /** Value to return. */ + private final Object val; + + /** */ + public ValueArgument(Object val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Object apply(Object[] arg) throws IgniteCheckedException { + return val; + } + } + - /** Simple constant value based operand. */ ++ /** User given param value operand. */ + public final static class ParamArgument implements FastUpdateArgument { - /** Value to return. */ ++ /** Index of param to take. */ + private final int paramIdx; + + /** */ + public ParamArgument(int paramIdx) { + assert paramIdx >= 0; + + this.paramIdx = paramIdx; + } + + /** {@inheritDoc} */ + @Override public Object apply(Object[] arg) throws IgniteCheckedException { + assert arg.length > paramIdx; + + return arg[paramIdx]; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/336ad01a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 9bd1ecf,b81ac60..534a164 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@@ -53,9 -58,6 +59,9 @@@ public final class UpdatePlan /** Subquery flag - {@code true} if {@link #selectQry} is an actual subquery that retrieves data from some cache. */ public final boolean isLocSubqry; + /** */ - public final Iterable<List<FastUpdateArgument>> rows; ++ public final List<List<FastUpdateArgument>> rows; + /** Number of rows in rows based MERGE or INSERT. */ public final int rowsNum; @@@ -63,11 -65,11 +69,12 @@@ public final FastUpdateArguments fastUpdateArgs; /** */ - private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, KeyValueSupplier keySupplier, + private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - Iterable<List<FastUpdateArgument>> rows, int rowsNum, FastUpdateArguments fastUpdateArgs) { - int rowsNum, FastUpdateArguments fastUpdateArgs) { ++ List<List<FastUpdateArgument>> rows, int rowsNum, FastUpdateArguments fastUpdateArgs) { this.colNames = colNames; + this.colTypes = colTypes; + this.rows = rows; this.rowsNum = rowsNum; assert mode != null; assert tbl != null; @@@ -84,23 -86,22 +91,23 @@@ } /** */ - public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, KeyValueSupplier keySupplier, + public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - Iterable<List<FastUpdateArgument>> rows, int rowsNum) { - int rowsNum) { ++ List<List<FastUpdateArgument>> rows, int rowsNum) { assert !F.isEmpty(colNames); - return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, keySupplier, valSupplier, keyColIdx, valColIdx, + return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, - selectQry, isLocSubqry, rowsNum, null); + selectQry, isLocSubqry, rows, rowsNum, null); } /** */ - public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, KeyValueSupplier keySupplier, + public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, - KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, int rowsNum) { + KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - Iterable<List<FastUpdateArgument>> rows, int rowsNum) { ++ List<List<FastUpdateArgument>> rows, int rowsNum) { assert !F.isEmpty(colNames); - return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, keySupplier, valSupplier, keyColIdx, valColIdx, + return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, - selectQry, isLocSubqry, rowsNum, null); + selectQry, isLocSubqry, rows, rowsNum, null); } /** */ @@@ -108,20 -109,20 +115,20 @@@ int valColIdx, String selectQry) { assert !F.isEmpty(colNames); - return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, null, valSupplier, -1, valColIdx, selectQry, + return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, colTypes, null, valSupplier, -1, valColIdx, selectQry, - false, 0, null); + false, null, 0, null); } /** */ public static UpdatePlan forDelete(GridH2Table tbl, String selectQry) { - return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, -1, -1, selectQry, false, null, 0, null); - return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null); ++ return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, null, 0, null); } /** */ public static UpdatePlan forFastUpdate(UpdateMode mode, GridH2Table tbl, FastUpdateArguments fastUpdateArgs) { assert mode == UpdateMode.UPDATE || mode == UpdateMode.DELETE; - return new UpdatePlan(mode, tbl, null, null, null, -1, -1, null, false, null, 0, fastUpdateArgs); - return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs); ++ return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, null, 0, fastUpdateArgs); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/336ad01a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index 8700b9a,b304109..cf6cd88 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@@ -230,11 -197,11 +234,11 @@@ public final class UpdatePlanBuilder KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false); if (stmt instanceof GridSqlMerge) - return UpdatePlan.forMerge(tbl.dataTable(), colNames, keySupplier, valSupplier, keyColIdx, + return UpdatePlan.forMerge(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum); + valColIdx, sel != null ? sel.getSQL() : null, !isTwoStepSubqry, rows, rowsNum); else - return UpdatePlan.forInsert(tbl.dataTable(), colNames, keySupplier, valSupplier, keyColIdx, + return UpdatePlan.forInsert(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum); + valColIdx, sel != null ? sel.getSQL() : null, !isTwoStepSubqry, rows, rowsNum); } /** @@@ -296,7 -265,10 +302,10 @@@ for (int i = 0; i < updatedCols.size(); i++) { colNames[i] = updatedCols.get(i).columnName(); - if (VAL_FIELD_NAME.equals(colNames[i])) + colTypes[i] = updatedCols.get(i).resultType().type(); + - Column column = updatedCols.get(i).column(); - if (desc.isValueColumn(column.getColumnId())) ++ Column col = updatedCols.get(i).column(); ++ if (desc.isValueColumn(col.getColumnId())) valColIdx = i; } http://git-wip-us.apache.org/repos/asf/ignite/blob/336ad01a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java index efc7f32,5a1d412..2cdb93c --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java @@@ -21,9 -21,10 +21,9 @@@ import java.util.ArrayList import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.query.IgniteSQLException; - import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; + import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument; import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; import org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
