This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9bdd3a8 IGNITE-13558 Better parallelization down to partition level
when restoring partition states - Fixes #8333.
9bdd3a8 is described below
commit 9bdd3a8db43c78e58d687c35cd79cd4c33d6bc58
Author: ibessonov <[email protected]>
AuthorDate: Fri Oct 16 13:01:15 2020 +0300
IGNITE-13558 Better parallelization down to partition level when restoring
partition states - Fixes #8333.
Signed-off-by: Sergey Chugunov <[email protected]>
---
.../org/apache/ignite/internal/IgniteFeatures.java | 3 +
.../processors/cache/GridCacheProcessor.java | 53 ++++++--
.../GridCacheDatabaseSharedManager.java | 2 +-
.../cache/persistence/GridCacheOffheapManager.java | 149 ++++++++++++---------
4 files changed, 129 insertions(+), 78 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 11ef198..25361d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -173,6 +173,9 @@ public enum IgniteFeatures {
* @return {@code True} if feature is declared to be supported by remote
node.
*/
public static boolean nodeSupports(byte[] featuresAttrBytes,
IgniteFeatures feature) {
+ if (featuresAttrBytes == null)
+ return false;
+
int featureId = feature.getFeatureId();
// Same as "BitSet.valueOf(features).get(featureId)"
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d3beafc..1260b60 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -33,6 +33,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -150,7 +152,6 @@ import
org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.IgniteCollectors;
import org.apache.ignite.internal.util.InitializationProtector;
-import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -192,7 +193,6 @@ import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
-import static
org.apache.ignite.IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -306,10 +306,13 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
private final CacheRecoveryLifecycle recovery = new
CacheRecoveryLifecycle();
/** Cache configuration splitter. */
- private CacheConfigurationSplitter splitter;
+ private final CacheConfigurationSplitter splitter;
/** Cache configuration enricher. */
- private CacheConfigurationEnricher enricher;
+ private final CacheConfigurationEnricher enricher;
+
+ /** Pool to use while restoring partition states. */
+ private final ForkJoinPool restorePartitionsPool;
/**
* @param ctx Kernal context.
@@ -324,6 +327,19 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
enricher = new CacheConfigurationEnricher(ctx, marsh,
U.resolveClassLoader(ctx.config()));
+
+ ForkJoinPool.ForkJoinWorkerThreadFactory factory = new
ForkJoinPool.ForkJoinWorkerThreadFactory() {
+ @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool)
{
+ ForkJoinWorkerThread worker =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+
+ worker.setName("restore-partition-states-" +
worker.getPoolIndex());
+
+ return worker;
+ }
+ };
+
+ int stripesCnt = ctx.getStripedExecutorService().stripesCount();
+ restorePartitionsPool = new ForkJoinPool(stripesCnt, factory, null,
false);
}
/**
@@ -744,6 +760,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
sharedCtx.cleanup();
+ restorePartitionsPool.shutdownNow();
+
if (log.isDebugEnabled())
log.debug("Stopped cache processor.");
}
@@ -5283,6 +5301,20 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
}
/**
+ * Returns {@code ForkJoinPool} instance to be used in partition states
restoration.<br/>
+ * It's more convenient than regular pools because it can be used to
parallelize both by cache groups and by partitions
+ * without sacrificing code simplicity (cache group tasks won't
exclusively occupy their threads and won't block
+ * partition tasks as a result).<br/>
+ * <br/>
+ * There's a chance that this pool will later be replaced with a more
common one, such as the system pool.
+ *
+ * @return Pool instance.
+ */
+ public ForkJoinPool restorePartitionsPool() {
+ return restorePartitionsPool;
+ }
+
+ /**
* Pages list view supplier.
*
* @param filter Filter.
@@ -5464,12 +5496,10 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
- StripedExecutor stripedExec = ctx.getStripedExecutorService();
-
- int roundRobin = 0;
+ CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
for (CacheGroupContext grp : forGroups) {
- stripedExec.execute(roundRobin % stripedExec.stripesCount(),
() -> {
+ restorePartitionsPool.submit(() -> {
try {
long processed =
grp.offheap().restorePartitionStates(partitionStates);
@@ -5486,14 +5516,15 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
: new IgniteCheckedException(e)
);
}
+ finally {
+ completionLatch.countDown();
+ }
});
-
- roundRobin++;
}
try {
// Await completion restore state tasks in all stripes.
- stripedExec.awaitComplete();
+ completionLatch.await();
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 50224f3..d363f35 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1142,7 +1142,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
if (fut.localJoinExchange() || fut.activateCluster()
|| (fut.exchangeActions() != null &&
!F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
U.doInParallel(
- cctx.kernalContext().getSystemExecutorService(),
+ cctx.cache().restorePartitionsPool(),
cctx.cache().cacheGroups(),
cacheGroup -> {
if (cacheGroup.isLocal())
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 1fba65d..faed634 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -31,7 +31,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntConsumer;
import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -576,114 +579,128 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
if (partitionStatesRestored)
return 0;
- long processed = 0;
+ AtomicLong processed = new AtomicLong();
+ AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
- for (int p = 0; p < grp.affinity().partitions(); p++) {
+ IntConsumer partConsumer = p -> {
Integer recoverState = partitionRecoveryStates.get(new
GroupPartitionId(grp.groupId(), p));
long startTime = U.currentTimeMillis();
- if (ctx.pageStore().exists(grp.groupId(), p)) {
- ctx.pageStore().ensure(grp.groupId(), p);
+ try {
+ if (ctx.pageStore().exists(grp.groupId(), p)) {
+ ctx.pageStore().ensure(grp.groupId(), p);
+
+ if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (pages
less than 1) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p
+ ']');
+
+ return;
+ }
- if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (pages less
than 1) " +
+ log.debug("Creating partition on recovery (exists in
page store) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p +
']');
- continue;
- }
+ processed.incrementAndGet();
- if (log.isDebugEnabled())
- log.debug("Creating partition on recovery (exists in page
store) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+ GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
- processed++;
+ // Triggers initialization of existing(having datafile)
partition before acquiring cp read lock.
+ part.dataStore().init();
- GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
+ ctx.database().checkpointReadLock();
- // Triggers initialization of existing(having datafile)
partition before acquiring cp read lock.
- part.dataStore().init();
+ try {
+ long partMetaId =
pageMem.partitionMetaPageId(grp.groupId(), p);
+ long partMetaPage = pageMem.acquirePage(grp.groupId(),
partMetaId);
- ctx.database().checkpointReadLock();
+ try {
+ long pageAddr = pageMem.writeLock(grp.groupId(),
partMetaId, partMetaPage);
- try {
- long partMetaId =
pageMem.partitionMetaPageId(grp.groupId(), p);
- long partMetaPage = pageMem.acquirePage(grp.groupId(),
partMetaId);
+ boolean changed = false;
- try {
- long pageAddr = pageMem.writeLock(grp.groupId(),
partMetaId, partMetaPage);
+ try {
+ PagePartitionMetaIO io =
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
- boolean changed = false;
+ if (recoverState != null) {
+ changed = io.setPartitionState(pageAddr,
(byte)recoverState.intValue());
- try {
- PagePartitionMetaIO io =
PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+ updateState(part, recoverState);
- if (recoverState != null) {
- changed = io.setPartitionState(pageAddr,
(byte)recoverState.intValue());
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state
(from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() +
", p=" + p + ", state=" + part.state() +
+ ", updCntr=" +
part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
+ }
+ else {
+ int stateId =
io.getPartitionState(pageAddr);
- updateState(part, recoverState);
+ updateState(part, stateId);
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from
WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ",
p=" + p + ", state=" + part.state() +
- ", updCntr=" +
part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state
(from page memory) " +
+ "[grp=" + grp.cacheOrGroupName() +
", p=" + p + ", state=" + part.state() +
+ ", updCntr=" +
part.initialUpdateCounter() + ", stateId=" + stateId +
+ ", size=" + part.fullSize() + ']');
+ }
}
- else {
- int stateId = io.getPartitionState(pageAddr);
-
- updateState(part, stateId);
-
- if (log.isDebugEnabled())
- log.debug("Restored partition state (from
page memory) " +
- "[grp=" + grp.cacheOrGroupName() + ",
p=" + p + ", state=" + part.state() +
- ", updCntr=" +
part.initialUpdateCounter() + ", stateId=" + stateId +
- ", size=" + part.fullSize() + ']');
+ finally {
+ pageMem.writeUnlock(grp.groupId(), partMetaId,
partMetaPage, null, changed);
}
}
finally {
- pageMem.writeUnlock(grp.groupId(), partMetaId,
partMetaPage, null, changed);
+ pageMem.releasePage(grp.groupId(), partMetaId,
partMetaPage);
}
}
finally {
- pageMem.releasePage(grp.groupId(), partMetaId,
partMetaPage);
+ ctx.database().checkpointReadUnlock();
}
}
- finally {
- ctx.database().checkpointReadUnlock();
- }
- }
- else if (recoverState != null) { // Pre-create partition if having
valid state.
- GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
+ else if (recoverState != null) { // Pre-create partition if
having valid state.
+ GridDhtLocalPartition part =
grp.topology().forceCreatePartition(p);
- updateState(part, recoverState);
+ updateState(part, recoverState);
- processed++;
+ processed.incrementAndGet();
+
+ if (log.isDebugEnabled())
+ log.debug("Restored partition state (from WAL) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ",
state=" + part.state() +
+ ", updCntr=" + part.initialUpdateCounter() +
+ ", size=" + part.fullSize() + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition on recovery (no page
store OR wal state) " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
']');
+ }
if (log.isDebugEnabled())
- log.debug("Restored partition state (from WAL) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ",
state=" + part.state() +
- ", updCntr=" + part.initialUpdateCounter() +
- ", size=" + part.fullSize() + ']');
+ log.debug("Finished restoring partition state " +
+ "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+ ", time=" + (U.currentTimeMillis() - startTime) + "
ms]");
}
- else {
- if (log.isDebugEnabled())
- log.debug("Skipping partition on recovery (no page store
OR wal state) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+ catch (IgniteCheckedException e) {
+ if (!err.compareAndSet(null, e))
+ err.get().addSuppressed(e);
}
+ };
- if (log.isDebugEnabled())
- log.debug("Finished restoring partition state " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
- ", time=" + (U.currentTimeMillis() - startTime) + " ms]");
- }
+ ctx.cache().restorePartitionsPool().submit(
+ () -> IntStream.range(0,
grp.affinity().partitions()).parallel().forEach(partConsumer)
+ ).join();
+
+ if (err.get() != null)
+ throw err.get();
partitionStatesRestored = true;
- return processed;
+ return processed.get();
}
/**