Test fix

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e11fee9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e11fee9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e11fee9c

Branch: refs/heads/ignite-6022-proto
Commit: e11fee9c416493ef084032520f2e6dec35852f29
Parents: 799098c
Author: Alexander Paschenko <[email protected]>
Authored: Fri Dec 15 18:42:07 2017 +0300
Committer: Alexander Paschenko <[email protected]>
Committed: Fri Dec 15 18:42:07 2017 +0300

----------------------------------------------------------------------
 .../query/h2/DmlStatementsProcessor.java        | 135 +++++++------------
 .../processors/query/h2/dml/DmlAstUtils.java    |   4 +-
 .../processors/query/h2/dml/UpdatePlan.java     |  57 +++++++-
 .../query/h2/dml/UpdatePlanBuilder.java         |   4 +-
 4 files changed, 106 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 9e41bfe..dd62c75 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.lang.reflect.Array;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -32,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -49,15 +52,14 @@ import 
org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import 
org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument;
-import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
 import 
org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
 import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate;
+import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
@@ -73,14 +75,12 @@ import org.h2.command.dml.Delete;
 import org.h2.command.dml.Insert;
 import org.h2.command.dml.Merge;
 import org.h2.command.dml.Update;
-import org.h2.table.Column;
 import org.h2.util.DateTimeUtils;
 import org.h2.util.LocalDateTimeUtils;
 import org.h2.value.Value;
 import org.h2.value.ValueDate;
 import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
@@ -258,15 +258,13 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement 
stmt, Object[] args)
+    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement 
stmt, final Object[] args)
         throws IgniteCheckedException {
-        args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY);
-
         Prepared p = GridSqlQueryParser.prepared(stmt);
 
         assert p != null;
 
-        UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, 
null, null, null);
+        final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, 
idx, null, null, null);
 
         if (!F.eq(streamer.cacheName(), plan.cacheContext().name()))
             throw new IgniteSQLException("Cross cache streaming is not 
supported, please specify cache explicitly" +
@@ -281,14 +279,22 @@ public class DmlStatementsProcessor {
 
             final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
 
-            final GridQueryFieldsResult res = 
idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQuery(),
-                F.asList(args), null, false, 0, null);
-
             QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new 
Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return new 
GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(),
-                            cctx.keepBinary());
+                        Iterator<List<?>> it;
+
+                        if (!F.isEmpty(plan.selectQuery())) {
+                            GridQueryFieldsResult res = 
idx.queryLocalSqlFields(idx.schema(cctx.name()),
+                                plan.selectQuery(), 
F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
+                                null, false, 0, null);
+
+                            it = res.iterator();
+                        }
+                        else
+                            it = planToRows(plan, U.firstNotNull(args, 
X.EMPTY_OBJECT_ARRAY)).iterator();
+
+                        return new GridQueryCacheObjectsIterator(it, 
idx.objectContext(), cctx.keepBinary());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -370,8 +376,6 @@ public class DmlStatementsProcessor {
                 return result;
         }
 
-        assert !F.isEmpty(plan.selectQuery());
-
         Iterable<List<?>> cur;
 
         // Do a two-step query only if locality flag is not set AND if plan's 
SELECT corresponds to an actual
@@ -387,11 +391,10 @@ public class DmlStatementsProcessor {
                 .setPageSize(fieldsQry.getPageSize())
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
-            cur = 
(QueryCursorImpl<List<?>>)idx.queryDistributedSqlFields(schemaName, 
newFieldsQry, true,
-                cancel, mainCacheId, true).get(0);
+            cur = idx.queryDistributedSqlFields(schemaName, newFieldsQry, 
true, cancel, mainCacheId, true).get(0);
         }
-        else if (F.isEmpty(plan.rows)) {
-            final GridQueryFieldsResult res = 
idx.queryLocalSqlFields(schemaName, plan.selectQry(),
+        else if (F.isEmpty(plan.rows())) {
+            final GridQueryFieldsResult res = 
idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
                 F.asList(fieldsQry.getArgs()), filters, 
fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -405,35 +408,45 @@ public class DmlStatementsProcessor {
                 }
             }, cancel);
         }
