IGNITE-6080: DML batches are now grouped by affinity. This closes #2454.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37e58bad Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37e58bad Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37e58bad Branch: refs/heads/ignite-5578 Commit: 37e58badefcf36536644344023dc62828f41d0f9 Parents: 5283e19 Author: devozerov <[email protected]> Authored: Thu Aug 17 11:24:06 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Aug 17 11:24:06 2017 +0300 ---------------------------------------------------------------------- .../query/h2/DmlStatementsProcessor.java | 274 +++++++++++-------- 1 file changed, 163 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/37e58bad/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 4f7c288..0ff9cfe 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -32,6 +32,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import javax.cache.processor.EntryProcessor; @@ -46,7 +47,9 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -83,7 +86,6 @@ import org.h2.command.dml.Update; import org.h2.table.Column; import org.h2.util.DateTimeUtils; import org.h2.util.LocalDateTimeUtils; -import org.h2.value.DataType; import org.h2.value.Value; import org.h2.value.ValueDate; import org.h2.value.ValueTime; @@ -365,7 +367,7 @@ public class DmlStatementsProcessor { QueryCursorImpl<List<?>> cur; // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual - // subquery and not some dummy stuff like "select 1, 2, 3;" + // sub-query and not some dummy stuff like "select 1, 2, 3;" if (!loc && !plan.isLocSubqry) { SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated()) .setArgs(fieldsQry.getArgs()) @@ -493,52 +495,28 @@ public class DmlStatementsProcessor { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException { - // With DELETE, we have only two columns - key and value. - long res = 0; + BatchSender sender = new BatchSender(cctx, pageSize); - // Keys that failed to DELETE due to concurrent updates. - List<Object> failedKeys = new ArrayList<>(); + for (List<?> row : cursor) { + if (row.size() != 2) { + U.warn(log, "Invalid row size on DELETE - expected 2, got " + row.size()); - SQLException resEx = null; - - - Iterator<List<?>> it = cursor.iterator(); - Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>(); - - while (it.hasNext()) { - List<?> e = it.next(); - if (e.size() != 2) { - U.warn(log, "Invalid row size on DELETE - expected 2, got " + e.size()); continue; } - rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV)); - - if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) { - PageProcessingResult pageRes = processPage(cctx, rows); - - res += pageRes.cnt; - - failedKeys.addAll(F.asList(pageRes.errKeys)); + sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV)); + } - if (pageRes.ex != null) { - if (resEx == null) - resEx = pageRes.ex; - else - resEx.setNextException(pageRes.ex); - } + sender.flush(); - if (it.hasNext()) - rows.clear(); // No need to clear after the last batch. - } - } + SQLException resEx = sender.error(); if (resEx != null) { - if (!F.isEmpty(failedKeys)) { + if (!F.isEmpty(sender.failedKeys())) { // Don't go for a re-run if processing of some keys yielded exceptions and report keys that // had been modified concurrently right away. String msg = "Failed to DELETE some keys because they had been modified concurrently " + - "[keys=" + failedKeys + ']'; + "[keys=" + sender.failedKeys() + ']'; SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE); @@ -550,7 +528,7 @@ public class DmlStatementsProcessor { throw new IgniteSQLException(resEx); } - return new UpdateResult(res, failedKeys.toArray()); + return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray()); } /** @@ -579,20 +557,10 @@ public class DmlStatementsProcessor { // or if its list of updated columns includes only _val, i.e. is single element. boolean hasProps = !hasNewVal || updatedColNames.length > 1; - long res = 0; - - Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>(); - - // Keys that failed to UPDATE due to concurrent updates. - List<Object> failedKeys = new ArrayList<>(); - - SQLException resEx = null; - - Iterator<List<?>> it = cursor.iterator(); + BatchSender sender = new BatchSender(cctx, pageSize); - while (it.hasNext()) { - List<?> e = it.next(); - Object key = e.get(0); + for (List<?> row : cursor) { + Object key = row.get(0); Object newVal; @@ -606,10 +574,10 @@ public class DmlStatementsProcessor { assert prop != null; - newColVals.put(plan.colNames[i], convert(e.get(i + 2), desc, prop.type(), plan.colTypes[i])); + newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i])); } - newVal = plan.valSupplier.apply(e); + newVal = plan.valSupplier.apply(row); if (newVal == null) throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE); @@ -643,38 +611,24 @@ public class DmlStatementsProcessor { newVal = ((BinaryObjectBuilder) newVal).build(); } - Object srcVal = e.get(1); + Object srcVal = row.get(1); if (bin && !(srcVal instanceof BinaryObject)) srcVal = cctx.grid().binary().toBinary(srcVal); - rows.put(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal))); - - if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) { - PageProcessingResult pageRes = processPage(cctx, rows); - - res += pageRes.cnt; - - failedKeys.addAll(F.asList(pageRes.errKeys)); + sender.add(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal))); + } - if (pageRes.ex != null) { - if (resEx == null) - resEx = pageRes.ex; - else - resEx.setNextException(pageRes.ex); - } + sender.flush(); - if (it.hasNext()) - rows.clear(); // No need to clear after the last batch. - } - } + SQLException resEx = sender.error(); if (resEx != null) { - if (!F.isEmpty(failedKeys)) { + if (!F.isEmpty(sender.failedKeys())) { // Don't go for a re-run if processing of some keys yielded exceptions and report keys that // had been modified concurrently right away. String msg = "Failed to UPDATE some keys because they had been modified concurrently " + - "[keys=" + failedKeys + ']'; + "[keys=" + sender.failedKeys() + ']'; SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE); @@ -686,7 +640,7 @@ public class DmlStatementsProcessor { throw new IgniteSQLException(resEx); } - return new UpdateResult(res, failedKeys.toArray()); + return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray()); } /** @@ -864,47 +818,22 @@ public class DmlStatementsProcessor { IgniteQueryErrorCode.DUPLICATE_KEY); } else { - Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ? - new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) : - new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(); - // Keys that failed to INSERT due to duplication. - List<Object> duplicateKeys = new ArrayList<>(); - - int resCnt = 0; - - SQLException resEx = null; - - Iterator<List<?>> it = cursor.iterator(); - - while (it.hasNext()) { - List<?> row = it.next(); - - final IgniteBiTuple t = rowToKeyValue(cctx, row, plan); - - rows.put(t.getKey(), new InsertEntryProcessor(t.getValue())); - - if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) { - PageProcessingResult pageRes = processPage(cctx, rows); + BatchSender sender = new BatchSender(cctx, pageSize); - resCnt += pageRes.cnt; + for (List<?> row : cursor) { + final IgniteBiTuple keyValPair = rowToKeyValue(cctx, row, plan); - duplicateKeys.addAll(F.asList(pageRes.errKeys)); + sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue())); + } - if (pageRes.ex != null) { - if (resEx == null) - resEx = pageRes.ex; - else - resEx.setNextException(pageRes.ex); - } + sender.flush(); - rows.clear(); - } - } + SQLException resEx = sender.error(); - if (!F.isEmpty(duplicateKeys)) { + if (!F.isEmpty(sender.failedKeys())) { String msg = "Failed to INSERT some keys because they are already in cache " + - "[keys=" + duplicateKeys + ']'; + "[keys=" + sender.failedKeys() + ']'; SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY); @@ -917,7 +846,7 @@ public class DmlStatementsProcessor { if (resEx != null) throw new IgniteSQLException(resEx); - return resCnt; + return sender.updateCount(); } } @@ -1133,7 +1062,7 @@ public class DmlStatementsProcessor { /** Number of processed items. */ final long cnt; - /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */ + /** Keys that failed to be updated or deleted due to concurrent modification of values. */ @NotNull final Object[] errKeys; @@ -1150,7 +1079,7 @@ public class DmlStatementsProcessor { /** Number of successfully processed items. */ final long cnt; - /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */ + /** Keys that failed to be updated or deleted due to concurrent modification of values. */ @NotNull final Object[] errKeys; @@ -1193,4 +1122,127 @@ public class DmlStatementsProcessor { this.ex = ex; } } + + /** + * Batch sender class. + */ + private static class BatchSender { + /** Cache context. */ + private final GridCacheContext cctx; + + /** Batch size. */ + private final int size; + + /** Batches. */ + private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>(); + + /** Result count. */ + private long updateCnt; + + /** Failed keys. */ + private List<Object> failedKeys; + + /** Exception. */ + private SQLException err; + + /** + * Constructor. + * + * @param cctx Cache context. + * @param size Batch. + */ + public BatchSender(GridCacheContext cctx, int size) { + this.cctx = cctx; + this.size = size; + } + + /** + * Add entry to batch. + * + * @param key Key. + * @param proc Processor. + */ + public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException { + ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE); + + if (node == null) + throw new IgniteCheckedException("Failed to map key to node."); + + UUID nodeId = node.id(); + + Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId); + + if (batch == null) { + batch = new HashMap<>(); + + batches.put(nodeId, batch); + } + + batch.put(key, proc); + + if (batch.size() >= size) { + sendBatch(batch); + + batch.clear(); + } + } + + /** + * Flush any remaining entries. + * + * @throws IgniteCheckedException If failed. + */ + public void flush() throws IgniteCheckedException { + for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) { + if (!batch.isEmpty()) + sendBatch(batch); + } + } + + /** + * @return Update count. + */ + public long updateCount() { + return updateCnt; + } + + /** + * @return Failed keys. + */ + public List<Object> failedKeys() { + return failedKeys != null ? failedKeys : Collections.emptyList(); + } + + /** + * @return Error. + */ + public SQLException error() { + return err; + } + + /** + * Send the batch. + * + * @param batch Batch. + * @throws IgniteCheckedException If failed. + */ + private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch) + throws IgniteCheckedException { + PageProcessingResult pageRes = processPage(cctx, batch); + + updateCnt += pageRes.cnt; + + if (failedKeys == null) + failedKeys = new ArrayList<>(); + + failedKeys.addAll(F.asList(pageRes.errKeys)); + + if (pageRes.ex != null) { + if (err == null) + err = pageRes.ex; + else + err.setNextException(pageRes.ex); + } + } + } }
