Native batching prototype.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03f9fe71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03f9fe71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03f9fe71

Branch: refs/heads/ignite-6022-proto
Commit: 03f9fe71ed1e41353e983cc8993b7b369cd91936
Parents: dd70a84
Author: devozerov <[email protected]>
Authored: Mon Dec 18 14:15:41 2017 +0300
Committer: devozerov <[email protected]>
Committed: Mon Dec 18 14:15:41 2017 +0300

----------------------------------------------------------------------
 .../cache/query/SqlFieldsQueryEx.java           |  44 ++++-
 .../odbc/jdbc/JdbcRequestHandler.java           |  42 +++--
 .../query/h2/DmlStatementsProcessor.java        | 178 ++++++++++++++++++-
 .../processors/query/h2/IgniteH2Indexing.java   |  17 +-
 .../processors/query/h2/dml/DmlUtils.java       |  19 ++
 .../processors/query/h2/dml/UpdatePlan.java     |  34 +++-
 .../query/h2/dml/UpdatePlanBuilder.java         |   2 +-
 7 files changed, 303 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
index c5f786e..fb098a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 
@@ -33,13 +35,23 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
     /** Whether server side DML should be enabled. */
     private boolean skipReducerOnUpdate;
 
+    /** Batched arguments. */
+    private List<Object[]> batchedArgs;
+
+    public SqlFieldsQueryEx(String sql, Boolean isQry) {
+        super(sql);
+
+        this.isQry = isQry;
+    }
+
     /**
      * @param sql SQL query.
      * @param isQry Flag indicating whether this object denotes a query or an 
update operation.
      */