-        else {
-            assert plan.rowsNum > 0 && !F.isEmpty(plan.colNames);
+        else
+            cur = planToRows(plan, fieldsQry.getArgs());
 
-            List<List<?>> args = new ArrayList<>(plan.rowsNum);
+        int pageSize = loc ? 0 : fieldsQry.getPageSize();
 
-            GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+        return processDmlSelectResult(cctx, plan, cur, pageSize);
+    }
 
-            for (List<FastUpdateArgument> argRow : plan.rows) {
-                List<Object> row = new ArrayList<>();
+    /**
+     * Extract rows from plan without performing any query.
+     * @param plan Plan.
+     * @param args Original query arguments.
+     * @return Rows from plan.
+     * @throws IgniteCheckedException if failed.
+     */
+    private List<List<?>> planToRows(UpdatePlan plan, Object[] args) throws 
IgniteCheckedException {
+        assert plan.rowCount() > 0 && !F.isEmpty(plan.columnNames());
 
-                for (int j = 0; j < plan.colNames.length; j++) {
-                    Object colVal = argRow.get(j).apply(fieldsQry.getArgs());
+        List<List<?>> rows = new ArrayList<>(plan.rowCount());
 
-                    if (j == plan.keyColIdx || j == plan.valColIdx)
-                        colVal = convert(colVal, desc, j == plan.keyColIdx ? 
desc.type().keyClass() :
-                            desc.type().valueClass(), plan.colTypes[j]);
+        GridH2RowDescriptor desc = plan.table().rowDescriptor();
 
-                    row.add(colVal);
-                }
+        for (List<FastUpdateArgument> argRow : plan.rows()) {
+            List<Object> row = new ArrayList<>();
+
+            for (int j = 0; j < plan.columnNames().length; j++) {
+                Object colVal = argRow.get(j).apply(args);
+
+                if (j == plan.keyColumnIndex() || j == plan.valueColumnIndex())
+                    colVal = convert(colVal, desc, j == plan.keyColumnIndex() 
? desc.type().keyClass() :
+                        desc.type().valueClass(), plan.columnTypes()[j]);
 
-                args.add(row);
+                row.add(colVal);
             }
 
-            cur = args;
+            rows.add(row);
         }
 
-        int pageSize = loc ? 0 : fieldsQry.getPageSize();
-
-        return processDmlSelectResult(cctx, plan, cur, pageSize);
+        return rows;
     }
 
     /**
@@ -680,50 +693,6 @@ public class DmlStatementsProcessor {
     }
 
     /**
-     * Process errors of entry processor - split the keys into 
duplicated/concurrently modified and those whose
-     * processing yielded an exception.
-     *
-     * @param res Result of {@link GridCacheAdapter#invokeAll)}
-     * @return pair [array of duplicated/concurrently modified keys, SQL 
exception for erroneous keys] (exception is
-     * null if all keys are duplicates/concurrently modified ones).
-     */
-    private static PageProcessingErrorResult splitErrors(Map<Object, 
EntryProcessorResult<Boolean>> res) {
-        Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
-
-        SQLException currSqlEx = null;
-
-        SQLException firstSqlEx = null;
-
-        int errors = 0;
-
-        // Let's form a chain of SQL exceptions
-        for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : 
res.entrySet()) {
-            try {
-                e.getValue().get();
-            }
-            catch (EntryProcessorException ex) {
-                SQLException next = createJdbcSqlException("Failed to process 
key '" + e.getKey() + '\'',
-                    IgniteQueryErrorCode.ENTRY_PROCESSING);
-
-                next.initCause(ex);
-
-                if (currSqlEx != null)
-                    currSqlEx.setNextException(next);
-                else
-                    firstSqlEx = next;
-
-                currSqlEx = next;
-
-                errKeys.remove(e.getKey());
-
-                errors++;
-            }
-        }
-
-        return new PageProcessingErrorResult(errKeys.toArray(), firstSqlEx, 
errors);
-    }
-
-    /**
      * Execute MERGE statement plan.
      * @param cursor Cursor to take inserted data from.
      * @param pageSize Batch size to stream data from {@code cursor}, anything 
<= 0 for single page operations.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
index a2cd553..b6c4a2a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
@@ -83,7 +83,6 @@ public final class DmlAstUtils {
      * @param cols Columns to insert values into.
      * @param rows Rows to create pseudo-SELECT upon.
      * @param subQry Subquery to use rather than rows.
-     * @param desc Row descriptor.
      * @return Subquery or pseudo-SELECT to evaluate inserted expressions, or 
{@code null} if no query needs to be run.
      */
     public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, 
