This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 217accf  IGNITE-11589 Fix GridDhtPartitionsExchangeFuture remained incompleted in some circumstances - Fixes #6316.
217accf is described below

commit 217accfbc396d1c94e509a6fa9e8d51db6e157d2
Author: Slava Koptilin <[email protected]>
AuthorDate: Tue Mar 26 18:13:54 2019 +0300

    IGNITE-11589 Fix GridDhtPartitionsExchangeFuture remained incompleted in some circumstances - Fixes #6316.
    
    Signed-off-by: Dmitriy Govorukhin <[email protected]>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java | 152 +++++++++++----------
 .../apache/ignite/internal/util/IgniteUtils.java   | 101 ++++++++++----
 2 files changed, 157 insertions(+), 96 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2aa28cf..18cd31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -137,6 +137,7 @@ import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvent
 import static 
org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
 import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
+import static 
org.apache.ignite.internal.util.IgniteUtils.doInParallelUninterruptibly;
 
 /**
  * Future for exchanging partition maps.
@@ -2116,109 +2117,118 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         assert res != null || err != null;
 
-        waitUntilNewCachesAreRegistered();
+        try {
+            waitUntilNewCachesAreRegistered();
 
-        if (err == null &&
-            !cctx.kernalContext().clientNode() &&
-            (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
-                    continue;
+            if (err == null &&
+                !cctx.kernalContext().clientNode() &&
+                (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
+                        continue;
 
-                cacheCtx.continuousQueries().flushBackupQueue(res);
+                    cacheCtx.continuousQueries().flushBackupQueue(res);
+                }
             }
-        }
 
-        if (err == null) {
-            if (centralizedAff || forceAffReassignment) {
-                assert !exchCtx.mergeExchanges();
+            if (err == null) {
+                if (centralizedAff || forceAffReassignment) {
+                    assert !exchCtx.mergeExchanges();
 
-                Collection<CacheGroupContext> grpToRefresh = 
U.newHashSet(cctx.cache().cacheGroups().size());
+                    Collection<CacheGroupContext> grpToRefresh = 
U.newHashSet(cctx.cache().cacheGroups().size());
 
-                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (grp.isLocal())
-                        continue;
+                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                        if (grp.isLocal())
+                            continue;
 
-                    try {
-                        if 
(grp.topology().initPartitionsWhenAffinityReady(res, this))
-                            grpToRefresh.add(grp);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.error(log, "Failed to initialize partitions.", e);
-                    }
+                        try {
+                            if 
(grp.topology().initPartitionsWhenAffinityReady(res, this))
+                                grpToRefresh.add(grp);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            U.error(log, "Failed to initialize partitions.", 
e);
+                        }
 
-                }
+                    }
 
-                if (!grpToRefresh.isEmpty()){
-                    if (log.isDebugEnabled())
-                        log.debug("Refresh partitions due to partitions 
initialized when affinity ready [" +
-                            
grpToRefresh.stream().map(CacheGroupContext::name).collect(Collectors.toList()) 
+ ']');
+                    if (!grpToRefresh.isEmpty()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Refresh partitions due to partitions 
initialized when affinity ready [" +
+                                
grpToRefresh.stream().map(CacheGroupContext::name).collect(Collectors.toList()) 
+ ']');
 
-                    cctx.exchange().refreshPartitions(grpToRefresh);
+                        cctx.exchange().refreshPartitions(grpToRefresh);
+                    }
                 }
-            }
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                GridCacheContext drCacheCtx = cacheCtx.isNear() ? 
cacheCtx.near().dht().context() : cacheCtx;
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    GridCacheContext drCacheCtx = cacheCtx.isNear() ? 
cacheCtx.near().dht().context() : cacheCtx;
 
-                if (drCacheCtx.isDrEnabled()) {
-                    try {
-                        drCacheCtx.dr().onExchange(res, exchId.isLeft(), 
activateCluster());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to notify DR: " + e, e);
+                    if (drCacheCtx.isDrEnabled()) {
+                        try {
+                            drCacheCtx.dr().onExchange(res, exchId.isLeft(), 
activateCluster());
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to notify DR: " + e, e);
+                        }
                     }
                 }
-            }
 
-            if (serverNodeDiscoveryEvent() || localJoinExchange())
-                detectLostPartitions(res);
+                if (serverNodeDiscoveryEvent() || localJoinExchange())
+                    detectLostPartitions(res);
 
-            Map<Integer, CacheGroupValidation> m = 
U.newHashMap(cctx.cache().cacheGroups().size());
+                Map<Integer, CacheGroupValidation> m = 
U.newHashMap(cctx.cache().cacheGroups().size());
 
-            for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                m.put(grp.groupId(), validateCacheGroup(grp, 
events().lastEvent().topologyNodes()));
+                for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                    m.put(grp.groupId(), validateCacheGroup(grp, 
events().lastEvent().topologyNodes()));
 
-            grpValidRes = m;
-        }
+                grpValidRes = m;
+            }
 
-        if (!cctx.localNode().isClient())
-            tryToPerformLocalSnapshotOperation();
+            if (!cctx.localNode().isClient())
+                tryToPerformLocalSnapshotOperation();
 
-        if (err == null)
-            cctx.coordinators().onExchangeDone(events().discoveryCache());
+            if (err == null)
+                cctx.coordinators().onExchangeDone(events().discoveryCache());
 
-        // Create and destory caches and cache proxies.
-        cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
+            // Create and destory caches and cache proxies.
+            cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
-        cctx.kernalContext().authentication().onActivate();
+            cctx.kernalContext().authentication().onActivate();
 
-        Map<T2<Integer, Integer>, Long> localReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
+            Map<T2<Integer, Integer>, Long> localReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
 
-        if (localReserved != null) {
-            for (Map.Entry<T2<Integer, Integer>, Long> e : 
localReserved.entrySet()) {
-                boolean success = cctx.database().reserveHistoryForPreloading(
-                    e.getKey().get1(), e.getKey().get2(), e.getValue());
+            if (localReserved != null) {
+                for (Map.Entry<T2<Integer, Integer>, Long> e : 
localReserved.entrySet()) {
+                    boolean success = 
cctx.database().reserveHistoryForPreloading(
+                        e.getKey().get1(), e.getKey().get2(), e.getValue());
 
-                if (!success) {
-                    // TODO: how to handle?
-                    err = new IgniteCheckedException("Could not reserve 
history");
+                    if (!success) {
+                        // TODO: how to handle?
+                        err = new IgniteCheckedException("Could not reserve 
history");
+                    }
                 }
             }
-        }
 
-        cctx.database().releaseHistoryForExchange();
+            cctx.database().releaseHistoryForExchange();
 
-        if (err == null) {
-            cctx.database().rebuildIndexesIfNeeded(this);
+            if (err == null) {
+                cctx.database().rebuildIndexesIfNeeded(this);
 
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (!grp.isLocal())
-                    grp.topology().onExchangeDone(this, 
grp.affinity().readyAffinity(res), false);
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (!grp.isLocal())
+                        grp.topology().onExchangeDone(this, 
grp.affinity().readyAffinity(res), false);
+                }
+
+                if (changedAffinity())
+                    cctx.walState().changeLocalStatesOnExchangeDone(res, 
changedBaseline());
             }
+        }
+        catch (Throwable t) {
+            // In any case, this exchange future has to be completed. The 
original error should be preserved if exists.
+            if (err != null)
+                t.addSuppressed(err);
 
-            if (changedAffinity())
-                cctx.walState().changeLocalStatesOnExchangeDone(res, 
changedBaseline());
+            err = t;
         }
 
         final Throwable err0 = err;
@@ -3144,7 +3154,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         try {
             // Reserve at least 2 threads for system operations.
-            doInParallel(
+            doInParallelUninterruptibly(
                 U.availableThreadCount(cctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2),
                 cctx.kernalContext().getSystemExecutorService(),
                 cctx.cache().cacheGroups(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ffbb76d..5bae576 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -254,8 +254,8 @@ import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.transactions.TransactionAlreadyCompletedException;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionDuplicateKeyException;
 import org.apache.ignite.transactions.TransactionHeuristicException;
@@ -10792,7 +10792,49 @@ public abstract class IgniteUtils {
         Collection<T> srcDatas,
         IgniteThrowableFunction<T, R> operation
     ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
-        if(srcDatas.isEmpty())
+        return doInParallel(parallelismLvl, executorSvc, srcDatas, operation, 
false);
+    }
+
+    /**
+     * Execute operation on data in parallel uninterruptibly.
+     *
+     * @param parallelismLvl Number of threads on which it should be executed.
+     * @param executorSvc Service for parallel execution.
+     * @param srcDatas List of data for parallelization.
+     * @param operation Logic for execution of on each item of data.
+     * @param <T> Type of data.
+     * @param <R> Type of return value.
+     * @throws IgniteCheckedException if parallel execution was failed.
+     */
+    public static <T, R> Collection<R> doInParallelUninterruptibly(
+        int parallelismLvl,
+        ExecutorService executorSvc,
+        Collection<T> srcDatas,
+        IgniteThrowableFunction<T, R> operation
+    ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        return doInParallel(parallelismLvl, executorSvc, srcDatas, operation, 
true);
+    }
+
+    /**
+     * Execute operation on data in parallel.
+     *
+     * @param parallelismLvl Number of threads on which it should be executed.
+     * @param executorSvc Service for parallel execution.
+     * @param srcDatas List of data for parallelization.
+     * @param operation Logic for execution of on each item of data.
+     * @param <T> Type of data.
+     * @param <R> Type of return value.
+     * @param uninterruptible {@code true} if a result should be awaited in 
any case.
+     * @throws IgniteCheckedException if parallel execution was failed.
+     */
+    private static <T, R> Collection<R> doInParallel(
+        int parallelismLvl,
+        ExecutorService executorSvc,
+        Collection<T> srcDatas,
+        IgniteThrowableFunction<T, R> operation,
+        boolean uninterruptible
+    ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        if (srcDatas.isEmpty())
             return Collections.emptyList();
 
         int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl, 
srcDatas.size());
@@ -10808,7 +10850,7 @@ public abstract class IgniteUtils {
         for (int idx = 0; idx < batchSizes.length; idx++) {
             int batchSize = batchSizes[idx];
 
-            Batch<T, R> batch = new Batch<>(batchSize);
+            Batch<T, R> batch = new Batch<>(batchSize, uninterruptible);
 
             for (int i = 0; i < batchSize; i++)
                 batch.addTask(iterator.next());
@@ -10821,11 +10863,10 @@ public abstract class IgniteUtils {
             // Add to set only after check that batch is not empty.
             .peek(sharedBatchesSet::add)
             // Setup future in batch for waiting result.
-            .peek(batch -> batch.future = executorSvc.submit(() -> {
+            .peek(batch -> batch.fut = executorSvc.submit(() -> {
                 // Batch was stolen by the main stream.
-                if (!sharedBatchesSet.remove(batch)) {
+                if (!sharedBatchesSet.remove(batch))
                     return null;
-                }
 
                 Collection<R> results = new ArrayList<>(batch.tasks.size());
 
@@ -10861,7 +10902,7 @@ public abstract class IgniteUtils {
         // Final result collection.
         Collection<R> results = new ArrayList<>(srcDatas.size());
 
-        for (Batch<T, R> batch: batches) {
+        for (Batch<T, R> batch : batches) {
             try {
                 Throwable err = batch.error;
 
@@ -10957,13 +10998,19 @@ public abstract class IgniteUtils {
         private Throwable error;
 
         /** */
-        private Future<Collection<R>> future;
+        private Future<Collection<R>> fut;
+
+        /** */
+        private final boolean uninterruptible;
 
         /**
          * @param batchSize Batch size.
+         * @param uninterruptible {@code true} if a result should be awaited 
in any case.
          */
-        private Batch(int batchSize) {
-            this.tasks = new ArrayList<>(batchSize);
+        private Batch(int batchSize, boolean uninterruptible) {
+            tasks = new ArrayList<>(batchSize);
+
+            this.uninterruptible = uninterruptible;
         }
 
         /**
@@ -10977,7 +11024,7 @@ public abstract class IgniteUtils {
          * @param res Setup results for tasks.
          */
         public void result(Collection<R> res) {
-            this.result = res;
+            result = res;
         }
 
         /**
@@ -10991,9 +11038,12 @@ public abstract class IgniteUtils {
          * Get tasks results.
          */
         public Collection<R> result() throws ExecutionException, 
InterruptedException {
-            assert future != null;
+            assert fut != null;
 
-            return result != null ? result : future.get();
+            if (result != null)
+                return result;
+
+            return uninterruptible ? getUninterruptibly(fut) : fut.get();
         }
     }
 
@@ -11016,22 +11066,23 @@ public abstract class IgniteUtils {
      * @param fut Future to wait for completion.
      * @throws ExecutionException If the future
      */
-    private static void getUninterruptibly(Future fut) throws 
ExecutionException {
+    private static <R> R getUninterruptibly(Future<R> fut) throws 
ExecutionException {
         boolean interrupted = false;
 
-        while (true) {
-            try {
-                fut.get();
-
-                break;
-            }
-            catch (InterruptedException e) {
-                interrupted = true;
+        try {
+            while (true) {
+                try {
+                    return fut.get();
+                }
+                catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
         }
-
-        if (interrupted)
-            Thread.currentThread().interrupt();
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
     }
 
     /**

Reply via email to