Repository: ignite Updated Branches: refs/heads/master aca97329f -> 53c0fd1e4
IGNITE-7475 Improved VerifyBackupPartitionsTask to calculate partition hashes in parallel - Fixes #3407. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53c0fd1e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53c0fd1e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53c0fd1e Branch: refs/heads/master Commit: 53c0fd1e44cda1972e29c8d256edb808c0368dde Parents: aca9732 Author: Ivan Rakov <[email protected]> Authored: Wed Jan 31 12:51:09 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jan 31 12:51:09 2018 +0300 ---------------------------------------------------------------------- .../verify/VerifyBackupPartitionsTask.java | 157 ++++++++++++++----- 1 file changed, 118 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53c0fd1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java index 23aa0e1..b884cb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java @@ -19,13 +19,22 @@ package org.apache.ignite.internal.processors.cache.verify; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; @@ -162,6 +171,9 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>, /** Cache names. */ private Set<String> cacheNames; + /** Counter of processed partitions. */ + private final AtomicInteger completionCntr = new AtomicInteger(0); + /** * @param names Names. */ @@ -208,7 +220,9 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>, } } - Map<PartitionKey, PartitionHashRecord> res = new HashMap<>(); + List<Future<Map<PartitionKey, PartitionHashRecord>>> partHashCalcFutures = new ArrayList<>(); + + completionCntr.set(0); for (Integer grpId : grpIds) { CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId); @@ -218,62 +232,127 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>, List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions(); - for (GridDhtLocalPartition part : parts) { - if (!part.reserve()) - continue; + for (GridDhtLocalPartition part : parts) + partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part)); + } - int partHash = 0; - long partSize; - long updateCntrBefore; + Map<PartitionKey, PartitionHashRecord> res = new HashMap<>(); - try { - if (part.state() != GridDhtPartitionState.OWNING) - continue; + long lastProgressLogTs = U.currentTimeMillis(); - updateCntrBefore = part.updateCounter(); + for (int i = 0; i < partHashCalcFutures.size(); ) { + Future<Map<PartitionKey, PartitionHashRecord>> fut = partHashCalcFutures.get(i); - partSize = part.dataStore().fullSize(); + try { + Map<PartitionKey, PartitionHashRecord> partHash = fut.get(10, TimeUnit.SECONDS); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + res.putAll(partHash); - while (it.hasNextX()) { - CacheDataRow row = it.nextX(); + i++; + } + catch (InterruptedException | ExecutionException e) { + for (int j = i + 1; j < partHashCalcFutures.size(); j++) + partHashCalcFutures.get(j).cancel(false); + + if (e instanceof InterruptedException) + throw new IgniteInterruptedException((InterruptedException)e); + else if (e.getCause() instanceof IgniteException) + throw (IgniteException)e.getCause(); + else + throw new IgniteException(e.getCause()); + } + catch (TimeoutException e) { + if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 * 1000L) { + lastProgressLogTs = U.currentTimeMillis(); - partHash += row.key().hashCode(); + log.warning("idle_verify is still running, processed " + completionCntr.get() + " of " + + partHashCalcFutures.size() + " local partitions"); + } + } + } - partHash += Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext())); - } + return res; + } - long updateCntrAfter = part.updateCounter(); + /** + * @param grpCtx Group context. + * @param part Local partition. + */ + private Future<Map<PartitionKey, PartitionHashRecord>> calculatePartitionHashAsync( + final CacheGroupContext grpCtx, + final GridDhtLocalPartition part + ) { + return ForkJoinPool.commonPool().submit(new Callable<Map<PartitionKey, PartitionHashRecord>>() { + @Override public Map<PartitionKey, PartitionHashRecord> call() throws Exception { + return calculatePartitionHash(grpCtx, part); + } + }); + } - if (updateCntrBefore != updateCntrAfter) { - throw new IgniteException("Cluster is not idle: update counter of partition [grpId=" + - grpId + ", partId=" + part.id() + "] changed during hash calculation [before=" + - updateCntrBefore + ", after=" + updateCntrAfter + "]"); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Can't calculate partition hash [grpId=" + grpId + - ", partId=" + part.id() + "]", e); - continue; - } - finally { - part.release(); - } + /** + * @param grpCtx Group context. + * @param part Local partition. + */ + private Map<PartitionKey, PartitionHashRecord> calculatePartitionHash( + CacheGroupContext grpCtx, + GridDhtLocalPartition part + ) { + if (!part.reserve()) + return Collections.emptyMap(); + + int partHash = 0; + long partSize; + long updateCntrBefore; + + try { + if (part.state() != GridDhtPartitionState.OWNING) + return Collections.emptyMap(); - Object consId = ignite.context().discovery().localNode().consistentId(); + updateCntrBefore = part.updateCounter(); - boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion()); + partSize = part.dataStore().fullSize(); + + GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); + + while (it.hasNextX()) { + CacheDataRow row = it.nextX(); + + partHash += row.key().hashCode(); + + partHash += Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext())); + } - PartitionKey partKey = new PartitionKey(grpId, part.id(), grpCtx.cacheOrGroupName()); + long updateCntrAfter = part.updateCounter(); - res.put(partKey, new PartitionHashRecord( - partKey, isPrimary, consId, partHash, updateCntrBefore, partSize)); + if (updateCntrBefore != updateCntrAfter) { + throw new IgniteException("Cluster is not idle: update counter of partition [grpId=" + + grpCtx.groupId() + ", partId=" + part.id() + "] changed during hash calculation [before=" + + updateCntrBefore + ", after=" + updateCntrAfter + "]"); } } + catch (IgniteCheckedException e) { + U.error(log, "Can't calculate partition hash [grpId=" + grpCtx.groupId() + + ", partId=" + part.id() + "]", e); - return res; + return Collections.emptyMap(); + } + finally { + part.release(); + } + + Object consId = ignite.context().discovery().localNode().consistentId(); + + boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion()); + + PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName()); + + PartitionHashRecord partRec = new PartitionHashRecord( + partKey, isPrimary, consId, partHash, updateCntrBefore, partSize); + + completionCntr.incrementAndGet(); + + return Collections.singletonMap(partKey, partRec); } }
