Native batching prototype.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03f9fe71 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03f9fe71 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03f9fe71 Branch: refs/heads/ignite-6022-proto Commit: 03f9fe71ed1e41353e983cc8993b7b369cd91936 Parents: dd70a84 Author: devozerov <[email protected]> Authored: Mon Dec 18 14:15:41 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Dec 18 14:15:41 2017 +0300 ---------------------------------------------------------------------- .../cache/query/SqlFieldsQueryEx.java | 44 ++++- .../odbc/jdbc/JdbcRequestHandler.java | 42 +++-- .../query/h2/DmlStatementsProcessor.java | 178 ++++++++++++++++++- .../processors/query/h2/IgniteH2Indexing.java | 17 +- .../processors/query/h2/dml/DmlUtils.java | 19 ++ .../processors/query/h2/dml/UpdatePlan.java | 34 +++- .../query/h2/dml/UpdatePlanBuilder.java | 2 +- 7 files changed, 303 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java index c5f786e..fb098a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.query; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -33,13 +35,23 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { /** Whether server side DML should be enabled. */ private boolean skipReducerOnUpdate; + /** Batched arguments. */ + private List<Object[]> batchedArgs; + + public SqlFieldsQueryEx(String sql, Boolean isQry) { + super(sql); + + this.isQry = isQry; + } + /** * @param sql SQL query. * @param isQry Flag indicating whether this object denotes a query or an update operation. */ - public SqlFieldsQueryEx(String sql, Boolean isQry) { - super(sql); - this.isQry = isQry; + public SqlFieldsQueryEx(String sql, Boolean isQry, int batchedArgsSize) { + this(sql, isQry); + + this.batchedArgs = new ArrayList<>(batchedArgsSize); } /** @@ -50,6 +62,7 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { this.isQry = qry.isQry; this.skipReducerOnUpdate = qry.skipReducerOnUpdate; + this.batchedArgs = qry.batchedArgs; } /** @@ -151,6 +164,31 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { return skipReducerOnUpdate; } + /** + * Add batched arguments. + * + * @param args Arguments. + */ + public void addBatchedArgs(Object[] args) { + if (batchedArgs == null) + batchedArgs = new ArrayList<>(); + + batchedArgs.add(args); + } + + /** + * Batched arguments. + * + * @return Batched arguments. + */ + public List<Object[]> batchedArgs() { + return batchedArgs; + } + + public void clearBatchedArgs() { + batchedArgs = null; + } + /** {@inheritDoc} */ @Override public SqlFieldsQuery copy() { return new SqlFieldsQueryEx(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index e3b6f5b..2530360 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -475,28 +475,45 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { if (F.isEmpty(schemaName)) schemaName = QueryUtils.DFLT_SCHEMA; + int qryCnt = req.queries().size(); + int successQueries = 0; - int updCnts[] = new int[req.queries().size()]; + int updCnts[] = new int[qryCnt]; try { - String sql = null; + SqlFieldsQueryEx qry = null; for (JdbcQuery q : req.queries()) { - if (q.sql() != null) - sql = q.sql(); + if (q.sql() != null) { + if (qry != null) { + QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query() + .querySqlFieldsNoCache(qry, true, true).get(0); + + assert !qryCur.isQuery(); + + List<List<?>> items = qryCur.getAll(); + + // TODO: Set on correct positions. + updCnts[successQueries++] = ((Long)items.get(0).get(0)).intValue(); + } + + qry = new SqlFieldsQueryEx(q.sql(), false, qryCnt); - SqlFieldsQuery qry = new SqlFieldsQueryEx(sql, false); + qry.setDistributedJoins(distributedJoins); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setCollocated(collocated); + qry.setReplicatedOnly(replicatedOnly); + qry.setLazy(lazy); - qry.setArgs(q.args()); + qry.setSchema(schemaName); + } - qry.setDistributedJoins(distributedJoins); - qry.setEnforceJoinOrder(enforceJoinOrder); - qry.setCollocated(collocated); - qry.setReplicatedOnly(replicatedOnly); - qry.setLazy(lazy); + assert qry != null; - qry.setSchema(schemaName); + qry.addBatchedArgs(q.args()); + } + if (qry != null) { QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query() .querySqlFieldsNoCache(qry, true, true).get(0); @@ -504,6 +521,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { List<List<?>> items = qryCur.getAll(); + // TODO: Set on correct positions. updCnts[successQueries++] = ((Long)items.get(0).get(0)).intValue(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 9a6b0af..df14c85 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -22,6 +22,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -42,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -50,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; +import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; @@ -189,6 +192,80 @@ public class DmlStatementsProcessor { } /** + * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications. + * + * @param schemaName Schema. + * @param conn Connection. + * @param prepared Prepared statement. + * @param fieldsQry Original query. + * @param loc Query locality flag. + * @param filters Cache name and key filter. + * @param cancel Cancel. + * @return Update result (modified items count and failed keys). + * @throws IgniteCheckedException if failed. + */ + private Collection<UpdateResult> updateSqlFieldsBatched(String schemaName, Connection conn, Prepared prepared, + SqlFieldsQueryEx fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) + throws IgniteCheckedException { + List<Object[]> argss = fieldsQry.batchedArgs(); + + UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null); + + if (plan.hasRows()) { + GridCacheContext<?, ?> cctx = plan.cacheContext(); + + CacheOperationContext opCtx = cctx.operationContextPerCall(); + + // Force keepBinary for operation context to avoid binary deserialization inside entry processor + if (cctx.binaryMarshaller()) { + CacheOperationContext newOpCtx = null; + + if (opCtx == null) + // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary + newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false); + else if (!opCtx.isKeepBinary()) + newOpCtx = opCtx.keepBinary(); + + if (newOpCtx != null) + cctx.operationContextPerCall(newOpCtx); + } + + try { + List<List<?>> cur = plan.createRows(argss); + + UpdateResult res = processDmlSelectResultBatched(cctx, plan, cur, fieldsQry.getPageSize()); + + Collection<UpdateResult> ress = new ArrayList<>(1); + + // TODO: Wrong! + ress.add(res); + + return ress; + } + finally { + cctx.operationContextPerCall(opCtx); + } + } + else { + // Fallback to previous mode. + Collection<UpdateResult> ress = new ArrayList<>(argss.size()); + + for (Object[] args : argss) { + SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry.copy(); + + qry0.clearBatchedArgs(); + qry0.setArgs(args); + + UpdateResult res = updateSqlFields(schemaName, conn, prepared, qry0, loc, filters, cancel); + + ress.add(res); + } + + return ress; + } + } + + /** * @param schemaName Schema. * @param c Connection. * @param p Prepared statement. @@ -198,18 +275,44 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. */ @SuppressWarnings("unchecked") - QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p, + List<QueryCursorImpl<List<?>>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel); + if (DmlUtils.isBatched(fieldsQry)) { + // TODO: Refactor. + Collection<UpdateResult> ress = updateSqlFieldsBatched(schemaName, c, p, (SqlFieldsQueryEx)fieldsQry, + false, null, cancel); + + ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size()); - checkUpdateResult(res); + for (UpdateResult res : ress) { + checkUpdateResult(res); - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(res.counter())), cancel, false); + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(res.counter())), cancel, false); - resCur.fieldsMeta(UPDATE_RESULT_META); + resCur.fieldsMeta(UPDATE_RESULT_META); - return resCur; + resCurs.add(resCur); + } + + return resCurs; + } + else { + UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel); + + ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(1); + + checkUpdateResult(res); + + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(res.counter())), cancel, false); + + resCur.fieldsMeta(UPDATE_RESULT_META); + + resCurs.add(resCur); + + return resCurs; + } } /** @@ -399,6 +502,22 @@ public class DmlStatementsProcessor { return processDmlSelectResult(cctx, plan, cur, pageSize); } + private UpdateResult processDmlSelectResultBatched(GridCacheContext cctx, UpdatePlan plan, Collection<List<?>> rows, + int pageSize) throws IgniteCheckedException { + switch (plan.mode()) { + case MERGE: + // TODO + throw new IgniteCheckedException("Unsupported, fix"); + + case INSERT: + return new UpdateResult(doInsertBatched(plan, rows, pageSize), X.EMPTY_OBJECT_ARRAY); + + default: + throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']', + IgniteQueryErrorCode.UNEXPECTED_OPERATION); + } + } + /** * @param cctx Cache context. * @param plan Update plan. @@ -673,6 +792,50 @@ public class DmlStatementsProcessor { } /** + * Execute INSERT statement plan. + * @param cursor Cursor to take inserted data from. + * @param pageSize Batch size for streaming, anything <= 0 for single page operations. + * @return Number of items affected. + * @throws IgniteCheckedException if failed, particularly in case of duplicate keys. + */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) + private long doInsertBatched(UpdatePlan plan, Collection<List<?>> cursor, int pageSize) + throws IgniteCheckedException { + GridCacheContext cctx = plan.cacheContext(); + + // Keys that failed to INSERT due to duplication. + DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); + + for (List<?> row : cursor) { + final IgniteBiTuple keyValPair = plan.processRow(row); + + sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue())); + } + + // TODO: Tale page size in count? + sender.flush(); + + SQLException resEx = sender.error(); + + if (!F.isEmpty(sender.failedKeys())) { + String msg = "Failed to INSERT some keys because they are already in cache " + + "[keys=" + sender.failedKeys() + ']'; + + SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION); + + if (resEx == null) + resEx = dupEx; + else + resEx.setNextException(dupEx); + } + + if (resEx != null) + throw new IgniteSQLException(resEx); + + return sender.updateCount(); + } + + /** * * @param schemaName Schema name. * @param stmt Prepared statement. @@ -810,5 +973,4 @@ public class DmlStatementsProcessor { throw new IgniteSQLException(conEx); } } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 6fdcd27..31a6645 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1507,14 +1507,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { int paramsCnt = prepared.getParameters().size(); if (paramsCnt > 0) { - if (argsOrig == null || argsOrig.length < firstArg + paramsCnt) { - throw new IgniteException("Invalid number of query parameters. " + - "Cannot find " + (argsOrig.length + 1 - firstArg) + " parameter."); - } + // TODO: Check remainer. + if (prepared.isQuery()) { + if (argsOrig == null || argsOrig.length < firstArg + paramsCnt) { + throw new IgniteException("Invalid number of query parameters. " + + "Cannot find " + (argsOrig.length + 1 - firstArg) + " parameter."); + } - args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt); + args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt); - firstArg += paramsCnt; + firstArg += paramsCnt; + } } cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated, @@ -1555,7 +1558,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (twoStepQry == null) { if (DmlStatementsProcessor.isDmlStatement(prepared)) { try { - res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared, + res.addAll(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared, qry.copy().setSql(sqlQry).setArgs(args), cancel)); continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java index 8d4861e..e4b52a6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java @@ -22,10 +22,13 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.Date; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.util.DateTimeUtils; import org.h2.util.LocalDateTimeUtils; @@ -117,6 +120,22 @@ public class DmlUtils { } /** + * Check whether query is batched. + * + * @param qry Query. + * @return {@code True} if batched. + */ + public static boolean isBatched(SqlFieldsQuery qry) { + if (qry instanceof SqlFieldsQueryEx) { + SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry; + + return !F.isEmpty(qry0.batchedArgs()); + } + + return false; + } + + /** * Private constructor. */ private DmlUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 6a45c3c..248041a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -376,10 +376,10 @@ public final class UpdatePlan { public List<List<?>> createRows(Object[] args) throws IgniteCheckedException { assert rowsNum > 0 && !F.isEmpty(colNames); - List<List<?>> res = new ArrayList<>(rowsNum); - GridH2RowDescriptor desc = tbl.rowDescriptor(); + List<List<?>> res = new ArrayList<>(rowsNum); + for (List<DmlArgument> row : rows) { List<Object> resRow = new ArrayList<>(); @@ -401,6 +401,36 @@ public final class UpdatePlan { return res; } + public List<List<?>> createRows(List<Object[]> argss) throws IgniteCheckedException { + assert rowsNum > 0 && !F.isEmpty(colNames); + + GridH2RowDescriptor desc = tbl.rowDescriptor(); + + List<List<?>> res = new ArrayList<>(rowsNum * argss.size()); + + for (Object[] args : argss) { + for (List<DmlArgument> row : rows) { + List<Object> resRow = new ArrayList<>(); + + for (int j = 0; j < colNames.length; j++) { + Object colVal = row.get(j).get(args); + + if (j == keyColIdx || j == valColIdx) { + Class<?> colCls = j == keyColIdx ? desc.type().keyClass() : desc.type().valueClass(); + + colVal = DmlUtils.convert(colVal, desc, colCls, colTypes[j]); + } + + resRow.add(colVal); + } + + res.add(resRow); + } + } + + return res; + } + /** * @return Update mode. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/03f9fe71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index 5ffd264..3305b00 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -619,7 +619,7 @@ public final class UpdatePlanBuilder { Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String selectQry, String cacheName) throws IgniteCheckedException { - if (loc || !isSkipReducerOnUpdateQuery(fieldsQry)) + if (loc || !isSkipReducerOnUpdateQuery(fieldsQry) || DmlUtils.isBatched(fieldsQry)) return null; assert conn != null;