List<GridSqlElement[]> rows,
@@ -219,8 +218,7 @@ public final class DmlAstUtils {
         if (!(set instanceof GridSqlConst || set instanceof GridSqlParameter))
             return null;
 
-        return new FastUpdateArguments(operandForElement(filter.getKey()), 
operandForElement(filter.getValue()),
-            operandForElement(set));
+        return FastUpdate.create(filter.getKey(), filter.getValue(), set);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index fa86836..96298d8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.h2.dml;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -35,10 +37,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.h2.table.Column;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 
 /**
@@ -48,7 +46,7 @@ public final class UpdatePlan {
     /** Initial statement to drive the rest of the logic. */
     private final UpdateMode mode;
 
-    /** Target table to be affected by initial DML statement. */
+    /** Target table to be affected by initial DML statement. */
     private final GridH2Table tbl;
 
     /** Column names to set or update. */
@@ -75,8 +73,8 @@ public final class UpdatePlan {
     /** Subquery flag - {@code true} if {@link #selectQry} is an actual 
subquery that retrieves data from some cache. */
     private final boolean isLocSubqry;
 
-    /** */
-    public final List<List<FastUpdateArgument>> rows;
+    /** Rows for query-less MERGE or INSERT. */
+    private final List<List<FastUpdateArgument>> rows;
 
     /** Number of rows in rows based MERGE or INSERT. */
     private final int rowsNum;
@@ -106,6 +104,7 @@ public final class UpdatePlan {
      * @param valColIdx value column index.
      * @param selectQry Select query.
      * @param isLocSubqry Local subquery flag.
+     * @param rows Rows for query-less INSERT or MERGE.
      * @param rowsNum Rows number.
      * @param fastUpdate Fast update (if any).
      * @param distributed Distributed plan (if any)
@@ -121,6 +120,7 @@ public final class UpdatePlan {
         int valColIdx,
         String selectQry,
         boolean isLocSubqry,
+        List<List<FastUpdateArgument>> rows,
         int rowsNum,
         @Nullable FastUpdate fastUpdate,
         @Nullable DmlDistributedPlanInfo distributed
@@ -172,6 +172,7 @@ public final class UpdatePlan {
             -1,
             selectQry,
             false,
+            null,
             0,
             fastUpdate,
             distributed
@@ -398,6 +399,48 @@ public final class UpdatePlan {
         return fastUpdate;
     }
 
+    /**
+     * @return Names of affected columns.
+     */
+    public String[] columnNames() {
+        return colNames;
+    }
+
+    /**
+     * @return Types of affected columns.
+     */
+    public int[] columnTypes() {
+        return colTypes;
+    }
+
+    /**
+     * @return Rows for query-less MERGE or INSERT.
+     */
+    public List<List<FastUpdateArgument>> rows() {
+        return rows;
+    }
+
+    /**
+     * @return Key column index.
+     */
+    public int keyColumnIndex() {
+        return keyColIdx;
+    }
+
+    /**
+     * @return Value column index.
+     */
+    public int valueColumnIndex() {
+        return valColIdx;
+    }
+
+    /**
+     * @return Target table.
+     */
+    public GridH2Table table() {
+        return tbl;
+    }
+
     /*
     public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, 
int[] colTypes, KeyValueSupplier keySupplier,
                                       KeyValueSupplier valSupplier, int 
keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 52efd6d..d04cea9 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -247,7 +247,7 @@ public final class UpdatePlanBuilder {
         KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), 
keyColIdx, hasKeyProps, true, false);
         KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), 
valColIdx, hasValProps, false, false);
 
-        String selectSql = sel.getSQL();
+        String selectSql = sel != null ? sel.getSQL() : null;
 
         DmlDistributedPlanInfo distributed = (rowsNum == 0 && 
!F.isEmpty(selectSql)) ?
             checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, 
tbl.dataTable().cacheName()) : null;
@@ -265,6 +265,7 @@ public final class UpdatePlanBuilder {
             valColIdx,
             selectSql,
             !isTwoStepSubqry,
+            rows,
             rowsNum,
             null,
             distributed
@@ -391,6 +392,7 @@ public final class UpdatePlanBuilder {
                     valColIdx,
                     selectSql,
                     false,
+                    null,
                     0,
                     null,
                     distributed

Reply via email to