http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 98117b2..9e55442 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2; import java.lang.reflect.Array; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Time; @@ -150,7 +151,8 @@ public class DmlStatementsProcessor { * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications. * * @param schemaName Schema. - * @param prepared Prepared JDBC statement. + * @param conn Connection. + * @param prepared Prepared statement. * @param fieldsQry Original query. * @param loc Query locality flag. * @param filters Cache name and key filter. @@ -158,13 +160,14 @@ public class DmlStatementsProcessor { * @return Update result (modified items count and failed keys). * @throws IgniteCheckedException if failed. 
*/ - private UpdateResult updateSqlFields(String schemaName, Prepared prepared, SqlFieldsQuery fieldsQry, - boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { + private UpdateResult updateSqlFields(String schemaName, Connection conn, Prepared prepared, + SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) + throws IgniteCheckedException { Object[] errKeys = null; long items = 0; - UpdatePlan plan = getPlanForStatement(schemaName, prepared, null); + UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null); GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context(); @@ -188,14 +191,14 @@ public class DmlStatementsProcessor { UpdateResult r; try { - r = executeUpdateStatement(schemaName, cctx, prepared, fieldsQry, loc, filters, cancel, errKeys); + r = executeUpdateStatement(schemaName, cctx, conn, prepared, fieldsQry, loc, filters, cancel, errKeys); } finally { cctx.operationContextPerCall(opCtx); } - items += r.cnt; - errKeys = r.errKeys; + items += r.counter(); + errKeys = r.errorKeys(); if (F.isEmpty(errKeys)) break; @@ -213,19 +216,22 @@ public class DmlStatementsProcessor { /** * @param schemaName Schema. - * @param p Prepared. + * @param c Connection. + * @param p Prepared statement. * @param fieldsQry Initial query * @param cancel Query cancel. * @return Update result wrapped into {@link GridQueryFieldsResult} * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings("unchecked") - QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Prepared p, + QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schemaName, p, fieldsQry, false, null, cancel); + UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel); + + checkUpdateResult(res); QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(res.cnt)), cancel, false); + (Collections.singletonList(res.counter())), cancel, false); resCur.fieldsMeta(UPDATE_RESULT_META); @@ -236,6 +242,7 @@ public class DmlStatementsProcessor { * Execute DML statement on local cache. * * @param schemaName Schema. + * @param conn Connection. * @param stmt Prepared statement. * @param fieldsQry Fields query. * @param filters Cache name and key filter. @@ -244,14 +251,14 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings("unchecked") - GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt, + GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, PreparedStatement stmt, SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schemaName, GridSqlQueryParser.prepared(stmt), fieldsQry, true, + UpdateResult res = updateSqlFields(schemaName, conn, GridSqlQueryParser.prepared(stmt), fieldsQry, true, filters, cancel); return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, - new IgniteSingletonIterator(Collections.singletonList(res.cnt))); + new IgniteSingletonIterator(Collections.singletonList(res.counter()))); } /** @@ -272,7 +279,7 @@ public class DmlStatementsProcessor { assert p != null; - UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null); + UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().name())) throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + @@ -340,6 +347,7 @@ public class DmlStatementsProcessor { * * @param schemaName Schema name. * @param cctx Cache context. + * @param c Connection. * @param prepared Prepared statement for DML query. * @param fieldsQry Fields query. * @param loc Local query flag. @@ -350,14 +358,14 @@ public class DmlStatementsProcessor { * @throws IgniteCheckedException if failed. 
*/ @SuppressWarnings({"ConstantConditions", "unchecked"}) - private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, + private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c, Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException { int mainCacheId = CU.cacheId(cctx.name()); Integer errKeysPos = null; - UpdatePlan plan = getPlanForStatement(schemaName, prepared, errKeysPos); + UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos); if (plan.fastUpdateArgs != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; @@ -365,6 +373,14 @@ public class DmlStatementsProcessor { return doFastUpdate(plan, fieldsQry.getArgs()); } + if (plan.distributed != null) { + UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); + + // null is returned in case not all nodes support distributed DML. + if (result != null) + return result; + } + assert !F.isEmpty(plan.selectQry); QueryCursorImpl<List<?>> cur; @@ -401,18 +417,31 @@ public class DmlStatementsProcessor { int pageSize = loc ? 0 : fieldsQry.getPageSize(); + return processDmlSelectResult(cctx, plan, cur, pageSize); + } + + /** + * @param cctx Cache context. + * @param plan Update plan. + * @param cursor Cursor over select results. + * @param pageSize Page size. + * @return Pair [number of successfully processed items; keys that have failed to be processed] + * @throws IgniteCheckedException if failed. 
+ */ + private UpdateResult processDmlSelectResult(GridCacheContext cctx, UpdatePlan plan, Iterable<List<?>> cursor, + int pageSize) throws IgniteCheckedException { switch (plan.mode) { case MERGE: - return new UpdateResult(doMerge(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY); + return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY); case INSERT: - return new UpdateResult(doInsert(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY); + return new UpdateResult(doInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY); case UPDATE: - return doUpdate(plan, cur, pageSize); + return doUpdate(plan, cursor, pageSize); case DELETE: - return doDelete(cctx, cur, pageSize); + return doDelete(cctx, cursor, pageSize); default: throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']', @@ -425,20 +454,23 @@ public class DmlStatementsProcessor { * if available. * * @param schema Schema. - * @param p Prepared JDBC statement. + * @param conn Connection. + * @param p Prepared statement. + * @param fieldsQry Original fields query. + * @param loc Local query flag. * @return Update plan. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private UpdatePlan getPlanForStatement(String schema, Prepared p, @Nullable Integer errKeysPos) - throws IgniteCheckedException { - H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL()); + private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry, + boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException { + H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL(), loc, fieldsQry); UpdatePlan res = (errKeysPos == null ? 
planCache.get(planKey) : null); if (res != null) return res; - res = UpdatePlanBuilder.planForStatement(p, errKeysPos); + res = UpdatePlanBuilder.planForStatement(p, loc, idx, conn, fieldsQry, errKeysPos); // Don't cache re-runs if (errKeysPos == null) @@ -449,6 +481,7 @@ public class DmlStatementsProcessor { /** * Perform single cache operation based on given args. + * @param plan Update plan. * @param args Query parameters. * @return 1 if an item was affected, 0 otherwise. * @throws IgniteCheckedException if failed. @@ -487,6 +520,25 @@ public class DmlStatementsProcessor { } /** + * @param schemaName Schema name. + * @param fieldsQry Initial query. + * @param plan Update plan. + * @param cancel Cancel state. + * @return Update result. + * @throws IgniteCheckedException if failed. + */ + private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan, + GridQueryCancel cancel) throws IgniteCheckedException { + assert plan.distributed != null; + + if (cancel == null) + cancel = new GridQueryCancel(); + + return idx.runDistributedUpdate(schemaName, fieldsQry, plan.distributed.getCacheIds(), + plan.distributed.isReplicatedOnly(), cancel); + } + + /** * Perform DELETE operation on top of results of SELECT. * @param cctx Cache context. * @param cursor SELECT results. @@ -573,7 +625,7 @@ public class DmlStatementsProcessor { GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(plan.colNames[i]); - assert prop != null; + assert prop != null : "Unknown property: " + plan.colNames[i]; newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i])); } @@ -981,6 +1033,31 @@ public class DmlStatementsProcessor { return new IgniteBiTuple<>(key, val); } + /** + * + * @param schemaName Schema name. + * @param stmt Prepared statement. + * @param fldsQry Query. + * @param filter Filter. + * @param cancel Cancel state. + * @param local Locality flag. + * @return Update result. 
+ * @throws IgniteCheckedException if failed. + */ + UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, SqlFieldsQuery fldsQry, + IndexingQueryFilter filter, GridQueryCancel cancel, boolean local) throws IgniteCheckedException { + Connection c; + + try { + c = stmt.getConnection(); + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + + return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel); + } + /** */ private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> { /** Value to set. */ @@ -1079,26 +1156,19 @@ public class DmlStatementsProcessor { return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete; } - /** Update result - modifications count and keys to re-run query with, if needed. */ - private final static class UpdateResult { - /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */ - final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY); - - /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */ - final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY); - - /** Number of processed items. */ - final long cnt; + /** + * Check update result for erroneous keys and throws concurrent update exception if necessary. + * + * @param r Update result. + */ + static void checkUpdateResult(UpdateResult r) { + if (!F.isEmpty(r.errorKeys())) { + String msg = "Failed to update some keys because they had been modified concurrently " + + "[keys=" + r.errorKeys() + ']'; - /** Keys that failed to be updated or deleted due to concurrent modification of values. 
*/ - @NotNull - final Object[] errKeys; + SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE); - /** */ - @SuppressWarnings("ConstantConditions") - private UpdateResult(long cnt, Object[] errKeys) { - this.cnt = cnt; - this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); + throw new IgniteSQLException(conEx); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java index 3a43ea1..455b5e5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.query.h2; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -30,20 +32,33 @@ public class H2DmlPlanKey { /** SQL. */ private final String sql; + /** Flags. */ + private final byte flags; + /** * Constructor. * * @param schemaName Schema name. * @param sql SQL. */ - public H2DmlPlanKey(String schemaName, String sql) { + public H2DmlPlanKey(String schemaName, String sql, boolean loc, SqlFieldsQuery fieldsQry) { this.schemaName = schemaName; this.sql = sql; + + if (loc || !UpdatePlanBuilder.isSkipReducerOnUpdateQuery(fieldsQry)) + this.flags = 0; // flags only relevant for server side updates. + else { + this.flags = (byte)(1 + + (fieldsQry.isDistributedJoins() ? 2 : 0) + + (fieldsQry.isEnforceJoinOrder() ? 4 : 0) + + (fieldsQry.isCollocated() ? 8 : 0)); + } } /** {@inheritDoc} */ @Override public int hashCode() { - return 31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? sql.hashCode() : 0); + return 31 * (31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? 
sql.hashCode() : 0)) + + flags; } /** {@inheritDoc} */ @@ -56,7 +71,7 @@ public class H2DmlPlanKey { H2DmlPlanKey other = (H2DmlPlanKey)o; - return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName); + return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName) && flags == other.flags; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 22ed592..fddd2e8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -59,7 +59,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -834,7 +834,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fldsQry.setEnforceJoinOrder(enforceJoinOrder); fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); - return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel); + return dmlProc.updateSqlFieldsLocal(schemaName, conn, stmt, fldsQry, filter, cancel); } else if (DdlStatementsProcessor.isDdlStatement(p)) throw new 
IgniteSQLException("DDL statements are supported for the whole cluster only", @@ -1215,6 +1215,27 @@ public class IgniteH2Indexing implements GridQueryIndexing { }; } + /** + * Run DML on remote nodes. + * + * @param schemaName Schema name. + * @param fieldsQry Initial update query. + * @param cacheIds Cache identifiers. + * @param isReplicatedOnly Whether query uses only replicated caches. + * @param cancel Cancel state. + * @return Update result. + */ + UpdateResult runDistributedUpdate( + String schemaName, + SqlFieldsQuery fieldsQry, + List<Integer> cacheIds, + boolean isReplicatedOnly, + GridQueryCancel cancel) { + return rdcQryExec.update(schemaName, cacheIds, fieldsQry.getSql(), fieldsQry.getArgs(), + fieldsQry.isEnforceJoinOrder(), fieldsQry.getPageSize(), fieldsQry.getTimeout(), + fieldsQry.getPartitions(), isReplicatedOnly, cancel); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, String cacheName, @@ -1429,8 +1450,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (twoStepQry == null) { if (DmlStatementsProcessor.isDmlStatement(prepared)) { try { - res.add(dmlProc.updateSqlFieldsDistributed(schemaName, prepared, - new SqlFieldsQuery(qry).setSql(sqlQry).setArgs(args), cancel)); + res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared, + qry.copy().setSql(sqlQry).setArgs(args), cancel)); continue; } @@ -1452,33 +1473,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - LinkedHashSet<Integer> caches0 = new LinkedHashSet<>(); - assert twoStepQry != null; - int tblCnt = twoStepQry.tablesCount(); - - if (mainCacheId != null) - caches0.add(mainCacheId); - - if (tblCnt > 0) { - for (QueryTable tblKey : twoStepQry.tables()) { - GridH2Table tbl = dataTable(tblKey); - - int cacheId = CU.cacheId(tbl.cacheName()); - - caches0.add(cacheId); - } - } + List<Integer> cacheIds = collectCacheIds(mainCacheId, twoStepQry); - 
if (caches0.isEmpty()) + if (F.isEmpty(cacheIds)) twoStepQry.local(true); else { - //Prohibit usage indices with different numbers of segments in same query. - List<Integer> cacheIds = new ArrayList<>(caches0); - - checkCacheIndexSegmentation(cacheIds); - twoStepQry.cacheIds(cacheIds); twoStepQry.local(qry.isLocal()); } @@ -1517,7 +1518,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}. */ private void checkQueryType(SqlFieldsQuery qry, boolean isQry) { - if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != isQry) + if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isQuery() != null && + ((SqlFieldsQueryEx)qry).isQuery() != isQry) throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", IgniteQueryErrorCode.STMT_TYPE_MISMATCH); } @@ -1568,6 +1570,29 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Run DML request from other node. + * + * @param schemaName Schema name. + * @param fldsQry Query. + * @param filter Filter. + * @param cancel Cancel state. + * @param local Locality flag. + * @return Update result. + * @throws IgniteCheckedException if failed. + */ + public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter, + GridQueryCancel cancel, boolean local) throws IgniteCheckedException { + Connection conn = connectionForSchema(schemaName); + + H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder()); + + PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(), + Arrays.asList(fldsQry.getArgs()), true); + + return dmlProc.mapDistributedUpdate(schemaName, stmt, fldsQry, filter, cancel, local); + } + + /** * @throws IllegalStateException if segmented indices used with non-segmented indices. 
*/ private void checkCacheIndexSegmentation(List<Integer> cacheIds) { @@ -2524,6 +2549,43 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Collect cache identifiers from two-step query. + * + * @param mainCacheId Id of main cache. + * @param twoStepQry Two-step query. + * @return Result. + */ + public List<Integer> collectCacheIds(@Nullable Integer mainCacheId, GridCacheTwoStepQuery twoStepQry) { + LinkedHashSet<Integer> caches0 = new LinkedHashSet<>(); + + int tblCnt = twoStepQry.tablesCount(); + + if (mainCacheId != null) + caches0.add(mainCacheId); + + if (tblCnt > 0) { + for (QueryTable tblKey : twoStepQry.tables()) { + GridH2Table tbl = dataTable(tblKey); + + int cacheId = CU.cacheId(tbl.cacheName()); + + caches0.add(cacheId); + } + } + + if (caches0.isEmpty()) + return null; + else { + //Prohibit usage indices with different numbers of segments in same query. + List<Integer> cacheIds = new ArrayList<>(caches0); + + checkCacheIndexSegmentation(cacheIds); + + return cacheIds; + } + } + + /** * Closeable iterator. */ private interface ClIter<X> extends AutoCloseable, Iterator<X> { http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java new file mode 100644 index 0000000..de0e63f --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Update result - modifications count and keys to re-run query with, if needed. + */ +public final class UpdateResult { + /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */ + final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY); + + /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */ + final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY); + + /** Number of processed items. */ + private final long cnt; + + /** Keys that failed to be updated or deleted due to concurrent modification of values. */ + private final Object[] errKeys; + + /** + * Constructor. + * + * @param cnt Updated rows count. + * @param errKeys Array of erroneous keys. + */ + public @SuppressWarnings("ConstantConditions") UpdateResult(long cnt, Object[] errKeys) { + this.cnt = cnt; + this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY); + } + + /** + * @return Update counter. + */ + public long counter() { + return cnt; + } + + /** + * @return Error keys. 
+ */ + public Object[] errorKeys() { + return errKeys; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index b81ac60..a99d811 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.dml; +import java.util.List; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.typedef.F; @@ -64,10 +65,13 @@ public final class UpdatePlan { /** Arguments for fast UPDATE or DELETE. */ public final FastUpdateArguments fastUpdateArgs; + /** Additional info for distributed update. 
*/ + public final DistributedPlanInfo distributed; + /** */ private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - int rowsNum, FastUpdateArguments fastUpdateArgs) { + int rowsNum, FastUpdateArguments fastUpdateArgs, DistributedPlanInfo distributed) { this.colNames = colNames; this.colTypes = colTypes; this.rowsNum = rowsNum; @@ -83,46 +87,84 @@ public final class UpdatePlan { this.selectQry = selectQry; this.isLocSubqry = isLocSubqry; this.fastUpdateArgs = fastUpdateArgs; + this.distributed = distributed; } /** */ public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, - int rowsNum) { + int rowsNum, DistributedPlanInfo distributed) { assert !F.isEmpty(colNames); return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, - selectQry, isLocSubqry, rowsNum, null); + selectQry, isLocSubqry, rowsNum, null, distributed); } /** */ public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier, - KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, int rowsNum) { + KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, + int rowsNum, DistributedPlanInfo distributed) { assert !F.isEmpty(colNames); - return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx, - selectQry, isLocSubqry, rowsNum, null); + return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, + valColIdx, selectQry, isLocSubqry, rowsNum, null, distributed); } /** */ public static UpdatePlan 
forUpdate(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier valSupplier, - int valColIdx, String selectQry) { + int valColIdx, String selectQry, DistributedPlanInfo distributed) { assert !F.isEmpty(colNames); return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, colTypes, null, valSupplier, -1, valColIdx, selectQry, - false, 0, null); + false, 0, null, distributed); } /** */ - public static UpdatePlan forDelete(GridH2Table tbl, String selectQry) { - return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null); + public static UpdatePlan forDelete(GridH2Table tbl, String selectQry, DistributedPlanInfo distributed) { + return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null, + distributed); } /** */ public static UpdatePlan forFastUpdate(UpdateMode mode, GridH2Table tbl, FastUpdateArguments fastUpdateArgs) { assert mode == UpdateMode.UPDATE || mode == UpdateMode.DELETE; - return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs); + return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs, null); } + /** + * Additional information about distributed update plan. + */ + public final static class DistributedPlanInfo { + /** Whether update involves only replicated caches. */ + private final boolean replicatedOnly; + + /** Identifiers of caches involved in update (used for cluster nodes mapping). */ + private final List<Integer> cacheIds; + + /** + * Constructor. + * + * @param replicatedOnly Whether all caches are replicated. + * @param cacheIds List of cache identifiers. + */ + DistributedPlanInfo(boolean replicatedOnly, List<Integer> cacheIds) { + this.replicatedOnly = replicatedOnly; + this.cacheIds = cacheIds; + } + + /** + * @return {@code true} in case all involved caches are replicated. 
+ */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + + /** + * @return cache identifiers. + */ + public List<Integer> getCacheIds() { + return cacheIds; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index 804f7d8..c845266 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -18,19 +18,26 @@ package org.apache.ignite.internal.processors.query.h2.dml; import java.lang.reflect.Constructor; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import 
org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.DmlAstUtils; @@ -41,12 +48,15 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlMerge; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; +import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate; import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.command.Prepared; @@ -71,29 +81,39 @@ public final class UpdatePlanBuilder { * if available. * * @param prepared H2's {@link Prepared}. + * @param loc Local query flag. + * @param idx Indexing. + * @param conn Connection. + * @param fieldsQuery Original query. * @return Update plan. 
*/ - public static UpdatePlan planForStatement(Prepared prepared, - @Nullable Integer errKeysPos) throws IgniteCheckedException { + public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx, + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos) + throws IgniteCheckedException { assert !prepared.isQuery(); GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared); if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert) - return planForInsert(stmt); + return planForInsert(stmt, loc, idx, conn, fieldsQuery); else - return planForUpdate(stmt, errKeysPos); + return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos); } /** * Prepare update plan for INSERT or MERGE. * * @param stmt INSERT or MERGE statement. + * @param loc Local query flag. + * @param idx Indexing. + * @param conn Connection. + * @param fieldsQuery Original query. * @return Update plan. * @throws IgniteCheckedException if failed. */ @SuppressWarnings("ConstantConditions") - private static UpdatePlan planForInsert(GridSqlStatement stmt) throws IgniteCheckedException { + private static UpdatePlan planForInsert(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx, + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery) throws IgniteCheckedException { GridSqlQuery sel; GridSqlElement target; @@ -191,23 +211,33 @@ public final class UpdatePlanBuilder { KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, true, false); KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false); + String selectSql = sel.getSQL(); + + UpdatePlan.DistributedPlanInfo distributed = (rowsNum == 0 && !F.isEmpty(selectSql)) ? 
+ checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()) : null; + if (stmt instanceof GridSqlMerge) return UpdatePlan.forMerge(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum); + valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed); else return UpdatePlan.forInsert(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx, - valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum); + valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed); } /** * Prepare update plan for UPDATE or DELETE. * * @param stmt UPDATE or DELETE statement. + * @param loc Local query flag. + * @param idx Indexing. + * @param conn Connection. + * @param fieldsQuery Original query. * @param errKeysPos index to inject param for re-run keys at. Null if it's not a re-run plan. * @return Update plan. * @throws IgniteCheckedException if failed. */ - private static UpdatePlan planForUpdate(GridSqlStatement stmt, @Nullable Integer errKeysPos) + private static UpdatePlan planForUpdate(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx, + @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos) throws IgniteCheckedException { GridSqlElement target; @@ -286,12 +316,23 @@ public final class UpdatePlanBuilder { sel = DmlAstUtils.selectForUpdate((GridSqlUpdate) stmt, errKeysPos); - return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, sel.getSQL()); + String selectSql = sel.getSQL(); + + UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? 
null : + checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()); + + return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, selectSql, + distributed); } else { sel = DmlAstUtils.selectForDelete((GridSqlDelete) stmt, errKeysPos); - return UpdatePlan.forDelete(gridTbl, sel.getSQL()); + String selectSql = sel.getSQL(); + + UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? null : + checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()); + + return UpdatePlan.forDelete(gridTbl, selectSql, distributed); } } } @@ -494,6 +535,62 @@ public final class UpdatePlanBuilder { } /** + * Checks whether the given update plan can be distributed and returns additional info. + * + * @param idx Indexing. + * @param conn Connection. + * @param fieldsQry Initial update query. + * @param loc Local query flag. + * @param selectQry Derived select query. + * @param cacheName Cache name. + * @return distributed update plan info, or {@code null} if cannot be distributed. + * @throws IgniteCheckedException if failed. + */ + private static UpdatePlan.DistributedPlanInfo checkPlanCanBeDistributed(IgniteH2Indexing idx, + Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String selectQry, String cacheName) + throws IgniteCheckedException { + + if (loc || !isSkipReducerOnUpdateQuery(fieldsQry)) + return null; + + assert conn != null; + + try { + // Get a new prepared statement for derived select query. 
+ try (PreparedStatement stmt = conn.prepareStatement(selectQry)) { + idx.bindParameters(stmt, F.asList(fieldsQry.getArgs())); + + GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(conn, + GridSqlQueryParser.prepared(stmt), + fieldsQry.getArgs(), + fieldsQry.isCollocated(), + fieldsQry.isDistributedJoins(), + fieldsQry.isEnforceJoinOrder(), idx); + + boolean distributed = qry.skipMergeTable() && qry.mapQueries().size() == 1 && + !qry.mapQueries().get(0).hasSubQueries(); + + return distributed ? new UpdatePlan.DistributedPlanInfo(qry.isReplicatedOnly(), + idx.collectCacheIds(CU.cacheId(cacheName), qry)): null; + } + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Checks whether query flags are compatible with server side update. + * + * @param qry Query. + * @return {@code true} if update can be distributed. + */ + public static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) { + return qry != null && !qry.isLocal() && + qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate(); + } + + /** * Simple supplier that just takes specified element of a given row. 
*/ private final static class PlainValueSupplier implements KeyValueSupplier { http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 7f28203..c96b486 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -1509,6 +1509,19 @@ public class GridSqlQuerySplitter { rdcQry.distinct(true); } + // -- SUB-QUERIES + boolean hasSubQueries = hasSubQueries(mapQry.where()) || hasSubQueries(mapQry.from()); + + if (!hasSubQueries) { + for (int i = 0; i < mapQry.columns(false).size(); i++) { + if (hasSubQueries(mapQry.column(i))) { + hasSubQueries = true; + + break; + } + } + } + // Replace the given select with generated reduce query in the parent. prnt.child(childIdx, rdcQry); @@ -1519,6 +1532,7 @@ public class GridSqlQuerySplitter { map.columns(collectColumns(mapExps)); map.sortColumns(mapQry.sort()); map.partitioned(hasPartitionedTables(mapQry)); + map.hasSubQueries(hasSubQueries); if (map.isPartitioned()) map.derivedPartitions(derivePartitionsFromQuery(mapQry, ctx)); @@ -1543,6 +1557,25 @@ public class GridSqlQuerySplitter { } /** + * @param ast Map query AST. + * @return {@code true} If the given AST has sub-queries. 
+ */ + private boolean hasSubQueries(GridSqlAst ast) { + if (ast == null) + return false; + + if (ast instanceof GridSqlSubquery) + return true; + + for (int childIdx = 0; childIdx < ast.size(); childIdx++) { + if (hasSubQueries(ast.child(childIdx))) + return true; + } + + return false; + } + + /** * @param sqlQry Query. * @param qryAst Select AST. * @param params All parameters. http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java new file mode 100644 index 0000000..a783b8a --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; +import org.apache.ignite.internal.processors.query.h2.UpdateResult; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Context for DML operation on reducer node. + */ +class DistributedUpdateRun { + /** Expected number of responses. */ + private final int nodeCount; + + /** Registers nodes that have responded. */ + private final HashSet<UUID> rspNodes; + + /** Accumulates total number of updated rows. */ + private long updCntr = 0L; + + /** Accumulates error keys. */ + private HashSet<Object> errorKeys; + + /** Query info. */ + private final GridRunningQueryInfo qry; + + /** Result future. */ + private final GridFutureAdapter<UpdateResult> fut = new GridFutureAdapter<>(); + + /** + * Constructor. + * + * @param nodeCount Number of nodes to await results from. + * @param qry Query info. + */ + DistributedUpdateRun(int nodeCount, GridRunningQueryInfo qry) { + this.nodeCount = nodeCount; + this.qry = qry; + + rspNodes = new HashSet<>(nodeCount); + } + + /** + * @return Query info. + */ + GridRunningQueryInfo queryInfo() { + return qry; + } + + /** + * @return Result future. + */ + GridFutureAdapter<UpdateResult> future() { + return fut; + } + + /** + * Handle disconnection. + * @param e Pre-formatted error. + */ + void handleDisconnect(CacheException e) { + fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e)); + } + + /** + * Handle leave of a node. + * + * @param nodeId Node id. 
+ */ + void handleNodeLeft(UUID nodeId) { + fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]")); + } + + /** + * Handle response from remote node. + * + * @param id Node id. + * @param msg Response message. + */ + void handleResponse(UUID id, GridH2DmlResponse msg) { + synchronized (this) { + if (!rspNodes.add(id)) + return; // ignore duplicated messages + + String err = msg.error(); + + if (err != null) { + fut.onDone(new IgniteCheckedException("Update failed. " + (F.isEmpty(err) ? "" : err) + "[reqId=" + + msg.requestId() + ", node=" + id + "].")); + + return; + } + + if (!F.isEmpty(msg.errorKeys())) { + List<Object> errList = Arrays.asList(msg.errorKeys()); + + if (errorKeys == null) + errorKeys = new HashSet<>(errList); + else + errorKeys.addAll(errList); + } + + updCntr += msg.updateCounter(); + + if (rspNodes.size() == nodeCount) + fut.onDone(new UpdateResult(updCntr, errorKeys == null ? null : errorKeys.toArray())); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 0cc4172..77b928f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -21,6 +21,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.util.AbstractCollection; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import 
java.util.Iterator; @@ -30,12 +31,14 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.QueryCancelledException; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.DiscoveryEvent; @@ -54,8 +57,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; @@ -63,6 +68,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; +import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.CI1; @@ -71,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.jdbc.JdbcResultSet; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -208,6 +216,8 @@ public class GridMapQueryExecutor { onNextPageRequest(node, (GridQueryNextPageRequest)msg); else if (msg instanceof GridQueryCancelRequest) onCancel(node, (GridQueryCancelRequest)msg); + else if (msg instanceof GridH2DmlRequest) + onDmlRequest(node, (GridH2DmlRequest)msg); else processed = false; @@ -735,6 +745,102 @@ public class GridMapQueryExecutor { /** * @param node Node. + * @param req DML request. + */ + private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException { + int[] parts = req.queryPartitions(); + + List<Integer> cacheIds = req.caches(); + + long reqId = req.requestId(); + + AffinityTopologyVersion topVer = req.topologyVersion(); + + List<GridReservable> reserved = new ArrayList<>(); + + if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() + + ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds + + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); + + sendUpdateResponse(node, reqId, null, "Failed to reserve partitions for DML request. 
" + + "Explanation (Retry your request when re-balancing is over)."); + + return; + } + + MapNodeResults nodeResults = resultsForNode(node.id()); + + try { + IndexingQueryFilter filter = h2.backupFilter(topVer, parts); + + GridQueryCancel cancel = nodeResults.putUpdate(reqId); + + SqlFieldsQuery fldsQry = new SqlFieldsQuery(req.query()); + + if (req.parameters() != null) + fldsQry.setArgs(req.parameters()); + + fldsQry.setEnforceJoinOrder(req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER)); + fldsQry.setTimeout(req.timeout(), TimeUnit.MILLISECONDS); + fldsQry.setPageSize(req.pageSize()); + fldsQry.setLocal(true); + + boolean local = true; + + final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED); + + if (!replicated && !F.isEmpty(cacheIds) && + findFirstPartitioned(cacheIds).config().getQueryParallelism() > 1) { + fldsQry.setDistributedJoins(true); + + local = false; + } + + UpdateResult updRes = h2.mapDistributedUpdate(req.schemaName(), fldsQry, filter, cancel, local); + + GridCacheContext<?, ?> mainCctx = + !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null; + + boolean evt = local && mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); + + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.name(), + null, + req.query(), + null, + null, + req.parameters(), + node.id(), + null)); + } + + sendUpdateResponse(node, reqId, updRes, null); + } + catch (Exception e) { + U.error(log, "Error processing dml request. [localNodeId=" + ctx.localNodeId() + + ", nodeId=" + node.id() + ", req=" + req + ']', e); + + sendUpdateResponse(node, reqId, null, e.getMessage()); + } + finally { + if (!F.isEmpty(reserved)) { + // Release reserved partitions. 
+ for (int i = 0; i < reserved.size(); i++) + reserved.get(i).release(); + } + + nodeResults.removeUpdate(reqId); + } + } + + /** + * @param node Node. * @param qryReqId Query request ID. * @param err Error. */ @@ -758,6 +864,36 @@ public class GridMapQueryExecutor { } /** + * Sends update response for DML request. + * + * @param node Node. + * @param reqId Request id. + * @param updResult Update result. + * @param error Error message. + */ + @SuppressWarnings("deprecation") + private void sendUpdateResponse(ClusterNode node, long reqId, UpdateResult updResult, String error) { + try { + GridH2DmlResponse rsp = new GridH2DmlResponse(reqId, updResult == null ? 0 : updResult.counter(), + updResult == null ? null : updResult.errorKeys(), error); + + if (log.isDebugEnabled()) + log.debug("Sending: [localNodeId=" + ctx.localNodeId() + ", node=" + node.id() + ", msg=" + rsp + "]"); + + if (node.isLocal()) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), rsp); + else { + rsp.marshall(ctx.config().getMarshaller()); + + ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, rsp, QUERY_POOL); + } + } + catch (Exception e) { + U.error(log, "Failed to send message.", e); + } + } + + /** * @param node Node. * @param req Request. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 8638794..f85cd94 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; @@ -67,6 +69,7 @@ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator; import org.apache.ignite.internal.processors.query.h2.H2Utils; import 
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; @@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridIntList; @@ -130,6 +135,9 @@ public class GridReduceQueryExecutor { /** */ private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>(); + /** Contexts of running DML requests. 
*/ + private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>(); + /** */ private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList(); @@ -197,6 +205,10 @@ public class GridReduceQueryExecutor { } } } + + for (DistributedUpdateRun r : updRuns.values()) + r.handleNodeLeft(nodeId); + } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); } @@ -229,6 +241,8 @@ public class GridReduceQueryExecutor { onNextPage(node, (GridQueryNextPageResponse)msg); else if (msg instanceof GridQueryFailResponse) onFail(node, (GridQueryFailResponse)msg); + else if (msg instanceof GridH2DmlResponse) + onDmlResponse(node, (GridH2DmlResponse)msg); else processed = false; @@ -575,25 +589,11 @@ public class GridReduceQueryExecutor { if (qry.isLocal()) nodes = singletonList(ctx.discovery().localNode()); else { - if (isPreloadingActive(cacheIds)) { - if (isReplicatedOnly) - nodes = replicatedUnstableDataNodes(cacheIds); - else { - partsMap = partitionedUnstableDataNodes(cacheIds); - - if (partsMap != null) { - qryMap = narrowForQuery(partsMap, parts); - - nodes = qryMap == null ? null : qryMap.keySet(); - } - } - } - else { - qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts); + NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly); - if (qryMap != null) - nodes = qryMap.keySet(); - } + nodes = nodesParts.nodes(); + partsMap = nodesParts.partitionsMap(); + qryMap = nodesParts.queryPartitionsMap(); if (nodes == null) continue; // Retry. @@ -845,6 +845,153 @@ public class GridReduceQueryExecutor { } /** + * + * @param schemaName Schema name. + * @param cacheIds Cache ids. + * @param selectQry Select query. + * @param params SQL parameters. + * @param enforceJoinOrder Enforce join order of tables. + * @param pageSize Page size. + * @param timeoutMillis Timeout. + * @param parts Partitions. + * @param isReplicatedOnly Whether query uses only replicated caches. 
+ * @param cancel Cancel state. + * @return Update result, or {@code null} when some map node doesn't support distributed DML. + */ + public UpdateResult update( + String schemaName, + List<Integer> cacheIds, + String selectQry, + Object[] params, + boolean enforceJoinOrder, + int pageSize, + int timeoutMillis, + final int[] parts, + boolean isReplicatedOnly, + GridQueryCancel cancel + ) { + AffinityTopologyVersion topVer = h2.readyTopologyVersion(); + + NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly); + + final long reqId = qryIdGen.incrementAndGet(); + + final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS, + schemaName, U.currentTimeMillis(), cancel, false); + + Collection<ClusterNode> nodes = nodesParts.nodes(); + + if (nodes == null) + throw new CacheException("Failed to determine nodes participating in the update. " + + "Explanation (Retry update once topology recovers)."); + + if (isReplicatedOnly) { + ClusterNode locNode = ctx.discovery().localNode(); + + if (nodes.contains(locNode)) + nodes = singletonList(locNode); + else + nodes = singletonList(F.rand(nodes)); + } + + for (ClusterNode n : nodes) { + if (!n.version().greaterThanEqual(2, 3, 0)) { + log.warning("Server-side DML optimization is skipped because map node does not support it. " + + "Falling back to normal DML. [node=" + n.id() + ", v=" + n.version() + "]."); + + return null; + } + } + + final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size(), qryInfo); + + int flags = enforceJoinOrder ? 
GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0; + + if (isReplicatedOnly) + flags |= GridH2QueryRequest.FLAG_REPLICATED; + + GridH2DmlRequest req = new GridH2DmlRequest() + .requestId(reqId) + .topologyVersion(topVer) + .caches(cacheIds) + .schemaName(schemaName) + .query(selectQry) + .pageSize(pageSize) + .parameters(params) + .timeout(timeoutMillis) + .flags(flags); + + updRuns.put(reqId, r); + + boolean release = false; + + try { + Map<ClusterNode, IntArray> partsMap = (nodesParts.queryPartitionsMap() != null) ? + nodesParts.queryPartitionsMap() : nodesParts.partitionsMap(); + + ExplicitPartitionsSpecializer partsSpec = (parts == null) ? null : + new ExplicitPartitionsSpecializer(partsMap); + + final Collection<ClusterNode> finalNodes = nodes; + + cancel.set(new Runnable() { + @Override public void run() { + r.future().onCancelled(); + + send(finalNodes, new GridQueryCancelRequest(reqId), null, false); + } + }); + + // send() logs the debug message + if (send(nodes, req, partsSpec, false)) + return r.future().get(); + + throw new CacheException("Failed to send update request to participating nodes."); + } + catch (IgniteCheckedException | RuntimeException e) { + release = true; + + U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", e); + + throw new CacheException("Failed to run update. " + e.getMessage(), e); + } + finally { + if (release) + send(nodes, new GridQueryCancelRequest(reqId), null, false); + + if (!updRuns.remove(reqId, r)) + U.warn(log, "Update run was already removed: " + reqId); + } + } + + /** + * Process response for DML request. + * + * @param node Node. + * @param msg Message. + */ + private void onDmlResponse(final ClusterNode node, GridH2DmlResponse msg) { + try { + long reqId = msg.requestId(); + + DistributedUpdateRun r = updRuns.get(reqId); + + if (r == null) { + U.warn(log, "Unexpected dml response (will ignore). 
[localNodeId=" + ctx.localNodeId() + ", nodeId=" + + node.id() + ", msg=" + msg.toString() + ']'); + + return; + } + + r.handleResponse(node.id(), msg); + } + catch (Exception e) { + U.error(log, "Error in dml response processing. [localNodeId=" + ctx.localNodeId() + ", nodeId=" + + node.id() + ", msg=" + msg.toString() + ']', e); + } + } + + /** * @param cacheIds Cache IDs. * @return The first partitioned cache context. */ @@ -1309,6 +1456,44 @@ public class GridReduceQueryExecutor { } /** + * Evaluates nodes and nodes to partitions map given a list of cache ids, topology version and partitions. + * + * @param cacheIds Cache ids. + * @param topVer Topology version. + * @param parts Partitions array. + * @param isReplicatedOnly Allow only replicated caches. + * @return Result. + */ + private NodesForPartitionsResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, + int[] parts, boolean isReplicatedOnly) { + Collection<ClusterNode> nodes = null; + Map<ClusterNode, IntArray> partsMap = null; + Map<ClusterNode, IntArray> qryMap = null; + + if (isPreloadingActive(cacheIds)) { + if (isReplicatedOnly) + nodes = replicatedUnstableDataNodes(cacheIds); + else { + partsMap = partitionedUnstableDataNodes(cacheIds); + + if (partsMap != null) { + qryMap = narrowForQuery(partsMap, parts); + + nodes = qryMap == null ? null : qryMap.keySet(); + } + } + } + else { + qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts); + + if (qryMap != null) + nodes = qryMap.keySet(); + } + + return new NodesForPartitionsResult(nodes, partsMap, qryMap); + } + + /** * @param conn Connection. * @param qry Query. * @param explain Explain. 
@@ -1403,6 +1588,9 @@ public class GridReduceQueryExecutor { for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet()) e.getValue().disconnected(err); + + for (DistributedUpdateRun r: updRuns.values()) + r.handleDisconnect(err); } /** @@ -1421,6 +1609,11 @@ public class GridReduceQueryExecutor { res.add(run.queryInfo()); } + for (DistributedUpdateRun upd: updRuns.values()) { + if (upd.queryInfo().longQuery(curTime, duration)) + res.add(upd.queryInfo()); + } + return res; } @@ -1435,6 +1628,12 @@ public class GridReduceQueryExecutor { if (run != null) run.queryInfo().cancel(); + else { + DistributedUpdateRun upd = updRuns.get(qryId); + + if (upd != null) + upd.queryInfo().cancel(); + } } } @@ -1478,11 +1677,64 @@ public class GridReduceQueryExecutor { /** {@inheritDoc} */ @Override public Message apply(ClusterNode node, Message msg) { - GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); + if (msg instanceof GridH2QueryRequest) { + GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); + + rq.queryPartitions(toArray(partsMap.get(node))); + + return rq; + } else if (msg instanceof GridH2DmlRequest) { + GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg); + + rq.queryPartitions(toArray(partsMap.get(node))); + + return rq; + } + + return msg; + } + } + + /** + * Result of nodes to partitions mapping for a query or update. + */ + static class NodesForPartitionsResult { + /** */ + final Collection<ClusterNode> nodes; - rq.queryPartitions(toArray(partsMap.get(node))); + /** */ + final Map<ClusterNode, IntArray> partsMap; - return rq; + /** */ + final Map<ClusterNode, IntArray> qryMap; + + /** */ + NodesForPartitionsResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap, + Map<ClusterNode, IntArray> qryMap) { + this.nodes = nodes; + this.partsMap = partsMap; + this.qryMap = qryMap; + } + + /** + * @return Collection of nodes a message shall be sent to. 
+ */ + Collection<ClusterNode> nodes() { + return nodes; + } + + /** + * @return Full node-to-partitions map (not narrowed to the query partitions), or {@code null} if not computed. + */ + Map<ClusterNode, IntArray> partitionsMap() { + return partsMap; + } + + /** + * @return Node-to-partitions map to use for the query, or {@code null} if not computed. + */ + Map<ClusterNode, IntArray> queryPartitionsMap() { + return qryMap; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java index 2d20c8d..c0637ea 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.jsr166.ConcurrentHashMap8; @@ -32,6 +33,9 @@ class MapNodeResults { /** */ private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>(); + /** Cancel state for update requests. 
*/ + private final ConcurrentMap<Long, GridQueryCancel> updCancels = new ConcurrentHashMap8<>(); + /** */ private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); @@ -88,6 +92,12 @@ class MapNodeResults { removed.cancel(true); } } + + // Cancel update request + GridQueryCancel updCancel = updCancels.remove(reqId); + + if (updCancel != null) + updCancel.cancel(); } /** @@ -111,11 +121,34 @@ class MapNodeResults { } /** + * @param reqId Request id. + * @return Cancel state. + */ + public GridQueryCancel putUpdate(long reqId) { + GridQueryCancel cancel = new GridQueryCancel(); + + updCancels.put(reqId, cancel); + + return cancel; + } + + /** + * @param reqId Request id. + */ + public void removeUpdate(long reqId) { + updCancels.remove(reqId); + } + + /** * Cancel all node queries. */ public void cancelAll() { for (MapQueryResults ress : res.values()) ress.cancel(true); + + // Cancel update requests + for (GridQueryCancel upd: updCancels.values()) + upd.cancel(); } }
