IGNITE-2236: Optimized GridCompoundFuture: - Listener is "inlined" into the class; - Removed "ignoreChildFailures" field.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c302e40 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c302e40 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c302e40 Branch: refs/heads/ignite-2324 Commit: 1c302e401b90928d48370757518e505383eb46bf Parents: 27c9064 Author: vozerov-gridgain <[email protected]> Authored: Wed Jan 20 17:17:01 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jan 20 17:17:01 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMvccManager.java | 43 ++- .../distributed/GridCacheTxRecoveryFuture.java | 2 +- .../dht/CacheDistributedGetFutureAdapter.java | 2 +- .../cache/distributed/dht/GridDhtGetFuture.java | 2 +- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../distributed/dht/GridDhtTxFinishFuture.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 2 +- .../distributed/near/GridNearLockFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 47 ++- .../GridNearPessimisticTxPrepareFuture.java | 6 +- .../near/GridNearTxFinishFuture.java | 29 +- .../service/GridServiceProcessor.java | 17 +- .../util/future/GridCompoundFuture.java | 314 +++++-------------- .../util/future/GridCompoundIdentityFuture.java | 6 +- .../internal/util/future/GridFutureAdapter.java | 2 + .../ignite/testframework/GridTestUtils.java | 7 +- 16 files changed, 195 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dbc6992..c7d1f62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -17,17 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.DiscoveryEvent; @@ -63,6 +52,18 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; @@ -982,9 +983,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) { - GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); - - res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); + GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture(); for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { IgniteInternalFuture<Void> complete = fut.completeFuture(topVer); @@ -1221,4 +1220,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return S.toString(FinishLockFuture.class, this, super.toString()); } } + + /** + * Finish atomic update future. + */ + private static class FinishAtomicUpdateFuture extends GridCompoundFuture<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected boolean ignoreFailure(Throwable err) { + Class cls = err.getClass(); + + return ClusterTopologyCheckedException.class.isAssignableFrom(cls) || + CachePartialUpdateCheckedException.class.isAssignableFrom(cls); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 1648de0..5a4a1ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -90,7 +90,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) { - super(cctx.kernalContext(), CU.boolReducer()); + super(CU.boolReducer()); this.cctx = cctx; this.tx = tx; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 40eec63..7efaf49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -132,7 +132,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun boolean needVer, boolean keepCacheObjects ) { - super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); + super(CU.<K, V>mapsReducer(keys.size())); assert !F.isEmpty(keys); http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index e410228..cb8c842 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -137,7 +137,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals ) { - super(cctx.kernalContext(), CU.<GridCacheEntryInfo>collectionsReducer()); + super(CU.<GridCacheEntryInfo>collectionsReducer()); assert reader != null; assert !F.isEmpty(keys); http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 1c3e052..07755e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -194,7 +194,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> CacheEntryPredicate[] filter, boolean skipStore, boolean keepBinary) { - super(cctx.kernalContext(), CU.boolReducer()); + super(CU.boolReducer()); assert nearNodeId != null; assert nearLockVer != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 0e5db05..8c295ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -99,7 +99,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param commit Commit flag. */ public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) { - super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx)); + super(F.<IgniteInternalTx>identityReducer(tx)); this.cctx = cctx; this.tx = tx; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index cfeee4b..e4c6b71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -169,7 +169,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture CacheEntryPredicate[] filter, boolean skipStore, boolean keepBinary) { - super(cctx.kernalContext(), CU.boolReducer()); + super(CU.boolReducer()); assert keys != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 55c5ab6..5d4fc01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -177,7 +177,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean CacheEntryPredicate[] filter, boolean skipStore, boolean keepBinary) { - super(cctx.kernalContext(), CU.boolReducer()); + super(CU.boolReducer()); assert keys != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 2090e04..4f9f227 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -90,9 +90,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim super(cctx, tx); assert tx.optimistic() && tx.serializable() : tx; + } - // Should wait for all mini futures completion before finishing tx. - ignoreChildFailures(IgniteCheckedException.class); + /** {@inheritDoc} */ + @Override protected boolean ignoreFailure(Throwable err) { + return IgniteCheckedException.class.isAssignableFrom(err.getClass()); } /** {@inheritDoc} */ @@ -629,32 +631,43 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } /** - * + * Client remap future. */ private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> { /** */ - private boolean remap = true; + private static final long serialVersionUID = 0L; /** - * + * Constructor. */ public ClientRemapFuture() { - super(); + super(new ClientRemapFutureReducer()); + } + } - reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() { - @Override public boolean collect(GridNearTxPrepareResponse res) { - assert res != null; + /** + * Client remap future reducer. + */ + private static class ClientRemapFutureReducer implements IgniteReducer<GridNearTxPrepareResponse, Boolean> { + /** */ + private static final long serialVersionUID = 0L; - if (res.clientRemapVersion() == null) - remap = false; + /** Remap flag. */ + private boolean remap = true; - return true; - } + /** {@inheritDoc} */ + @Override public boolean collect(@Nullable GridNearTxPrepareResponse res) { + assert res != null; - @Override public Boolean reduce() { - return remap; - } - }); + if (res.clientRemapVersion() == null) + remap = false; + + return true; + } + + /** {@inheritDoc} */ + @Override public Boolean reduce() { + return remap; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 9ee9aea..8170008 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -63,9 +63,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA super(cctx, tx); assert tx.pessimistic() : tx; + } - // Should wait for all mini futures completion before finishing tx. - ignoreChildFailures(IgniteCheckedException.class); + /** {@inheritDoc} */ + @Override protected boolean ignoreFailure(Throwable err) { + return IgniteCheckedException.class.isAssignableFrom(err.getClass()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 26e189b..3c33bc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -28,7 +28,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -107,7 +106,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param commit Commit flag. */ public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal tx, boolean commit) { - super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx)); + super(F.<IgniteInternalTx>identityReducer(tx)); this.cctx = cctx; this.tx = tx; @@ -644,16 +643,30 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (f.getClass() == FinishMiniFuture.class) { FinishMiniFuture fut = (FinishMiniFuture)f; - return "FinishFuture[node=" + fut.node().id() + - ", loc=" + fut.node().isLocal() + - ", done=" + fut.isDone() + "]"; + ClusterNode node = fut.node(); + + if (node != null) { + return "FinishFuture[node=" + node.id() + + ", loc=" + node.isLocal() + + ", done=" + fut.isDone() + ']'; + } + else { + return "FinishFuture[node=null, done=" + fut.isDone() + ']'; + } } else if (f.getClass() == CheckBackupMiniFuture.class) { CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f; - return "CheckBackupFuture[node=" + fut.node().id() + - ", loc=" + fut.node().isLocal() + - ", done=" + f.isDone() + "]"; + ClusterNode node = fut.node(); + + if (node != null) { + return "CheckBackupFuture[node=" + node.id() + + ", loc=" + node.isLocal() + + ", done=" + f.isDone() + "]"; + } + else { + return "CheckBackupFuture[node=null, done=" + f.isDone() + "]"; + } } else if (f.getClass() == CheckRemoteTxMiniFuture.class) { CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f; http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6b05edd..2841083 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -515,10 +515,10 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public IgniteInternalFuture<?> cancelAll() { - Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(); - Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); + GridCompoundFuture res = null; + while (it.hasNext()) { Cache.Entry<Object, Object> e = it.next(); @@ -527,11 +527,20 @@ public class GridServiceProcessor extends GridProcessorAdapter { GridServiceDeployment dep = (GridServiceDeployment)e.getValue(); + if (res == null) + res = new GridCompoundFuture<>(); + // Cancel each service separately. - futs.add(cancel(dep.configuration().getName())); + res.add(cancel(dep.configuration().getName())); } - return futs.isEmpty() ? new GridFinishedFuture<>() : new GridCompoundFuture(null, futs); + if (res != null) { + res.markInitialized(); + + return res; + } + else + return new GridFinishedFuture<>(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 4b2461e..c382497 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -17,15 +17,11 @@ package org.apache.ignite.internal.util.future; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -35,55 +31,48 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + /** * Future composed of multiple inner futures. */ -public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { +public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements IgniteInClosure<IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; - /** */ - private static final int INITED = 0b1; + /** Initialization flag. */ + private static final int INIT_FLAG = 0x1; - /** */ - private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd = - AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags"); + /** Flags updater. */ + private static final AtomicIntegerFieldUpdater<GridCompoundFuture> FLAGS_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "initFlag"); - /** */ - private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd = + /** Listener calls updater. */ + private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD = AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls"); /** Futures. */ protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>(); - /** */ - @GridToStringExclude - private final Listener lsnr = new Listener(); - /** Reducer. */ @GridToStringInclude - private IgniteReducer<T, R> rdc; - - /** Exceptions to ignore. */ - private Class<? extends Throwable>[] ignoreChildFailures; + private final IgniteReducer<T, R> rdc; - /** - * Updated via {@link #flagsUpd}. - * - * @see #INITED - */ + /** Initialization flag. Updated via {@link #FLAGS_UPD}. */ @SuppressWarnings("unused") - private volatile int flags; + private volatile int initFlag; - /** Updated via {@link #lsnrCallsUpd}. */ + /** Listener calls. Updated via {@link #LSNR_CALLS_UPD}. */ @SuppressWarnings("unused") private volatile int lsnrCalls; /** - * + * Default constructor. */ public GridCompoundFuture() { - // No-op. + this(null); } /** @@ -93,19 +82,59 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { this.rdc = rdc; } - /** - * @param rdc Reducer to add. - * @param futs Futures to add. - */ - public GridCompoundFuture( - @Nullable IgniteReducer<T, R> rdc, - @Nullable Iterable<IgniteInternalFuture<T>> futs - ) { - this.rdc = rdc; + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<T> fut) { + try { + T t = fut.get(); + + try { + if (rdc != null && !rdc.collect(t)) + onDone(rdc.reduce()); + } + catch (RuntimeException e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); - addAll(futs); + // Exception in reducer is a bug, so we bypass checkComplete here. + onDone(e); + } + catch (AssertionError e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); - markInitialized(); + // Bypass checkComplete because need to rethrow. + onDone(e); + + throw e; + } + } + catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | + ClusterTopologyCheckedException e) { + if (!ignoreFailure(e)) + onDone(e); + } + catch (IgniteCheckedException e) { + if (!ignoreFailure(e)) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + onDone(e); + } + } + catch (RuntimeException e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + onDone(e); + } + catch (AssertionError e) { + U.error(null, "Failed to execute compound future reducer: " + this, e); + + // Bypass checkComplete because need to rethrow. + onDone(e); + + throw e; + } + + LSNR_CALLS_UPD.incrementAndGet(GridCompoundFuture.this); + + checkComplete(); } /** {@inheritDoc} */ @@ -125,43 +154,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { * * @return Collection of futures. */ - private Collection<IgniteInternalFuture<T>> futures(boolean pending) { + public Collection<IgniteInternalFuture<T>> futures() { synchronized (futs) { - Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size()); - - for (IgniteInternalFuture<T> fut : futs) { - if (!pending || !fut.isDone()) - res.add(fut); - } - - return res; + return new ArrayList<>(futs); } } /** - * Gets collection of futures. - * - * @return Collection of futures. - */ - public Collection<IgniteInternalFuture<T>> futures() { - return futures(false); - } - - /** - * Gets pending (unfinished) futures. + * Checks if this compound future should ignore this particular exception. * - * @return Pending futures. - */ - public Collection<IgniteInternalFuture<T>> pending() { - return futures(true); - } - - /** - * @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures. + * @param err Exception to check. + * @return {@code True} if this error should be ignored. */ - @SafeVarargs - public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) { - this.ignoreChildFailures = ignoreChildFailures; + protected boolean ignoreFailure(Throwable err) { + return false; } /** @@ -187,14 +193,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { } /** - * @return {@code True} if this future was initialized. Initialization happens when - * {@link #markInitialized()} method is called on future. - */ - public boolean initialized() { - return flagSet(INITED); - } - - /** * Adds a future to this compound future. * * @param fut Future to add. @@ -206,7 +204,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { futs.add(fut); } - fut.listen(lsnr); + fut.listen(this); if (isCancelled()) { try { @@ -219,76 +217,18 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { } /** - * Adds futures to this compound future. - * - * @param futs Futures to add. - */ - @SafeVarargs - public final void addAll(@Nullable IgniteInternalFuture<T>... futs) { - addAll(F.asList(futs)); - } - - /** - * Adds futures to this compound future. - * - * @param futs Futures to add. - */ - public void addAll(@Nullable Iterable<IgniteInternalFuture<T>> futs) { - if (futs != null) { - for (IgniteInternalFuture<T> fut : futs) - add(fut); - } - } - - /** - * Gets optional reducer. - * - * @return Optional reducer. - */ - @Nullable public IgniteReducer<T, R> reducer() { - return rdc; - } - - /** - * Sets optional reducer. - * - * @param rdc Optional reducer. - */ - public void reducer(@Nullable IgniteReducer<T, R> rdc) { - this.rdc = rdc; - } - - /** - * @param flag Flag to CAS. - * @return {@code True} if CAS succeeds. - */ - private boolean casFlag(int flag) { - for (;;) { - int flags0 = flags; - - if ((flags0 & flag) != 0) - return false; - - if (flagsUpd.compareAndSet(this, flags0, flags0 | flag)) - return true; - } - } - - /** - * @param flag Flag to check. - * @return {@code True} if set. + * @return {@code True} if this future was initialized. Initialization happens when + * {@link #markInitialized()} method is called on future. */ - private boolean flagSet(int flag) { - return (flags & flag) != 0; + public boolean initialized() { + return initFlag == INIT_FLAG; } /** * Mark this future as initialized. */ public void markInitialized() { - if (casFlag(INITED)) - // Check complete to make sure that we take care - // of all the ignored callbacks. + if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG)) checkComplete(); } @@ -296,7 +236,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { * Check completeness of the future. */ private void checkComplete() { - if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) { + if (initialized() && !isDone() && lsnrCalls == futuresSize()) { try { onDone(rdc != null ? rdc.reduce() : null); } @@ -324,26 +264,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { } } - /** - * Checks if this compound future should ignore this particular exception. - * - * @param err Exception to check. - * @return {@code True} if this error should be ignored. - */ - private boolean ignoreFailure(@Nullable Throwable err) { - if (err == null) - return true; - - if (ignoreChildFailures != null) { - for (Class<? extends Throwable> ignoreCls : ignoreChildFailures) { - if (ignoreCls.isAssignableFrom(err.getClass())) - return true; - } - } - - return false; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCompoundFuture.class, this, @@ -358,72 +278,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { }) ); } - - /** - * Listener for futures. - */ - private class Listener implements IgniteInClosure<IgniteInternalFuture<T>> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void apply(IgniteInternalFuture<T> fut) { - try { - T t = fut.get(); - - try { - if (rdc != null && !rdc.collect(t)) - onDone(rdc.reduce()); - } - catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Exception in reducer is a bug, so we bypass checkComplete here. - onDone(e); - } - catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Bypass checkComplete because need to rethrow. - onDone(e); - - throw e; - } - } - catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | - ClusterTopologyCheckedException e) { - if (!ignoreFailure(e)) - onDone(e); - } - catch (IgniteCheckedException e) { - if (!ignoreFailure(e)) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - onDone(e); - } - } - catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - onDone(e); - } - catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); - - // Bypass checkComplete because need to rethrow. - onDone(e); - - throw e; - } - - lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this); - - checkComplete(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "Compound future listener []"; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java index bb5abf2..4010ccd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.util.future; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; /** - * Future composed of multiple inner futures. + * Compound future with reducer which accepts and produces results of the same type. */ public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T, T> { /** */ @@ -37,10 +36,9 @@ public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T, T> { } /** - * @param ctx Context. * @param rdc Reducer. */ - public GridCompoundIdentityFuture(GridKernalContext ctx, @Nullable IgniteReducer<T, T> rdc) { + public GridCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) { super(rdc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index a1720d5..c6a6a44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; @@ -72,6 +73,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements private boolean ignoreInterrupts; /** */ + @GridToStringExclude private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 26a8994..c7940c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -648,10 +648,11 @@ public final class GridTestUtils { }); // Compound future, that adds cancel() support to execution future. - GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>(); + GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>(F.sumLongReducer()); + + compFut.add(cancelFut); + compFut.add(runFut); - compFut.addAll(cancelFut, runFut); - compFut.reducer(F.sumLongReducer()); compFut.markInitialized(); cancelFut.onDone();
