Repository: ignite
Updated Branches:
  refs/heads/ignite-7485-2 138ff50fd -> 234488ede


IGNITE-7475 Improved VerifyBackupPartitionsTask to calculate partition hashes in parallel - Fixes #3407.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53c0fd1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53c0fd1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53c0fd1e

Branch: refs/heads/ignite-7485-2
Commit: 53c0fd1e44cda1972e29c8d256edb808c0368dde
Parents: aca9732
Author: Ivan Rakov <ivan.glu...@gmail.com>
Authored: Wed Jan 31 12:51:09 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Wed Jan 31 12:51:09 2018 +0300

----------------------------------------------------------------------
 .../verify/VerifyBackupPartitionsTask.java      | 157 ++++++++++++++-----
 1 file changed, 118 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53c0fd1e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
index 23aa0e1..b884cb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTask.java
@@ -19,13 +19,22 @@ package org.apache.ignite.internal.processors.cache.verify;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
@@ -162,6 +171,9 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>,
         /** Cache names. */
         private Set<String> cacheNames;
 
+        /** Counter of processed partitions. */
+        private final AtomicInteger completionCntr = new AtomicInteger(0);
+
         /**
          * @param names Names.
          */
@@ -208,7 +220,9 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>,
                 }
             }
 
-            Map<PartitionKey, PartitionHashRecord> res = new HashMap<>();
+            List<Future<Map<PartitionKey, PartitionHashRecord>>> 
partHashCalcFutures = new ArrayList<>();
+
+            completionCntr.set(0);
 
             for (Integer grpId : grpIds) {
                 CacheGroupContext grpCtx = 
ignite.context().cache().cacheGroup(grpId);
@@ -218,62 +232,127 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<Set<String>,
 
                 List<GridDhtLocalPartition> parts = 
grpCtx.topology().localPartitions();
 
-                for (GridDhtLocalPartition part : parts) {
-                    if (!part.reserve())
-                        continue;
+                for (GridDhtLocalPartition part : parts)
+                    
partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part));
+            }
 
-                    int partHash = 0;
-                    long partSize;
-                    long updateCntrBefore;
+            Map<PartitionKey, PartitionHashRecord> res = new HashMap<>();
 
-                    try {
-                        if (part.state() != GridDhtPartitionState.OWNING)
-                            continue;
+            long lastProgressLogTs = U.currentTimeMillis();
 
-                        updateCntrBefore = part.updateCounter();
+            for (int i = 0; i < partHashCalcFutures.size(); ) {
+                Future<Map<PartitionKey, PartitionHashRecord>> fut = 
partHashCalcFutures.get(i);
 
-                        partSize = part.dataStore().fullSize();
+                try {
+                    Map<PartitionKey, PartitionHashRecord> partHash = 
fut.get(10, TimeUnit.SECONDS);
 
-                        GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+                    res.putAll(partHash);
 
-                        while (it.hasNextX()) {
-                            CacheDataRow row = it.nextX();
+                    i++;
+                }
+                catch (InterruptedException | ExecutionException e) {
+                    for (int j = i + 1; j < partHashCalcFutures.size(); j++)
+                        partHashCalcFutures.get(j).cancel(false);
+
+                    if (e instanceof InterruptedException)
+                        throw new 
IgniteInterruptedException((InterruptedException)e);
+                    else if (e.getCause() instanceof IgniteException)
+                        throw (IgniteException)e.getCause();
+                    else
+                        throw new IgniteException(e.getCause());
+                }
+                catch (TimeoutException e) {
+                    if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 * 
1000L) {
+                        lastProgressLogTs = U.currentTimeMillis();
 
-                            partHash += row.key().hashCode();
+                        log.warning("idle_verify is still running, processed " 
+ completionCntr.get() + " of " +
+                            partHashCalcFutures.size() + " local partitions");
+                    }
+                }
+            }
 
-                            partHash += 
Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext()));
-                        }
+            return res;
+        }
 
