Test fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e11fee9c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e11fee9c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e11fee9c Branch: refs/heads/ignite-6022-proto Commit: e11fee9c416493ef084032520f2e6dec35852f29 Parents: 799098c Author: Alexander Paschenko <[email protected]> Authored: Fri Dec 15 18:42:07 2017 +0300 Committer: Alexander Paschenko <[email protected]> Committed: Fri Dec 15 18:42:07 2017 +0300 ---------------------------------------------------------------------- .../query/h2/DmlStatementsProcessor.java | 135 +++++++------------ .../processors/query/h2/dml/DmlAstUtils.java | 4 +- .../processors/query/h2/dml/UpdatePlan.java | 57 +++++++- .../query/h2/dml/UpdatePlanBuilder.java | 4 +- 4 files changed, 106 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 9e41bfe..dd62c75 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -17,12 +17,16 @@ package org.apache.ignite.internal.processors.query.h2; +import java.lang.reflect.Array; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -32,7 +36,6 @@ import java.util.concurrent.TimeUnit; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -49,15 +52,14 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument; -import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate; +import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArgument; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; @@ -73,14 +75,12 @@ import org.h2.command.dml.Delete; import org.h2.command.dml.Insert; import org.h2.command.dml.Merge; import org.h2.command.dml.Update; -import org.h2.table.Column; import org.h2.util.DateTimeUtils; import org.h2.util.LocalDateTimeUtils; import org.h2.value.Value; import org.h2.value.ValueDate; import org.h2.value.ValueTime; import org.h2.value.ValueTimestamp; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException; @@ -258,15 +258,13 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args) + long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args) throws IgniteCheckedException { - args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY); - Prepared p = GridSqlQueryParser.prepared(stmt); assert p != null; - UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); + final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + @@ -281,14 +279,22 @@ public class DmlStatementsProcessor { final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); - final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQuery(), - F.asList(args), null, false, 0, null); - QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { try { - return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), - cctx.keepBinary()); + Iterator<List<?>> it; + + if (!F.isEmpty(plan.selectQuery())) { + GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), + plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, false, 0, null); + + it = res.iterator(); + } + else + it = planToRows(plan, U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator(); + + return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -370,8 +376,6 @@ public class DmlStatementsProcessor { return result; } - assert !F.isEmpty(plan.selectQuery()); - Iterable<List<?>> cur; // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual @@ -387,11 +391,10 @@ public class DmlStatementsProcessor { .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl<List<?>>)idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, - cancel, mainCacheId, true).get(0); + cur = idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, mainCacheId, true).get(0); } - else if (F.isEmpty(plan.rows)) { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry(), + else if (F.isEmpty(plan.rows())) { + final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(), F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); cur = new QueryCursorImpl<>(new Iterable<List<?>>() { @@ -405,35 +408,45 @@ public class DmlStatementsProcessor { } }, cancel); } - else { - assert plan.rowsNum > 0 && !F.isEmpty(plan.colNames); + else + cur = planToRows(plan, fieldsQry.getArgs()); - List<List<?>> args = new ArrayList<>(plan.rowsNum); + int pageSize = loc ? 0 : fieldsQry.getPageSize(); - GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); + return processDmlSelectResult(cctx, plan, cur, pageSize); + } - for (List<FastUpdateArgument> argRow : plan.rows) { - List<Object> row = new ArrayList<>(); + /** + * Extract rows from plan without performing any query. + * @param plan Plan. + * @param args Original query arguments. + * @return Rows from plan. + * @throws IgniteCheckedException if failed. + */ + private List<List<?>> planToRows(UpdatePlan plan, Object[] args) throws IgniteCheckedException { + assert plan.rowCount() > 0 && !F.isEmpty(plan.columnNames()); - for (int j = 0; j < plan.colNames.length; j++) { - Object colVal = argRow.get(j).apply(fieldsQry.getArgs()); + List<List<?>> rows = new ArrayList<>(plan.rowCount()); - if (j == plan.keyColIdx || j == plan.valColIdx) - colVal = convert(colVal, desc, j == plan.keyColIdx ? desc.type().keyClass() : - desc.type().valueClass(), plan.colTypes[j]); + GridH2RowDescriptor desc = plan.table().rowDescriptor(); - row.add(colVal); - } + for (List<FastUpdateArgument> argRow : plan.rows()) { + List<Object> row = new ArrayList<>(); + + for (int j = 0; j < plan.columnNames().length; j++) { + Object colVal = argRow.get(j).apply(args); + + if (j == plan.keyColumnIndex() || j == plan.valueColumnIndex()) + colVal = convert(colVal, desc, j == plan.keyColumnIndex() ? desc.type().keyClass() : + desc.type().valueClass(), plan.columnTypes()[j]); - args.add(row); + row.add(colVal); } - cur = args; + rows.add(row); } - int pageSize = loc ? 0 : fieldsQry.getPageSize(); - - return processDmlSelectResult(cctx, plan, cur, pageSize); + return rows; } /** @@ -680,50 +693,6 @@ public class DmlStatementsProcessor { } /** - * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose - * processing yielded an exception. - * - * @param res Result of {@link GridCacheAdapter#invokeAll)} - * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is - * null if all keys are duplicates/concurrently modified ones). - */ - private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) { - Set<Object> errKeys = new LinkedHashSet<>(res.keySet()); - - SQLException currSqlEx = null; - - SQLException firstSqlEx = null; - - int errors = 0; - - // Let's form a chain of SQL exceptions - for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) { - try { - e.getValue().get(); - } - catch (EntryProcessorException ex) { - SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'', - IgniteQueryErrorCode.ENTRY_PROCESSING); - - next.initCause(ex); - - if (currSqlEx != null) - currSqlEx.setNextException(next); - else - firstSqlEx = next; - - currSqlEx = next; - - errKeys.remove(e.getKey()); - - errors++; - } - } - - return new PageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors); - } - - /** * Execute MERGE statement plan. * @param cursor Cursor to take inserted data from. * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations. http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java index a2cd553..b6c4a2a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java @@ -83,7 +83,6 @@ public final class DmlAstUtils { * @param cols Columns to insert values into. * @param rows Rows to create pseudo-SELECT upon. * @param subQry Subquery to use rather than rows. - * @param desc Row descriptor. * @return Subquery or pseudo-SELECT to evaluate inserted expressions, or {@code null} no query needs to be run. */ public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows, @@ -219,8 +218,7 @@ public final class DmlAstUtils { if (!(set instanceof GridSqlConst || set instanceof GridSqlParameter)) return null; - return new FastUpdateArguments(operandForElement(filter.getKey()), operandForElement(filter.getValue()), - operandForElement(set)); + return FastUpdate.create(filter.getKey(), filter.getValue(), set); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index fa86836..96298d8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.query.h2.dml; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -35,10 +37,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.h2.table.Column; import org.jetbrains.annotations.Nullable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; /** @@ -48,7 +46,7 @@ public final class UpdatePlan { /** Initial statement to drive the rest of the logic. */ private final UpdateMode mode; - /** Target table to be affected by initial DML statement. */ + /** to be affected by initial DML statement. */ private final GridH2Table tbl; /** Column names to set or update. */ @@ -75,8 +73,8 @@ public final class UpdatePlan { /** Subquery flag - {@code true} if {@link #selectQry} is an actual subquery that retrieves data from some cache. */ private final boolean isLocSubqry; - /** */ - public final List<List<FastUpdateArgument>> rows; + /** Rows for query-less MERGE or INSERT. */ + private final List<List<FastUpdateArgument>> rows; /** Number of rows in rows based MERGE or INSERT. */ private final int rowsNum; @@ -106,6 +104,7 @@ public final class UpdatePlan { * @param valColIdx value column index. * @param selectQry Select query. * @param isLocSubqry Local subquery flag. + * @param rows Rows for query-less INSERT or MERGE. * @param rowsNum Rows number. * @param fastUpdate Fast update (if any). * @param distributed Distributed plan (if any) @@ -121,6 +120,7 @@ public final class UpdatePlan { int valColIdx, String selectQry, boolean isLocSubqry, + List<List<FastUpdateArgument>> rows, int rowsNum, @Nullable FastUpdate fastUpdate, @Nullable DmlDistributedPlanInfo distributed @@ -172,6 +172,7 @@ public final class UpdatePlan { -1, selectQry, false, + null, 0, fastUpdate, distributed @@ -398,6 +399,48 @@ public final class UpdatePlan { return fastUpdate; } + /** + * @return Names of affected columns. + */ + public String[] columnNames() { + return colNames; + } + + /** + * @return Types of affected columns. + */ + public int[] columnTypes() { + return colTypes; + } + + /** + * @return Rows for query-less MERGE or INSERT. + */ + public List<List<FastUpdateArgument>> rows() { + return rows; + } + + /** + * @return Key column index. + */ + public int keyColumnIndex() { + return keyColIdx; + } + + /** + * @return Value column index. + */ + public int valueColumnIndex() { + return valColIdx; + } + + /** + * @return Target table. + */ + public GridH2Table table() { + return tbl; + } + /* public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, http://git-wip-us.apache.org/repos/asf/ignite/blob/e11fee9c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index 52efd6d..d04cea9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -247,7 +247,7 @@ public final class UpdatePlanBuilder { KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, true, false); KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false); - String selectSql = sel.getSQL(); + String selectSql = sel != null ? sel.getSQL() : null; DmlDistributedPlanInfo distributed = (rowsNum == 0 && !F.isEmpty(selectSql)) ? checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()) : null; @@ -265,6 +265,7 @@ public final class UpdatePlanBuilder { valColIdx, selectSql, !isTwoStepSubqry, + rows, rowsNum, null, distributed @@ -391,6 +392,7 @@ public final class UpdatePlanBuilder { valColIdx, selectSql, false, + null, 0, null, distributed