-    public SqlFieldsQueryEx(String sql, Boolean isQry) {
-        super(sql);
-        this.isQry = isQry;
+    public SqlFieldsQueryEx(String sql, Boolean isQry, int batchedArgsSize) {
+        this(sql, isQry);
+
+        this.batchedArgs = new ArrayList<>(batchedArgsSize);
     }
 
     /**
@@ -50,6 +62,7 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
 
         this.isQry = qry.isQry;
         this.skipReducerOnUpdate = qry.skipReducerOnUpdate;
+        this.batchedArgs = qry.batchedArgs;
     }
 
     /**
@@ -151,6 +164,31 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery 
{
         return skipReducerOnUpdate;
     }
 
+    /**
+     * Add batched arguments.
+     *
+     * @param args Arguments.
+     */
+    public void addBatchedArgs(Object[] args) {
+        if (batchedArgs == null)
+            batchedArgs = new ArrayList<>();
+
+        batchedArgs.add(args);
+    }
+
+    /**
+     * Batched arguments.
+     *
+     * @return Batched arguments.
+     */
+    public List<Object[]> batchedArgs() {
+        return batchedArgs;
+    }
+
+    public void clearBatchedArgs() {
+        batchedArgs = null;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlFieldsQuery copy() {
         return new SqlFieldsQueryEx(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index e3b6f5b..2530360 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -475,28 +475,45 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
         if (F.isEmpty(schemaName))
             schemaName = QueryUtils.DFLT_SCHEMA;
 
+        int qryCnt = req.queries().size();
+
         int successQueries = 0;
-        int updCnts[] = new int[req.queries().size()];
+        int updCnts[] = new int[qryCnt];
 
         try {
-            String sql = null;
+            SqlFieldsQueryEx qry = null;
 
             for (JdbcQuery q : req.queries()) {
-                if (q.sql() != null)
-                    sql = q.sql();
+                if (q.sql() != null) {
+                    if (qry != null) {
+                        QueryCursorImpl<List<?>> qryCur = 
(QueryCursorImpl<List<?>>)ctx.query()
+                            .querySqlFieldsNoCache(qry, true, true).get(0);
+
+                        assert !qryCur.isQuery();
+
+                        List<List<?>> items = qryCur.getAll();
+
+                        // TODO: Set on correct positions.
+                        updCnts[successQueries++] = 
((Long)items.get(0).get(0)).intValue();
+                    }
+
+                    qry = new SqlFieldsQueryEx(q.sql(), false, qryCnt);
 
-                SqlFieldsQuery qry = new SqlFieldsQueryEx(sql, false);
+                    qry.setDistributedJoins(distributedJoins);
+                    qry.setEnforceJoinOrder(enforceJoinOrder);
+                    qry.setCollocated(collocated);
+                    qry.setReplicatedOnly(replicatedOnly);
+                    qry.setLazy(lazy);
 
-                qry.setArgs(q.args());
+                    qry.setSchema(schemaName);
+                }
 
-                qry.setDistributedJoins(distributedJoins);
-                qry.setEnforceJoinOrder(enforceJoinOrder);
-                qry.setCollocated(collocated);
-                qry.setReplicatedOnly(replicatedOnly);
-                qry.setLazy(lazy);
+                assert qry != null;
 
-                qry.setSchema(schemaName);
+                qry.addBatchedArgs(q.args());
+            }
 
+            if (qry != null) {
                 QueryCursorImpl<List<?>> qryCur = 
(QueryCursorImpl<List<?>>)ctx.query()
                     .querySqlFieldsNoCache(qry, true, true).get(0);
 
@@ -504,6 +521,7 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
 
                 List<List<?>> items = qryCur.getAll();
 
+                // TODO: Set on correct positions.
                 updCnts[successQueries++] = 
((Long)items.get(0).get(0)).intValue();
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 9a6b0af..df14c85 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -22,6 +22,7 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -42,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import 
org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -50,6 +52,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
 import 
org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
@@ -189,6 +192,80 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * Execute DML statement, possibly with few re-attempts in case of 
concurrent data modifications.
+     *
+     * @param schemaName Schema.
+     * @param conn Connection.
+     * @param prepared Prepared statement.
+     * @param fieldsQry Original query.
+     * @param loc Query locality flag.
+     * @param filters Cache name and key filter.
+     * @param cancel Cancel.
+     * @return Update result (modified items count and failed keys).
+     * @throws IgniteCheckedException if failed.
+     */
+    private Collection<UpdateResult> updateSqlFieldsBatched(String schemaName, 
Connection conn, Prepared prepared,
+        SqlFieldsQueryEx fieldsQry, boolean loc, IndexingQueryFilter filters, 
GridQueryCancel cancel)
+        throws IgniteCheckedException {
+        List<Object[]> argss = fieldsQry.batchedArgs();
+
+        UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, 
fieldsQry, loc, null);
+
+        if (plan.hasRows()) {
+            GridCacheContext<?, ?> cctx = plan.cacheContext();
+
+            CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+            // Force keepBinary for operation context to avoid binary 
deserialization inside entry processor
+            if (cctx.binaryMarshaller()) {
+                CacheOperationContext newOpCtx = null;
+
+                if (opCtx == null)
+                    // Mimics behavior of GridCacheAdapter#keepBinary and 
GridCacheProxyImpl#keepBinary
+                    newOpCtx = new CacheOperationContext(false, null, true, 
null, false, null, false);
+                else if (!opCtx.isKeepBinary())
+                    newOpCtx = opCtx.keepBinary();
+
+                if (newOpCtx != null)
+                    cctx.operationContextPerCall(newOpCtx);
+            }
+
+            try {
+                List<List<?>> cur = plan.createRows(argss);
+
+                UpdateResult res = processDmlSelectResultBatched(cctx, plan, 
cur, fieldsQry.getPageSize());
+
+                Collection<UpdateResult> ress = new ArrayList<>(1);
+
+                // TODO: Wrong!
+                ress.add(res);
+
+                return ress;
+            }
+            finally {
+                cctx.operationContextPerCall(opCtx);
+            }
+        }
+        else {
+            // Fallback to previous mode.
+            Collection<UpdateResult> ress = new ArrayList<>(argss.size());
+
+            for (Object[] args : argss) {
+                SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry.copy();
+
+                qry0.clearBatchedArgs();
+                qry0.setArgs(args);
+
+                UpdateResult res = updateSqlFields(schemaName, conn, prepared, 
qry0, loc, filters, cancel);
+
+                ress.add(res);
+            }
+
+            return ress;
+        }
+    }
+
+    /**
      * @param schemaName Schema.
      * @param c Connection.
      * @param p Prepared statement.
@@ -198,18 +275,44 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, 
Connection c, Prepared p,
+    List<QueryCursorImpl<List<?>>> updateSqlFieldsDistributed(String 
schemaName, Connection c, Prepared p,
         SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws 
IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, 
null, cancel);
+        if (DmlUtils.isBatched(fieldsQry)) {
+            // TODO: Refactor.
+            Collection<UpdateResult> ress = updateSqlFieldsBatched(schemaName, 
c, p, (SqlFieldsQueryEx)fieldsQry,
+                false, null, cancel);
+
+            ArrayList<QueryCursorImpl<List<?>>> resCurs = new 
ArrayList<>(ress.size());
 
-        checkUpdateResult(res);
+            for (UpdateResult res : ress) {
+                checkUpdateResult(res);
 
-        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new 
QueryCursorImpl(Collections.singletonList
-            (Collections.singletonList(res.counter())), cancel, false);
+                QueryCursorImpl<List<?>> resCur = 
(QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+                    (Collections.singletonList(res.counter())), cancel, false);
 
-        resCur.fieldsMeta(UPDATE_RESULT_META);
+                resCur.fieldsMeta(UPDATE_RESULT_META);
 
-        return resCur;
+                resCurs.add(resCur);
+            }
+
+            return resCurs;
+        }
+        else {
+            UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, 
false, null, cancel);
+
+            ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(1);
+
+            checkUpdateResult(res);
+
+            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new 
QueryCursorImpl(Collections.singletonList
+                (Collections.singletonList(res.counter())), cancel, false);
+
+            resCur.fieldsMeta(UPDATE_RESULT_META);
+
+            resCurs.add(resCur);
+
+            return resCurs;
+        }
     }
 
     /**
@@ -399,6 +502,22 @@ public class DmlStatementsProcessor {
         return processDmlSelectResult(cctx, plan, cur, pageSize);
     }
 
+    private UpdateResult processDmlSelectResultBatched(GridCacheContext cctx, 
UpdatePlan plan, Collection<List<?>> rows,
+        int pageSize) throws IgniteCheckedException {
+        switch (plan.mode()) {
+            case MERGE:
+                // TODO
+                throw new IgniteCheckedException("Unsupported, fix");
+
+            case INSERT:
+                return new UpdateResult(doInsertBatched(plan, rows, pageSize), 
X.EMPTY_OBJECT_ARRAY);
+
+            default:
+                throw new IgniteSQLException("Unexpected DML operation [mode=" 
+ plan.mode() + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+        }
+    }
+
     /**
      * @param cctx Cache context.
      * @param plan Update plan.
@@ -673,6 +792,50 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * Execute INSERT statement plan.
+     * @param cursor Cursor to take inserted data from.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page 
operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed, particularly in case of 
duplicate keys.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private long doInsertBatched(UpdatePlan plan, Collection<List<?>> cursor, 
int pageSize)
+        throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        // Keys that failed to INSERT due to duplication.
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+
+        for (List<?> row : cursor) {
+            final IgniteBiTuple keyValPair = plan.processRow(row);
+
+            sender.add(keyValPair.getKey(), new 
InsertEntryProcessor(keyValPair.getValue()));
+        }
+
+        // TODO: Tale page size in count?
+        sender.flush();
+
+        SQLException resEx = sender.error();
+
+        if (!F.isEmpty(sender.failedKeys())) {
+            String msg = "Failed to INSERT some keys because they are already 
in cache " +
+                "[keys=" + sender.failedKeys() + ']';
+
+            SQLException dupEx = new SQLException(msg, 
SqlStateCode.CONSTRAINT_VIOLATION);
+
+            if (resEx == null)
+                resEx = dupEx;
+            else
+                resEx.setNextException(dupEx);
+        }
+
+        if (resEx != null)
+            throw new IgniteSQLException(resEx);
+
+        return sender.updateCount();
+    }
+
+    /**
      *
      * @param schemaName Schema name.
      * @param stmt Prepared statement.
@@ -810,5 +973,4 @@ public class DmlStatementsProcessor {
             throw new IgniteSQLException(conEx);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6fdcd27..31a6645 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1507,14 +1507,17 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                     int paramsCnt = prepared.getParameters().size();
 
                     if (paramsCnt > 0) {
-                        if (argsOrig == null || argsOrig.length < firstArg + 
paramsCnt) {
-                            throw new IgniteException("Invalid number of query 
parameters. " +
-                                "Cannot find " + (argsOrig.length + 1 - 
firstArg) + " parameter.");
-                        }
+                        // TODO: Check remainer.
+                        if (prepared.isQuery()) {
+                            if (argsOrig == null || argsOrig.length < firstArg 
+ paramsCnt) {
+                                throw new IgniteException("Invalid number of 
query parameters. " +
+                                    "Cannot find " + (argsOrig.length + 1 - 
firstArg) + " parameter.");
+                            }
 
-                        args = Arrays.copyOfRange(argsOrig, firstArg, firstArg 
+ paramsCnt);
+                            args = Arrays.copyOfRange(argsOrig, firstArg, 
firstArg + paramsCnt);
 
-                        firstArg += paramsCnt;
+                            firstArg += paramsCnt;
+                        }
                     }
 
                     cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, 
sqlQry, grpByCollocated,
@@ -1555,7 +1558,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 if (twoStepQry == null) {
                     if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                         try {
-                            
res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
+                            
res.addAll(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
                                 qry.copy().setSql(sqlQry).setArgs(args), 
cancel));
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
index 8d4861e..e4b52a6 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -22,10 +22,13 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.util.DateTimeUtils;
 import org.h2.util.LocalDateTimeUtils;
@@ -117,6 +120,22 @@ public class DmlUtils {
     }
 
     /**
+     * Check whether query is batched.
+     *
+     * @param qry Query.
+     * @return {@code True} if batched.
+     */
+    public static boolean isBatched(SqlFieldsQuery qry) {
+        if (qry instanceof SqlFieldsQueryEx) {
+            SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry;
+
+            return !F.isEmpty(qry0.batchedArgs());
+        }
+
+        return false;
+    }
+
+    /**
      * Private constructor.
      */
     private DmlUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index 6a45c3c..248041a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -376,10 +376,10 @@ public final class UpdatePlan {
     public List<List<?>> createRows(Object[] args) throws 
IgniteCheckedException {
         assert rowsNum > 0 && !F.isEmpty(colNames);
 
-        List<List<?>> res = new ArrayList<>(rowsNum);
-
         GridH2RowDescriptor desc = tbl.rowDescriptor();
 
+        List<List<?>> res = new ArrayList<>(rowsNum);
+
         for (List<DmlArgument> row : rows) {
             List<Object> resRow = new ArrayList<>();
 
@@ -401,6 +401,36 @@ public final class UpdatePlan {
         return res;
     }
 
+    public List<List<?>> createRows(List<Object[]> argss) throws 
IgniteCheckedException {
+        assert rowsNum > 0 && !F.isEmpty(colNames);
+
+        GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        List<List<?>> res = new ArrayList<>(rowsNum * argss.size());
+
+        for (Object[] args : argss) {
+            for (List<DmlArgument> row : rows) {
+                List<Object> resRow = new ArrayList<>();
+
+                for (int j = 0; j < colNames.length; j++) {
+                    Object colVal = row.get(j).get(args);
+
+                    if (j == keyColIdx || j == valColIdx) {
+                        Class<?> colCls = j == keyColIdx ? 
desc.type().keyClass() : desc.type().valueClass();
+
+                        colVal = DmlUtils.convert(colVal, desc, colCls, 
colTypes[j]);
+                    }
+
+                    resRow.add(colVal);
+                }
+
+                res.add(resRow);
+            }
+        }
+
+        return res;
+    }
+
     /**
      * @return Update mode.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 5ffd264..3305b00 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -619,7 +619,7 @@ public final class UpdatePlanBuilder {
         Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String 
selectQry, String cacheName)
         throws IgniteCheckedException {
 
-        if (loc || !isSkipReducerOnUpdateQuery(fieldsQry))
+        if (loc || !isSkipReducerOnUpdateQuery(fieldsQry) || 
DmlUtils.isBatched(fieldsQry))
             return null;
 
         assert conn != null;

Reply via email to