This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fe4829a Revert "IGNITE-13558 Better parallelization down to partition
level when restoring partition states - Fixes #8333."
fe4829a is described below
commit fe4829ac1f565f78097e49b50027ea945d11b2c1
Author: Sergey Chugunov <[email protected]>
AuthorDate: Wed Oct 21 10:46:52 2020 +0300
Revert "IGNITE-13558 Better parallelization down to partition level when
restoring partition states - Fixes #8333."
This reverts commit 9bdd3a8db43c78e58d687c35cd79cd4c33d6bc58.
Reverted due to possible deadlocks discovered after the merge.
---
.../org/apache/ignite/internal/IgniteFeatures.java | 3 -
.../processors/cache/GridCacheProcessor.java | 53 ++------
.../GridCacheDatabaseSharedManager.java | 2 +-
.../cache/persistence/GridCacheOffheapManager.java | 149 +++++++++------------
4 files changed, 78 insertions(+), 129 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 25361d2..11ef198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -173,9 +173,6 @@ public enum IgniteFeatures {
* @return {@code True} if feature is declared to be supported by remote
node.
*/
public static boolean nodeSupports(byte[] featuresAttrBytes,
IgniteFeatures feature) {
- if (featuresAttrBytes == null)
- return false;
-
int featureId = feature.getFeatureId();
// Same as "BitSet.valueOf(features).get(featureId)"
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1260b60..d3beafc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -33,8 +33,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -152,6 +150,7 @@ import
org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.IgniteCollectors;
import org.apache.ignite.internal.util.InitializationProtector;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -193,6 +192,7 @@ import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -306,13 +306,10 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
private final CacheRecoveryLifecycle recovery = new
CacheRecoveryLifecycle();
/** Cache configuration splitter. */
- private final CacheConfigurationSplitter splitter;
+ private CacheConfigurationSplitter splitter;
/** Cache configuration enricher. */
- private final CacheConfigurationEnricher enricher;
-
- /** Pool to use while restoring partition states. */
- private final ForkJoinPool restorePartitionsPool;
+ private CacheConfigurationEnricher enricher;
/**
* @param ctx Kernal context.
@@ -327,19 +324,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
enricher = new CacheConfigurationEnricher(ctx, marsh,
U.resolveClassLoader(ctx.config()));
-
- ForkJoinPool.ForkJoinWorkerThreadFactory factory = new
ForkJoinPool.ForkJoinWorkerThreadFactory() {
- @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool)
{
- ForkJoinWorkerThread worker =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
-
- worker.setName("restore-partition-states-" +
worker.getPoolIndex());
-
- return worker;
- }
- };
-
- int stripesCnt = ctx.getStripedExecutorService().stripesCount();
- restorePartitionsPool = new ForkJoinPool(stripesCnt, factory, null,
false);
}
/**
@@ -760,8 +744,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
sharedCtx.cleanup();
- restorePartitionsPool.shutdownNow();
-
if (log.isDebugEnabled())
log.debug("Stopped cache processor.");
}
@@ -5301,20 +5283,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
}
/**
- * Returns {@code ForkJoinPool} instance to be used in partition states
restoration.<br/>
- * It's more convenient than regular pools because it can be used to
parallel by cache groups and by partitions
- * without sacrificing code simplicity (cache group tasks won't
exclusively occupy their threads and won't block
- * partition tasks as a result).<br/>
- * <br/>
- * There's a chance that this pool will later be replaced with a more
common one, like system pool, for example.
- *
- * @return Pool instance.
- */
- public ForkJoinPool restorePartitionsPool() {
- return restorePartitionsPool;
- }
-
- /**
* Pages list view supplier.
*
* @param filter Filter.
@@ -5496,10 +5464,12 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ StripedExecutor stripedExec = ctx.getStripedExecutorService();
+
+ int roundRobin = 0;
for (CacheGroupContext grp : forGroups) {
- restorePartitionsPool.submit(() -> {
+ stripedExec.execute(roundRobin % stripedExec.stripesCount(),
() -> {
try {
long processed =
grp.offheap().restorePartitionStates(partitionStates);
@@ -5516,15 +5486,14 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
: new IgniteCheckedException(e)
);
}
- finally {
- completionLatch.countDown();
- }
});
+
+ roundRobin++;
}
try {
// Await completion restore state tasks in all stripes.
- completionLatch.await();
+ stripedExec.awaitComplete();
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d363f35..50224f3 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1142,7 +1142,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
if (fut.localJoinExchange() || fut.activateCluster()
|| (fut.exchangeActions() != null &&
!F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
U.doInParallel(
- cctx.cache().restorePartitionsPool(),
+ cctx.kernalContext().getSystemExecutorService(),
cctx.cache().cacheGroups(),
cacheGroup -> {
if (cacheGroup.isLocal())
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index faed634..1fba65d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -31,10 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.IntConsumer;
import java.util.function.ToLongFunction;
-import java.util.stream.IntStream;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -579,128 +576,114 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
if (partitionStatesRestored)
return 0;
- AtomicLong processed = new AtomicLong();
- AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
+ long processed = 0;
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
- IntConsumer partConsumer = p -> {
+ for (int p = 0; p < grp.affinity().partitions(); p++) {
Integer recoverState = partitionRecoveryStates.get(new
GroupPartitionId(grp.groupId(), p));
long startTime = U.currentTimeMillis();
- try {
- if (ctx.pageStore().exists(grp.groupId(), p)) {
- ctx.pageStore().ensure(grp.groupId(), p);
-
- if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (pages
less than 1) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p
+ ']');
-
- return;
- }
+ if (ctx.pageStore().exists(grp.groupId(), p)) {
+ ctx.pageStore().ensure(grp.groupId(), p);
+ if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
if (log.isDebugEnabled())
- log.debug("Creating partition on recovery (exists in
page store) " +
+ log.debug("Skipping partition on recovery (pages less
than 1) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p +
']');
- processed.incrementAndGet();
+ continue;
+ }
- GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
+ if (log.isDebugEnabled())
+ log.debug("Creating partition on recovery (exists in page
store) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
- // Triggers initialization of existing(having datafile)
partition before acquiring cp read lock.
- part.dataStore().init();
+ processed++;
- ctx.database().checkpointReadLock();
+ GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
- try {
- long partMetaId =
pageMem.partitionMetaPageId(grp.groupId(), p);
- long partMetaPage = pageMem.acquirePage(grp.groupId(),
partMetaId);
+ // Triggers initialization of existing(having datafile)
partition before acquiring cp read lock.
+ part.dataStore().init();
- try {
- long pageAddr = pageMem.writeLock(grp.groupId(),
partMetaId, partMetaPage);
+ ctx.database().checkpointReadLock();
- boolean changed = false;
+ try {
+ long partMetaId =
pageMem.partitionMetaPageId(grp.groupId(), p);
+ long partMetaPage = pageMem.acquirePage(grp.groupId(),
partMetaId);
- try {
- PagePartitionMetaIO io =
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+ try {
+ long pageAddr = pageMem.writeLock(grp.groupId(),
partMetaId, partMetaPage);
- if (recoverState != null) {
- changed = io.setPartitionState(pageAddr,
(byte)recoverState.intValue());
+ boolean changed = false;
- updateState(part, recoverState);
+ try {
+ PagePartitionMetaIO io =
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
- if (log.isDebugEnabled())
- log.debug("Restored partition state
(from WAL) " +
- "[grp=" + grp.cacheOrGroupName() +
", p=" + p + ", state=" + part.state() +
- ", updCntr=" +
part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
- }
- else {
- int stateId =
io.getPartitionState(pageAddr);
+ if (recoverState != null) {
+ changed = io.setPartitionState(pageAddr,
(byte)recoverState.intValue());
- updateState(part, stateId);
+ updateState(part, recoverState);
- if (log.isDebugEnabled())
- log.debug("Restored partition state
(from page memory) " +
- "[grp=" + grp.cacheOrGroupName() +
", p=" + p + ", state=" + part.state() +
- ", updCntr=" +
part.initialUpdateCounter() + ", stateId=" + stateId +
- ", size=" + part.fullSize() + ']');
- }
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from
WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ",
p=" + p + ", state=" + part.state() +
+ ", updCntr=" +
part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
}
- finally {
- pageMem.writeUnlock(grp.groupId(), partMetaId,
partMetaPage, null, changed);
+ else {
+ int stateId = io.getPartitionState(pageAddr);
+
+ updateState(part, stateId);
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from
page memory) " +
+ "[grp=" + grp.cacheOrGroupName() + ",
p=" + p + ", state=" + part.state() +
+ ", updCntr=" +
part.initialUpdateCounter() + ", stateId=" + stateId +
+ ", size=" + part.fullSize() + ']');
}
}
finally {
- pageMem.releasePage(grp.groupId(), partMetaId,
partMetaPage);
+ pageMem.writeUnlock(grp.groupId(), partMetaId,
partMetaPage, null, changed);
}
}
finally {
- ctx.database().checkpointReadUnlock();
+ pageMem.releasePage(grp.groupId(), partMetaId,
partMetaPage);
}
}
- else if (recoverState != null) { // Pre-create partition if
having valid state.
- GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+ else if (recoverState != null) { // Pre-create partition if having
valid state.
+ GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
- updateState(part, recoverState);
+ updateState(part, recoverState);
- processed.incrementAndGet();
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ",
state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (no page
store OR wal state) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
']');
- }
+ processed++;
if (log.isDebugEnabled())
- log.debug("Finished restoring partition state " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
- ", time=" + (U.currentTimeMillis() - startTime) + "
ms]");
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ",
state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
}
- catch (IgniteCheckedException e) {
- if (!err.compareAndSet(null, e))
- err.get().addSuppressed(e);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (no page store
OR wal state) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
}
- };
- ctx.cache().restorePartitionsPool().submit(
- () -> IntStream.range(0,
grp.affinity().partitions()).parallel().forEach(partConsumer)
- ).join();
-
- if (err.get() != null)
- throw err.get();
+ if (log.isDebugEnabled())
+ log.debug("Finished restoring partition state " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+ ", time=" + (U.currentTimeMillis() - startTime) + " ms]");
+ }
partitionStatesRestored = true;
- return processed.get();
+ return processed;
}
/**