This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fe4829a  Revert "IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333."
fe4829a is described below

commit fe4829ac1f565f78097e49b50027ea945d11b2c1
Author: Sergey Chugunov <[email protected]>
AuthorDate: Wed Oct 21 10:46:52 2020 +0300

    Revert "IGNITE-13558 Better parallelization down to partition level when restoring partition states - Fixes #8333."
    
    This reverts commit 9bdd3a8db43c78e58d687c35cd79cd4c33d6bc58.
    
    Reverted due to possible deadlocks found after merge.
---
 .../org/apache/ignite/internal/IgniteFeatures.java |   3 -
 .../processors/cache/GridCacheProcessor.java       |  53 ++------
 .../GridCacheDatabaseSharedManager.java            |   2 +-
 .../cache/persistence/GridCacheOffheapManager.java | 149 +++++++++------------
 4 files changed, 78 insertions(+), 129 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 25361d2..11ef198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -173,9 +173,6 @@ public enum IgniteFeatures {
      * @return {@code True} if feature is declared to be supported by remote 
node.
      */
     public static boolean nodeSupports(byte[] featuresAttrBytes, 
IgniteFeatures feature) {
-        if (featuresAttrBytes == null)
-            return false;
-
         int featureId = feature.getFeatureId();
 
         // Same as "BitSet.valueOf(features).get(featureId)"
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1260b60..d3beafc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -33,8 +33,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -152,6 +150,7 @@ import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.IgniteCollectors;
 import org.apache.ignite.internal.util.InitializationProtector;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -193,6 +192,7 @@ import static java.lang.String.format;
 import static java.util.Arrays.asList;
 import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -306,13 +306,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private final CacheRecoveryLifecycle recovery = new 
CacheRecoveryLifecycle();
 
     /** Cache configuration splitter. */
-    private final CacheConfigurationSplitter splitter;
+    private CacheConfigurationSplitter splitter;
 
     /** Cache configuration enricher. */
-    private final CacheConfigurationEnricher enricher;
-
-    /** Pool to use while restoring partition states. */
-    private final ForkJoinPool restorePartitionsPool;
+    private CacheConfigurationEnricher enricher;
 
     /**
      * @param ctx Kernal context.
@@ -327,19 +324,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
         splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
         enricher = new CacheConfigurationEnricher(ctx, marsh, 
U.resolveClassLoader(ctx.config()));
-
-        ForkJoinPool.ForkJoinWorkerThreadFactory factory = new 
ForkJoinPool.ForkJoinWorkerThreadFactory() {
-            @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) 
{
-                ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
-
-                worker.setName("restore-partition-states-" + 
worker.getPoolIndex());
-
-                return worker;
-            }
-        };
-
-        int stripesCnt = ctx.getStripedExecutorService().stripesCount();
-        restorePartitionsPool = new ForkJoinPool(stripesCnt, factory, null, 
false);
     }
 
     /**
@@ -760,8 +744,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx.cleanup();
 
-        restorePartitionsPool.shutdownNow();
-
         if (log.isDebugEnabled())
             log.debug("Stopped cache processor.");
     }
@@ -5301,20 +5283,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Returns {@code ForkJoinPool} instance to be used in partition states 
restoration.<br/>
-     * It's more convenient than regular pools because it can be used to 
parallel by cache groups and by partitions
-     * without sacrificing code simplicity (cache group tasks won't 
exclusively occupy their threads and won't block
-     * partition tasks as a result).<br/>
-     * <br/>
-     * There's a chance that this pool will later be replaced with a more 
common one, like system pool, for example.
-     *
-     * @return Pool instance.
-     */
-    public ForkJoinPool restorePartitionsPool() {
-        return restorePartitionsPool;
-    }
-
-    /**
      * Pages list view supplier.
      *
      * @param filter Filter.
@@ -5496,10 +5464,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            StripedExecutor stripedExec = ctx.getStripedExecutorService();
+
+            int roundRobin = 0;
 
             for (CacheGroupContext grp : forGroups) {
-                restorePartitionsPool.submit(() -> {
+                stripedExec.execute(roundRobin % stripedExec.stripesCount(), 
() -> {
                     try {
                         long processed = 
grp.offheap().restorePartitionStates(partitionStates);
 
@@ -5516,15 +5486,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                 : new IgniteCheckedException(e)
                         );
                     }
-                    finally {
-                        completionLatch.countDown();
-                    }
                 });
+
+                roundRobin++;
             }
 
             try {
                 // Await completion restore state tasks in all stripes.
-                completionLatch.await();
+                stripedExec.awaitComplete();
             }
             catch (InterruptedException e) {
                 throw new IgniteInterruptedException(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d363f35..50224f3 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1142,7 +1142,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (fut.localJoinExchange() || fut.activateCluster()
             || (fut.exchangeActions() != null && 
!F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
             U.doInParallel(
-                cctx.cache().restorePartitionsPool(),
+                cctx.kernalContext().getSystemExecutorService(),
                 cctx.cache().cacheGroups(),
                 cacheGroup -> {
                     if (cacheGroup.isLocal())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index faed634..1fba65d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -31,10 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.IntConsumer;
 import java.util.function.ToLongFunction;
-import java.util.stream.IntStream;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -579,128 +576,114 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         if (partitionStatesRestored)
             return 0;
 
-        AtomicLong processed = new AtomicLong();
-        AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
+        long processed = 0;
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        IntConsumer partConsumer = p -> {
+        for (int p = 0; p < grp.affinity().partitions(); p++) {
             Integer recoverState = partitionRecoveryStates.get(new 
GroupPartitionId(grp.groupId(), p));
 
             long startTime = U.currentTimeMillis();
 
-            try {
-                if (ctx.pageStore().exists(grp.groupId(), p)) {
-                    ctx.pageStore().ensure(grp.groupId(), p);
-
-                    if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping partition on recovery (pages 
less than 1) " +
-                                "[grp=" + grp.cacheOrGroupName() + ", p=" + p 
+ ']');
-
-                        return;
-                    }
+            if (ctx.pageStore().exists(grp.groupId(), p)) {
+                ctx.pageStore().ensure(grp.groupId(), p);
 
+                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                     if (log.isDebugEnabled())
-                        log.debug("Creating partition on recovery (exists in 
page store) " +
+                        log.debug("Skipping partition on recovery (pages less 
than 1) " +
                             "[grp=" + grp.cacheOrGroupName() + ", p=" + p + 
']');
 
-                    processed.incrementAndGet();
+                    continue;
+                }
 
-                    GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
+                if (log.isDebugEnabled())
+                    log.debug("Creating partition on recovery (exists in page 
store) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
 
-                    // Triggers initialization of existing(having datafile) 
partition before acquiring cp read lock.
-                    part.dataStore().init();
+                processed++;
 
-                    ctx.database().checkpointReadLock();
+                GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
 
-                    try {
-                        long partMetaId = 
pageMem.partitionMetaPageId(grp.groupId(), p);
-                        long partMetaPage = pageMem.acquirePage(grp.groupId(), 
partMetaId);
+                // Triggers initialization of existing(having datafile) 
partition before acquiring cp read lock.
+                part.dataStore().init();
 
-                        try {
-                            long pageAddr = pageMem.writeLock(grp.groupId(), 
partMetaId, partMetaPage);
+                ctx.database().checkpointReadLock();
 
-                            boolean changed = false;
+                try {
+                    long partMetaId = 
pageMem.partitionMetaPageId(grp.groupId(), p);
+                    long partMetaPage = pageMem.acquirePage(grp.groupId(), 
partMetaId);
 
-                            try {
-                                PagePartitionMetaIO io = 
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+                    try {
+                        long pageAddr = pageMem.writeLock(grp.groupId(), 
partMetaId, partMetaPage);
 
-                                if (recoverState != null) {
-                                    changed = io.setPartitionState(pageAddr, 
(byte)recoverState.intValue());
+                        boolean changed = false;
 
-                                    updateState(part, recoverState);
+                        try {
+                            PagePartitionMetaIO io = 
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Restored partition state 
(from WAL) " +
-                                            "[grp=" + grp.cacheOrGroupName() + 
", p=" + p + ", state=" + part.state() +
-                                            ", updCntr=" + 
part.initialUpdateCounter() +
-                                            ", size=" + part.fullSize() + ']');
-                                }
-                                else {
-                                    int stateId = 
io.getPartitionState(pageAddr);
+                            if (recoverState != null) {
+                                changed = io.setPartitionState(pageAddr, 
(byte)recoverState.intValue());
 
-                                    updateState(part, stateId);
+                                updateState(part, recoverState);
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Restored partition state 
(from page memory) " +
-                                            "[grp=" + grp.cacheOrGroupName() + 
", p=" + p + ", state=" + part.state() +
-                                            ", updCntr=" + 
part.initialUpdateCounter() + ", stateId=" + stateId +
-                                            ", size=" + part.fullSize() + ']');
-                                }
+                                if (log.isDebugEnabled())
+                                    log.debug("Restored partition state (from 
WAL) " +
+                                        "[grp=" + grp.cacheOrGroupName() + ", 
p=" + p + ", state=" + part.state() +
+                                        ", updCntr=" + 
part.initialUpdateCounter() +
+                                        ", size=" + part.fullSize() + ']');
                             }
-                            finally {
-                                pageMem.writeUnlock(grp.groupId(), partMetaId, 
partMetaPage, null, changed);
+                            else {
+                                int stateId = io.getPartitionState(pageAddr);
+
+                                updateState(part, stateId);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Restored partition state (from 
page memory) " +
+                                        "[grp=" + grp.cacheOrGroupName() + ", 
p=" + p + ", state=" + part.state() +
+                                        ", updCntr=" + 
part.initialUpdateCounter() + ", stateId=" + stateId +
+                                        ", size=" + part.fullSize() + ']');
                             }
                         }
                         finally {
-                            pageMem.releasePage(grp.groupId(), partMetaId, 
partMetaPage);
+                            pageMem.writeUnlock(grp.groupId(), partMetaId, 
partMetaPage, null, changed);
                         }
                     }
                     finally {
-                        ctx.database().checkpointReadUnlock();
+                        pageMem.releasePage(grp.groupId(), partMetaId, 
partMetaPage);
                     }
                 }
-                else if (recoverState != null) { // Pre-create partition if 
having valid state.
-                    GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
+            }
+            else if (recoverState != null) { // Pre-create partition if having 
valid state.
+                GridDhtLocalPartition part = 
grp.topology().forceCreatePartition(p);
 
-                    updateState(part, recoverState);
+                updateState(part, recoverState);
 
-                    processed.incrementAndGet();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Restored partition state (from WAL) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", 
state=" + part.state() +
-                            ", updCntr=" + part.initialUpdateCounter() +
-                            ", size=" + part.fullSize() + ']');
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition on recovery (no page 
store OR wal state) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + 
']');
-                }
+                processed++;
 
                 if (log.isDebugEnabled())
-                    log.debug("Finished restoring partition state " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
-                        ", time=" + (U.currentTimeMillis() - startTime) + " 
ms]");
+                    log.debug("Restored partition state (from WAL) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", 
state=" + part.state() +
+                        ", updCntr=" + part.initialUpdateCounter() +
+                        ", size=" + part.fullSize() + ']');
             }
-            catch (IgniteCheckedException e) {
-                if (!err.compareAndSet(null, e))
-                    err.get().addSuppressed(e);
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping partition on recovery (no page store 
OR wal state) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
             }
-        };
 
-        ctx.cache().restorePartitionsPool().submit(
-            () -> IntStream.range(0, 
grp.affinity().partitions()).parallel().forEach(partConsumer)
-        ).join();
-
-        if (err.get() != null)
-            throw err.get();
+            if (log.isDebugEnabled())
+                log.debug("Finished restoring partition state " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+                    ", time=" + (U.currentTimeMillis() - startTime) + " ms]");
+        }
 
         partitionStatesRestored = true;
 
-        return processed.get();
+        return processed;
     }
 
     /**

Reply via email to