-                        long updateCntrAfter = part.updateCounter();
+        /**
+         * @param grpCtx Group context.
+         * @param part Local partition.
+         */
+        private Future<Map<PartitionKey, PartitionHashRecord>> 
calculatePartitionHashAsync(
+            final CacheGroupContext grpCtx,
+            final GridDhtLocalPartition part
+        ) {
+            return ForkJoinPool.commonPool().submit(new 
Callable<Map<PartitionKey, PartitionHashRecord>>() {
+                @Override public Map<PartitionKey, PartitionHashRecord> call() 
throws Exception {
+                    return calculatePartitionHash(grpCtx, part);
+                }
+            });
+        }
 
-                        if (updateCntrBefore != updateCntrAfter) {
-                            throw new IgniteException("Cluster is not idle: 
update counter of partition [grpId=" +
-                                grpId + ", partId=" + part.id() + "] changed 
during hash calculation [before=" +
-                                updateCntrBefore + ", after=" + 
updateCntrAfter + "]");
-                        }
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Can't calculate partition hash [grpId=" 
+ grpId +
-                            ", partId=" + part.id() + "]", e);
 
-                        continue;
-                    }
-                    finally {
-                        part.release();
-                    }
+        /**
+         * @param grpCtx Group context.
+         * @param part Local partition.
+         */
+        private Map<PartitionKey, PartitionHashRecord> calculatePartitionHash(
+            CacheGroupContext grpCtx,
+            GridDhtLocalPartition part
+        ) {
+            if (!part.reserve())
+                return Collections.emptyMap();
+
+            int partHash = 0;
+            long partSize;
+            long updateCntrBefore;
+
+            try {
+                if (part.state() != GridDhtPartitionState.OWNING)
+                    return Collections.emptyMap();
 
-                    Object consId = 
ignite.context().discovery().localNode().consistentId();
+                updateCntrBefore = part.updateCounter();
 
-                    boolean isPrimary = 
part.primary(grpCtx.topology().readyTopologyVersion());
+                partSize = part.dataStore().fullSize();
+
+                GridIterator<CacheDataRow> it = 
grpCtx.offheap().partitionIterator(part.id());
+
+                while (it.hasNextX()) {
+                    CacheDataRow row = it.nextX();
+
+                    partHash += row.key().hashCode();
+
+                    partHash += 
Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext()));
+                }
 
-                    PartitionKey partKey = new PartitionKey(grpId, part.id(), 
grpCtx.cacheOrGroupName());
+                long updateCntrAfter = part.updateCounter();
 
-                    res.put(partKey, new PartitionHashRecord(
-                        partKey, isPrimary, consId, partHash, 
updateCntrBefore, partSize));
+                if (updateCntrBefore != updateCntrAfter) {
+                    throw new IgniteException("Cluster is not idle: update 
counter of partition [grpId=" +
+                        grpCtx.groupId() + ", partId=" + part.id() + "] 
changed during hash calculation [before=" +
+                        updateCntrBefore + ", after=" + updateCntrAfter + "]");
                 }
             }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Can't calculate partition hash [grpId=" + 
grpCtx.groupId() +
+                    ", partId=" + part.id() + "]", e);
 
-            return res;
+                return Collections.emptyMap();
+            }
+            finally {
+                part.release();
+            }
+
+            Object consId = 
ignite.context().discovery().localNode().consistentId();
+
+            boolean isPrimary = 
part.primary(grpCtx.topology().readyTopologyVersion());
+
+            PartitionKey partKey = new PartitionKey(grpCtx.groupId(), 
part.id(), grpCtx.cacheOrGroupName());
+
+            PartitionHashRecord partRec = new PartitionHashRecord(
+                partKey, isPrimary, consId, partHash, updateCntrBefore, 
partSize);
+
+            completionCntr.incrementAndGet();
+
+            return Collections.singletonMap(partKey, partRec);
         }
     }
 

Reply via email to