This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 217accf IGNITE-11589 Fix GridDhtPartitionsExchangeFuture remained
incomplete in some circumstances - Fixes #6316.
217accf is described below
commit 217accfbc396d1c94e509a6fa9e8d51db6e157d2
Author: Slava Koptilin <[email protected]>
AuthorDate: Tue Mar 26 18:13:54 2019 +0300
IGNITE-11589 Fix GridDhtPartitionsExchangeFuture remained incomplete in
some circumstances - Fixes #6316.
Signed-off-by: Dmitriy Govorukhin <[email protected]>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 152 +++++++++++----------
.../apache/ignite/internal/util/IgniteUtils.java | 101 ++++++++++----
2 files changed, 157 insertions(+), 96 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2aa28cf..18cd31c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -137,6 +137,7 @@ import static
org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvent
import static
org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
+import static
org.apache.ignite.internal.util.IgniteUtils.doInParallelUninterruptibly;
/**
* Future for exchanging partition maps.
@@ -2116,109 +2117,118 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
assert res != null || err != null;
- waitUntilNewCachesAreRegistered();
+ try {
+ waitUntilNewCachesAreRegistered();
- if (err == null &&
- !cctx.kernalContext().clientNode() &&
- (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
- continue;
+ if (err == null &&
+ !cctx.kernalContext().clientNode() &&
+ (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
+ continue;
- cacheCtx.continuousQueries().flushBackupQueue(res);
+ cacheCtx.continuousQueries().flushBackupQueue(res);
+ }
}
- }
- if (err == null) {
- if (centralizedAff || forceAffReassignment) {
- assert !exchCtx.mergeExchanges();
+ if (err == null) {
+ if (centralizedAff || forceAffReassignment) {
+ assert !exchCtx.mergeExchanges();
- Collection<CacheGroupContext> grpToRefresh =
U.newHashSet(cctx.cache().cacheGroups().size());
+ Collection<CacheGroupContext> grpToRefresh =
U.newHashSet(cctx.cache().cacheGroups().size());
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
- try {
- if
(grp.topology().initPartitionsWhenAffinityReady(res, this))
- grpToRefresh.add(grp);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.error(log, "Failed to initialize partitions.", e);
- }
+ try {
+ if
(grp.topology().initPartitionsWhenAffinityReady(res, this))
+ grpToRefresh.add(grp);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.error(log, "Failed to initialize partitions.",
e);
+ }
- }
+ }
- if (!grpToRefresh.isEmpty()){
- if (log.isDebugEnabled())
- log.debug("Refresh partitions due to partitions
initialized when affinity ready [" +
-
grpToRefresh.stream().map(CacheGroupContext::name).collect(Collectors.toList())
+ ']');
+ if (!grpToRefresh.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Refresh partitions due to partitions
initialized when affinity ready [" +
+
grpToRefresh.stream().map(CacheGroupContext::name).collect(Collectors.toList())
+ ']');
- cctx.exchange().refreshPartitions(grpToRefresh);
+ cctx.exchange().refreshPartitions(grpToRefresh);
+ }
}
- }
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- GridCacheContext drCacheCtx = cacheCtx.isNear() ?
cacheCtx.near().dht().context() : cacheCtx;
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ GridCacheContext drCacheCtx = cacheCtx.isNear() ?
cacheCtx.near().dht().context() : cacheCtx;
- if (drCacheCtx.isDrEnabled()) {
- try {
- drCacheCtx.dr().onExchange(res, exchId.isLeft(),
activateCluster());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to notify DR: " + e, e);
+ if (drCacheCtx.isDrEnabled()) {
+ try {
+ drCacheCtx.dr().onExchange(res, exchId.isLeft(),
activateCluster());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to notify DR: " + e, e);
+ }
}
}
- }
- if (serverNodeDiscoveryEvent() || localJoinExchange())
- detectLostPartitions(res);
+ if (serverNodeDiscoveryEvent() || localJoinExchange())
+ detectLostPartitions(res);
- Map<Integer, CacheGroupValidation> m =
U.newHashMap(cctx.cache().cacheGroups().size());
+ Map<Integer, CacheGroupValidation> m =
U.newHashMap(cctx.cache().cacheGroups().size());
- for (CacheGroupContext grp : cctx.cache().cacheGroups())
- m.put(grp.groupId(), validateCacheGroup(grp,
events().lastEvent().topologyNodes()));
+ for (CacheGroupContext grp : cctx.cache().cacheGroups())
+ m.put(grp.groupId(), validateCacheGroup(grp,
events().lastEvent().topologyNodes()));
- grpValidRes = m;
- }
+ grpValidRes = m;
+ }
- if (!cctx.localNode().isClient())
- tryToPerformLocalSnapshotOperation();
+ if (!cctx.localNode().isClient())
+ tryToPerformLocalSnapshotOperation();
- if (err == null)
- cctx.coordinators().onExchangeDone(events().discoveryCache());
+ if (err == null)
+ cctx.coordinators().onExchangeDone(events().discoveryCache());
- // Create and destory caches and cache proxies.
- cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
+ // Create and destory caches and cache proxies.
+ cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
- cctx.kernalContext().authentication().onActivate();
+ cctx.kernalContext().authentication().onActivate();
- Map<T2<Integer, Integer>, Long> localReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
+ Map<T2<Integer, Integer>, Long> localReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
- if (localReserved != null) {
- for (Map.Entry<T2<Integer, Integer>, Long> e :
localReserved.entrySet()) {
- boolean success = cctx.database().reserveHistoryForPreloading(
- e.getKey().get1(), e.getKey().get2(), e.getValue());
+ if (localReserved != null) {
+ for (Map.Entry<T2<Integer, Integer>, Long> e :
localReserved.entrySet()) {
+ boolean success =
cctx.database().reserveHistoryForPreloading(
+ e.getKey().get1(), e.getKey().get2(), e.getValue());
- if (!success) {
- // TODO: how to handle?
- err = new IgniteCheckedException("Could not reserve
history");
+ if (!success) {
+ // TODO: how to handle?
+ err = new IgniteCheckedException("Could not reserve
history");
+ }
}
}
- }
- cctx.database().releaseHistoryForExchange();
+ cctx.database().releaseHistoryForExchange();
- if (err == null) {
- cctx.database().rebuildIndexesIfNeeded(this);
+ if (err == null) {
+ cctx.database().rebuildIndexesIfNeeded(this);
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (!grp.isLocal())
- grp.topology().onExchangeDone(this,
grp.affinity().readyAffinity(res), false);
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (!grp.isLocal())
+ grp.topology().onExchangeDone(this,
grp.affinity().readyAffinity(res), false);
+ }
+
+ if (changedAffinity())
+ cctx.walState().changeLocalStatesOnExchangeDone(res,
changedBaseline());
}
+ }
+ catch (Throwable t) {
+ // In any case, this exchange future has to be completed. The
original error should be preserved if exists.
+ if (err != null)
+ t.addSuppressed(err);
- if (changedAffinity())
- cctx.walState().changeLocalStatesOnExchangeDone(res,
changedBaseline());
+ err = t;
}
final Throwable err0 = err;
@@ -3144,7 +3154,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
try {
// Reserve at least 2 threads for system operations.
- doInParallel(
+ doInParallelUninterruptibly(
U.availableThreadCount(cctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2),
cctx.kernalContext().getSystemExecutorService(),
cctx.cache().cacheGroups(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ffbb76d..5bae576 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -254,8 +254,8 @@ import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.apache.ignite.transactions.TransactionHeuristicException;
@@ -10792,7 +10792,49 @@ public abstract class IgniteUtils {
Collection<T> srcDatas,
IgniteThrowableFunction<T, R> operation
) throws IgniteCheckedException, IgniteInterruptedCheckedException {
- if(srcDatas.isEmpty())
+ return doInParallel(parallelismLvl, executorSvc, srcDatas, operation,
false);
+ }
+
+ /**
+ * Execute operation on data in parallel uninterruptibly.
+ *
+ * @param parallelismLvl Number of threads on which it should be executed.
+ * @param executorSvc Service for parallel execution.
+ * @param srcDatas List of data for parallelization.
+ * @param operation Logic for execution of on each item of data.
+ * @param <T> Type of data.
+ * @param <R> Type of return value.
+ * @throws IgniteCheckedException if parallel execution was failed.
+ */
+ public static <T, R> Collection<R> doInParallelUninterruptibly(
+ int parallelismLvl,
+ ExecutorService executorSvc,
+ Collection<T> srcDatas,
+ IgniteThrowableFunction<T, R> operation
+ ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+ return doInParallel(parallelismLvl, executorSvc, srcDatas, operation,
true);
+ }
+
+ /**
+ * Execute operation on data in parallel.
+ *
+ * @param parallelismLvl Number of threads on which it should be executed.
+ * @param executorSvc Service for parallel execution.
+ * @param srcDatas List of data for parallelization.
+ * @param operation Logic for execution of on each item of data.
+ * @param <T> Type of data.
+ * @param <R> Type of return value.
+ * @param uninterruptible {@code true} if a result should be awaited in
any case.
+ * @throws IgniteCheckedException if parallel execution was failed.
+ */
+ private static <T, R> Collection<R> doInParallel(
+ int parallelismLvl,
+ ExecutorService executorSvc,
+ Collection<T> srcDatas,
+ IgniteThrowableFunction<T, R> operation,
+ boolean uninterruptible
+ ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+ if (srcDatas.isEmpty())
return Collections.emptyList();
int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl,
srcDatas.size());
@@ -10808,7 +10850,7 @@ public abstract class IgniteUtils {
for (int idx = 0; idx < batchSizes.length; idx++) {
int batchSize = batchSizes[idx];
- Batch<T, R> batch = new Batch<>(batchSize);
+ Batch<T, R> batch = new Batch<>(batchSize, uninterruptible);
for (int i = 0; i < batchSize; i++)
batch.addTask(iterator.next());
@@ -10821,11 +10863,10 @@ public abstract class IgniteUtils {
// Add to set only after check that batch is not empty.
.peek(sharedBatchesSet::add)
// Setup future in batch for waiting result.
- .peek(batch -> batch.future = executorSvc.submit(() -> {
+ .peek(batch -> batch.fut = executorSvc.submit(() -> {
// Batch was stolen by the main stream.
- if (!sharedBatchesSet.remove(batch)) {
+ if (!sharedBatchesSet.remove(batch))
return null;
- }
Collection<R> results = new ArrayList<>(batch.tasks.size());
@@ -10861,7 +10902,7 @@ public abstract class IgniteUtils {
// Final result collection.
Collection<R> results = new ArrayList<>(srcDatas.size());
- for (Batch<T, R> batch: batches) {
+ for (Batch<T, R> batch : batches) {
try {
Throwable err = batch.error;
@@ -10957,13 +10998,19 @@ public abstract class IgniteUtils {
private Throwable error;
/** */
- private Future<Collection<R>> future;
+ private Future<Collection<R>> fut;
+
+ /** */
+ private final boolean uninterruptible;
/**
* @param batchSize Batch size.
+ * @param uninterruptible {@code true} if a result should be awaited
in any case.
*/
- private Batch(int batchSize) {
- this.tasks = new ArrayList<>(batchSize);
+ private Batch(int batchSize, boolean uninterruptible) {
+ tasks = new ArrayList<>(batchSize);
+
+ this.uninterruptible = uninterruptible;
}
/**
@@ -10977,7 +11024,7 @@ public abstract class IgniteUtils {
* @param res Setup results for tasks.
*/
public void result(Collection<R> res) {
- this.result = res;
+ result = res;
}
/**
@@ -10991,9 +11038,12 @@ public abstract class IgniteUtils {
* Get tasks results.
*/
public Collection<R> result() throws ExecutionException,
InterruptedException {
- assert future != null;
+ assert fut != null;
- return result != null ? result : future.get();
+ if (result != null)
+ return result;
+
+ return uninterruptible ? getUninterruptibly(fut) : fut.get();
}
}
@@ -11016,22 +11066,23 @@ public abstract class IgniteUtils {
* @param fut Future to wait for completion.
* @throws ExecutionException If the future
*/
- private static void getUninterruptibly(Future fut) throws
ExecutionException {
+ private static <R> R getUninterruptibly(Future<R> fut) throws
ExecutionException {
boolean interrupted = false;
- while (true) {
- try {
- fut.get();
-
- break;
- }
- catch (InterruptedException e) {
- interrupted = true;
+ try {
+ while (true) {
+ try {
+ return fut.get();
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ }
}
}
-
- if (interrupted)
- Thread.currentThread().interrupt();
+ finally {
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
}
/**