IGNITE-2265: Replaced atomics with updaters on hot paths.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd5cd2ef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd5cd2ef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd5cd2ef Branch: refs/heads/ignite-2234 Commit: cd5cd2efe08efef42aef4f130474c563391e6fbe Parents: 8057925 Author: vozerov-gridgain <[email protected]> Authored: Sun Jan 3 22:58:06 2016 +0400 Committer: vozerov-gridgain <[email protected]> Committed: Sun Jan 3 22:58:06 2016 +0400 ---------------------------------------------------------------------- .../GridDistributedTxRemoteAdapter.java | 12 ++- .../dht/CacheDistributedGetFutureAdapter.java | 10 ++- .../distributed/dht/GridDhtTxFinishFuture.java | 15 +++- .../cache/distributed/dht/GridDhtTxLocal.java | 31 ++++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 48 ++++++---- .../dht/GridPartitionedGetFuture.java | 2 +- .../distributed/near/GridNearGetFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 94 +++++++++++--------- .../near/GridNearOptimisticTxPrepareFuture.java | 63 +++++++------ .../GridNearPessimisticTxPrepareFuture.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 69 ++++++++------ .../near/GridNearTxPrepareFutureAdapter.java | 8 +- .../cache/local/GridLocalLockFuture.java | 19 ++-- .../cache/transactions/IgniteTxAdapter.java | 24 ++--- .../cache/transactions/IgniteTxEntry.java | 12 ++- .../transactions/IgniteTxLocalAdapter.java | 38 ++++---- .../internal/GridUpdateNotifierSelfTest.java | 2 +- 17 files changed, 284 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1fd0b2e..8e9d4a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -25,7 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -87,6 +88,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; + /** Commit allowed field updater. */ + private static final AtomicIntegerFieldUpdater<GridDistributedTxRemoteAdapter> COMMIT_ALLOWED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDistributedTxRemoteAdapter.class, "commitAllowed"); + /** Explicit versions. */ @GridToStringInclude private List<GridCacheVersion> explicitVers; @@ -96,8 +101,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter private boolean started; /** {@code True} only if all write entries are locked by this transaction. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringInclude - private AtomicBoolean commitAllowed = new AtomicBoolean(false); + private volatile int commitAllowed; /** */ @GridToStringInclude @@ -440,7 +446,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } // Only one thread gets to commit. - if (commitAllowed.compareAndSet(false, true)) { + if (COMMIT_ALLOWED_UPD.compareAndSet(this, 0, 1)) { IgniteCheckedException err = null; Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index cfbc21b..c43cce9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -21,7 +21,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -50,6 +51,10 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun /** Maximum number of attempts to remap key to the same primary node. */ protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); + /** Remap count updater. */ + protected static final AtomicIntegerFieldUpdater<CacheDistributedGetFutureAdapter> REMAP_CNT_UPD = + AtomicIntegerFieldUpdater.newUpdater(CacheDistributedGetFutureAdapter.class, "remapCnt"); + /** Context. */ protected final GridCacheContext<K, V> cctx; @@ -69,7 +74,8 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun protected boolean trackable; /** Remap count. */ - protected AtomicInteger remapCnt = new AtomicInteger(); + @SuppressWarnings("UnusedDeclaration") + protected volatile int remapCnt; /** Subject ID. */ protected UUID subjId; http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 9a0d778..0e5db05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -57,6 +59,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Error updater. */ + private static final AtomicReferenceFieldUpdater<GridDhtTxFinishFuture, Throwable> ERR_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridDhtTxFinishFuture.class, Throwable.class, "err"); + /** Logger. */ private static IgniteLogger log; @@ -74,8 +80,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur private boolean commit; /** Error. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private AtomicReference<Throwable> err = new AtomicReference<>(null); + private volatile Throwable err; /** DHT mappings. */ private Map<UUID, GridDistributedTxMapping> dhtMap; @@ -142,7 +149,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param e Error. */ public void onError(Throwable e) { - if (err.compareAndSet(null, e)) { + if (ERR_UPD.compareAndSet(this, null, e)) { boolean marked = tx.setRollbackOnly(); if (e instanceof IgniteTxRollbackCheckedException) { @@ -199,7 +206,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) this.tx.tmFinish(err == null); - Throwable e = this.err.get(); + Throwable e = this.err; if (e == null && commit) e = this.tx.commitError(); @@ -235,7 +242,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * Completeness callback. */ private void onComplete() { - onDone(tx, err.get()); + onDone(tx, err); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index f344d48..e026b4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -22,7 +22,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -83,10 +84,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** Near XID. */ private GridCacheVersion nearXidVer; + /** Future updater. */ + private static final AtomicReferenceFieldUpdater<GridDhtTxLocal, GridDhtTxPrepareFuture> PREP_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridDhtTxLocal.class, GridDhtTxPrepareFuture.class, "prepFut"); + /** Future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference<GridDhtTxPrepareFuture> prepFut = - new AtomicReference<>(); + private volatile GridDhtTxPrepareFuture prepFut; /** * Empty constructor required for {@link Externalizable}. @@ -306,18 +311,18 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } // For pessimistic mode we don't distribute prepare request. - GridDhtTxPrepareFuture fut = prepFut.get(); + GridDhtTxPrepareFuture fut = prepFut; if (fut == null) { // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( + if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, needReturnValue()))) - return prepFut.get(); + return prepFut; } else // Prepare was called explicitly. @@ -383,20 +388,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa boolean last ) { // In optimistic mode prepare still can be called explicitly from salvageTx. - GridDhtTxPrepareFuture fut = prepFut.get(); + GridDhtTxPrepareFuture fut = prepFut; if (fut == null) { init(); // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( + if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, verMap, last, needReturnValue()))) { - GridDhtTxPrepareFuture f = prepFut.get(); + GridDhtTxPrepareFuture f = prepFut; assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; @@ -492,7 +497,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx.mvcc().addFuture(fut, fut.futureId()); - GridDhtTxPrepareFuture prep = prepFut.get(); + GridDhtTxPrepareFuture prep = prepFut; if (prep != null) { if (prep.isDone()) { @@ -571,12 +576,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { assert optimistic(); - prepFut.compareAndSet(fut, null); + PREP_FUT_UPD.compareAndSet(this, fut, null); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - GridDhtTxPrepareFuture prepFut = this.prepFut.get(); + GridDhtTxPrepareFuture prepFut = this.prepFut; final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); @@ -687,7 +692,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() { - return prepFut.get(); + return prepFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 47dafc8..23fdbf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -26,8 +26,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; @@ -100,6 +101,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Error updater. */ + private static final AtomicReferenceFieldUpdater<GridDhtTxPrepareFuture, Throwable> ERR_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, Throwable.class, "err"); + /** */ private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER = new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() { @@ -113,6 +118,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } }; + /** Replied flag updater. */ + private static final AtomicIntegerFieldUpdater<GridDhtTxPrepareFuture> REPLIED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "replied"); + + /** Mapped flag updater. */ + private static final AtomicIntegerFieldUpdater<GridDhtTxPrepareFuture> MAPPED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "mapped"); + /** Logger. */ private static IgniteLogger log; @@ -133,13 +146,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private Map<UUID, GridDistributedTxMapping> dhtMap; /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); + @SuppressWarnings("UnusedDeclaration") + private volatile Throwable err; /** Replied flag. */ - private AtomicBoolean replied = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + private volatile int replied; /** All replies flag. */ - private AtomicBoolean mapped = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + private volatile int mapped; /** Prepare reads. */ private Iterable<IgniteTxEntry> reads; @@ -570,9 +586,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * * @return {@code True} if all locks are acquired, {@code false} otherwise. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") private boolean mapIfLocked() { if (checkLocks()) { - if (!mapped.compareAndSet(false, true)) + if (!MAPPED_UPD.compareAndSet(this, 0, 1)) return false; if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null)) @@ -606,7 +623,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " + "pending mini futures: " + this; - this.err.compareAndSet(null, err); + ERR_UPD.compareAndSet(this, null, err); // Must clear prepare future before response is sent or listeners are notified. if (tx.optimistic()) @@ -616,7 +633,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.onePhaseCommit() && tx.commitOnPrepare()) { assert last; - Throwable prepErr = this.err.get(); + Throwable prepErr = this.err; // Must create prepare response before transaction is committed to grab correct return value. final GridNearTxPrepareResponse res = createPrepareResponse(prepErr); @@ -631,7 +648,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { try { - if (replied.compareAndSet(false, true)) + if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1)) sendPrepareResponse(res); } catch (IgniteCheckedException e) { @@ -669,7 +686,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } else { try { - if (replied.compareAndSet(false, true)) + if (REPLIED_UPD.compareAndSet(this, 0, 1)) sendPrepareResponse(res); } catch (IgniteCheckedException e) { @@ -680,8 +697,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return true; } else { - if (replied.compareAndSet(false, true)) { - GridNearTxPrepareResponse res = createPrepareResponse(this.err.get()); + if (REPLIED_UPD.compareAndSet(this, 0, 1)) { + GridNearTxPrepareResponse res = createPrepareResponse(this.err); try { sendPrepareResponse(res); @@ -720,7 +737,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException { if (!tx.nearNodeId().equals(cctx.localNodeId())) { - Throwable err = this.err.get(); + Throwable err = this.err; if (err != null && err instanceof IgniteFutureCancelledException) return; @@ -851,7 +868,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (last || tx.isSystemInvalidate()) tx.state(PREPARED); - if (super.onDone(res, err.get())) { + if (super.onDone(res, err)) { // Don't forget to clean up. cctx.mvcc().removeMvccFuture(this); @@ -1045,11 +1062,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } if (err0 != null) { - err.compareAndSet(null, err0); + ERR_UPD.compareAndSet(this, null, err0); tx.rollbackAsync(); - final GridNearTxPrepareResponse res = createPrepareResponse(err.get()); + final GridNearTxPrepareResponse res = createPrepareResponse(err); onDone(res, res.error()); @@ -1467,6 +1484,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param res Result callback. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") void onResult(GridDhtTxPrepareResponse res) { if (res.error() != null) // Fail the whole compound future. http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index e8aaca0..19df1c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -474,7 +474,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node); if (keys != null && keys.containsKey(key)) { - if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { + if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + U.toShortString(node) + ", mappings=" + mapped + ']')); http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a121af9..c547a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -577,7 +577,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { - if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { + if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 37dc564..2090e04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -23,7 +23,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -118,7 +119,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + "read/write conflict [key=" + key + ", cache=" + ctx.name() + ']'); - err.compareAndSet(null, err0); + ERR_UPD.compareAndSet(this, null, err0); } break; @@ -187,7 +188,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.removeMapping(m.node().id()); } - err.compareAndSet(null, e); + ERR_UPD.compareAndSet(this, null, e); if (keyLockFut != null) keyLockFut.onDone(e); @@ -209,7 +210,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return false; if (err != null) { - this.err.compareAndSet(null, err); + ERR_UPD.compareAndSet(this, null, err); if (keyLockFut != null) keyLockFut.onDone(err); @@ -263,7 +264,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @return {@code True} if future was finished by this call. */ private boolean onComplete() { - Throwable err0 = err.get(); + Throwable err0 = err; if (err0 == null || tx.needCheckBackup()) tx.state(PREPARED); @@ -366,7 +367,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - add(new MiniFuture(m)); + add(new MiniFuture(this, m)); } Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); @@ -410,7 +411,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @return {@code True} if skip future during remap. */ private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) { - return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get()); + return !(isMini(fut)) || (remap && (((MiniFuture)fut).rcvRes == 1)); } /** @@ -630,7 +631,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * */ - private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> { + private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> { /** */ private boolean remap = true; @@ -660,24 +661,34 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * */ - private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { + private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; + /** Receive result flag updater. */ + private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD = + AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); + /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); + /** Parent future. */ + private final GridNearOptimisticSerializableTxPrepareFuture parent; + /** Keys. */ @GridToStringInclude private GridDistributedTxMapping m; /** Flag to signal some result being processed. */ - private AtomicBoolean rcvRes = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + private volatile int rcvRes; /** + * @param parent Parent future. * @param m Mapping. */ - MiniFuture(GridDistributedTxMapping m) { + MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) { + this.parent = parent; this.m = m; } @@ -706,8 +717,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param e Error. */ void onResult(Throwable e) { - if (rcvRes.compareAndSet(false, true)) { - onError(m, e); + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { + parent.onError(m, e); if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -717,7 +728,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } else U.warn(log, "Received error after another result has been processed [fut=" + - GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e); + parent + ", mini=" + this + ']', e); } /** @@ -727,11 +738,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); - onError(null, e); + parent.onError(null, e); onDone(e); } @@ -740,40 +751,40 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param res Result callback. */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) void onResult(final GridNearTxPrepareResponse res) { if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (res.error() != null) { // Fail the whole compound future. - onError(m, res.error()); + parent.onError(m, res.error()); onDone(res.error()); } else { if (res.clientRemapVersion() != null) { - assert cctx.kernalContext().clientNode(); + assert parent.cctx.kernalContext().clientNode(); assert m.clientFirst(); - tx.removeMapping(m.node().id()); + parent.tx.removeMapping(m.node().id()); ClientRemapFuture remapFut0 = null; - synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) { - if (remapFut == null) { - remapFut = new ClientRemapFuture(); + synchronized (parent) { + if (parent.remapFut == null) { + parent.remapFut = new ClientRemapFuture(); - remapFut0 = remapFut; + remapFut0 = parent.remapFut; } } if (remapFut0 != null) { - Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); + Collection<IgniteInternalFuture<?>> futs = (Collection)parent.futures(); for (IgniteInternalFuture<?> fut : futs) { - if (isMini(fut) && fut != this) + if (parent.isMini(fut) && fut != this) remapFut0.add((MiniFuture)fut); } @@ -783,22 +794,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim @Override public void apply(IgniteInternalFuture<Boolean> remapFut0) { try { IgniteInternalFuture<?> affFut = - cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); if (affFut == null) affFut = new GridFinishedFuture<Object>(); - if (remapFut.get()) { + if (parent.remapFut.get()) { if (log.isDebugEnabled()) { log.debug("Will remap client tx [" + - "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this + + "fut=" + parent + ", topVer=" + res.topologyVersion() + ']'); } - synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) { - assert remapFut0 == remapFut; + synchronized (parent) { + assert remapFut0 == parent.remapFut; - remapFut = null; + parent.remapFut = null; } affFut.listen(new CI1<IgniteInternalFuture<?>>() { @@ -809,7 +820,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim remap(res); } catch (IgniteCheckedException e) { - err.compareAndSet(null, e); + ERR_UPD.compareAndSet(parent, null, e); onDone(e); } @@ -822,7 +833,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim err0.retryReadyFuture(affFut); - err.compareAndSet(null, err0); + ERR_UPD.compareAndSet(parent, null, err0); onDone(err0); } @@ -830,10 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim catch (IgniteCheckedException e) { if (log.isDebugEnabled()) { log.debug("Prepare failed, will not remap tx: " + - GridNearOptimisticSerializableTxPrepareFuture.this); + parent); } - err.compareAndSet(null, e); + ERR_UPD.compareAndSet(parent, null, e); onDone(e); } @@ -844,10 +855,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim onDone(res); } else { - onPrepareResponse(m, res); + parent.onPrepareResponse(m, res); // Finish this mini future (need result only on client node). - onDone(cctx.kernalContext().clientNode() ? res : null); + onDone(parent.cctx.kernalContext().clientNode() ? res : null); } } } @@ -857,8 +868,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param res Response. */ private void remap(final GridNearTxPrepareResponse res) { - prepareOnTopology(true, new Runnable() { - @Override public void run() { + parent.prepareOnTopology(true, new Runnable() { + @Override + public void run() { onDone(res); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index a9f158a..bae0327 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -24,7 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -132,7 +133,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - if (err.compareAndSet(null, e)) { + if (ERR_UPD.compareAndSet(this, null, e)) { boolean marked = tx.setRollbackOnly(); if (e instanceof IgniteTxRollbackCheckedException) { @@ -199,7 +200,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (isDone()) return false; - this.err.compareAndSet(null, err); + ERR_UPD.compareAndSet(this, null, err); return onComplete(); } @@ -218,7 +219,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @return {@code True} if future was finished by this call. */ private boolean onComplete() { - Throwable err0 = err.get(); + Throwable err0 = err; if (err0 == null || tx.needCheckBackup()) tx.state(PREPARED); @@ -448,7 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - final MiniFuture fut = new MiniFuture(m, mappings); + final MiniFuture fut = new MiniFuture(this, m, mappings); req.miniId(fut.futureId()); @@ -611,10 +612,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * */ - private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { + private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ private static final long serialVersionUID = 0L; + /** Receive result flag updater. */ + private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD = + AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); + + /** Parent future. */ + private final GridNearOptimisticTxPrepareFuture parent; + /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); @@ -623,19 +631,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private GridDistributedTxMapping m; /** Flag to signal some result being processed. */ - private AtomicBoolean rcvRes = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + private volatile int rcvRes; /** Mappings to proceed prepare. */ private Queue<GridDistributedTxMapping> mappings; /** + * @param parent Parent. * @param m Mapping. * @param mappings Queue of mappings to proceed with. */ - MiniFuture( - GridDistributedTxMapping m, - Queue<GridDistributedTxMapping> mappings - ) { + MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m, + Queue<GridDistributedTxMapping> mappings) { + this.parent = parent; this.m = m; this.mappings = mappings; } @@ -665,7 +674,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param e Error. */ void onResult(Throwable e) { - if (rcvRes.compareAndSet(false, true)) { + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -674,7 +683,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } else U.warn(log, "Received error after another result has been processed [fut=" + - GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e); + parent + ", mini=" + this + ']', e); } /** @@ -684,13 +693,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); // Fail the whole future (make sure not to remap on different primary node // to prevent multiple lock coordinators). - onError(e); + parent.onError(e); } } @@ -698,21 +707,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param nodeId Failed node ID. * @param res Result callback. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { + if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (res.error() != null) { // Fail the whole compound future. - onError(res.error()); + parent.onError(res.error()); } else { if (res.clientRemapVersion() != null) { - assert cctx.kernalContext().clientNode(); + assert parent.cctx.kernalContext().clientNode(); assert m.clientFirst(); - IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + IgniteInternalFuture<?> affFut = + parent.cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); if (affFut != null && !affFut.isDone()) { affFut.listen(new CI1<IgniteInternalFuture<?>>() { @@ -730,13 +741,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } else remap(); - } - else { - onPrepareResponse(m, res); + } else { + parent.onPrepareResponse(m, res); // Proceed prepare before finishing mini future. if (mappings != null) - proceedPrepare(mappings); + parent.proceedPrepare(mappings); // Finish this mini future. onDone((GridNearTxPrepareResponse)null); @@ -749,9 +759,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * */ private void remap() { - prepareOnTopology(true, new Runnable() { - @Override public void run() { - onDone((GridNearTxPrepareResponse)null); + parent.prepareOnTopology(true, new Runnable() { + @Override + public void run() { + onDone((GridNearTxPrepareResponse) null); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index ffe5373..9ee9aea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -279,9 +279,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** {@inheritDoc} */ @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) { if (err != null) - this.err.compareAndSet(null, err); + ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, err); - err = this.err.get(); + err = this.err; if (err == null || tx.needCheckBackup()) tx.state(PREPARED); @@ -384,7 +384,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (log.isDebugEnabled()) log.debug("Error on tx prepare [fut=" + this + ", err=" + e + ", tx=" + tx + ']'); - if (err.compareAndSet(null, e)) + if (ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e)) tx.setRollbackOnly(); onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ae4972e..aa4e929f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -84,20 +85,35 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** */ private static final long serialVersionUID = 0L; + /** Prepare future updater. */ + private static final AtomicReferenceFieldUpdater<GridNearTxLocal, IgniteInternalFuture> PREP_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut"); + + /** Prepare future updater. */ + private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> COMMIT_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut"); + + /** Rollback future updater. */ + private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut"); + /** DHT mappings. */ private IgniteTxMappings mappings; - /** Future. */ + /** Prepare future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>(); + private volatile IgniteInternalFuture<?> prepFut; - /** */ + /** Commit future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference<GridNearTxFinishFuture> commitFut = new AtomicReference<>(); + private volatile GridNearTxFinishFuture commitFut; - /** */ + /** Rollback future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference<GridNearTxFinishFuture> rollbackFut = new AtomicReference<>(); + private volatile GridNearTxFinishFuture rollbackFut; /** Entries to lock on next step of prepare stage. */ private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList(); @@ -225,7 +241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { - prepFut.compareAndSet(fut, null); + PREP_FUT_UPD.compareAndSet(this, fut, null); } /** {@inheritDoc} */ @@ -630,7 +646,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - GridCacheMvccFuture<IgniteInternalTx> fut = (GridCacheMvccFuture<IgniteInternalTx>)prepFut.get(); + GridCacheMvccFuture<IgniteInternalTx> fut = (GridCacheMvccFuture<IgniteInternalTx>)prepFut; return fut != null && fut.onOwnerChanged(entry, owner); } @@ -784,7 +800,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> prepareAsync() { - GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get(); + GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut; if (fut == null) { // Future must be created before any exception can be thrown. @@ -796,8 +812,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { else fut = new GridNearPessimisticTxPrepareFuture(cctx, this); - if (!prepFut.compareAndSet(null, fut)) - return prepFut.get(); + if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) + return prepFut; } else // Prepare was called explicitly. @@ -818,18 +834,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { prepareAsync(); - GridNearTxFinishFuture fut = commitFut.get(); + GridNearTxFinishFuture fut = commitFut; - if (fut == null && !commitFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) - return commitFut.get(); + if (fut == null && + !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) + return commitFut; cctx.mvcc().addFuture(fut, fut.futureId()); - final IgniteInternalFuture<?> prepareFut = prepFut.get(); + final IgniteInternalFuture<?> prepareFut = prepFut; prepareFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - GridNearTxFinishFuture fut0 = commitFut.get(); + GridNearTxFinishFuture fut0 = commitFut; try { // Make sure that here are no exceptions. @@ -838,14 +855,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { fut0.finish(); } catch (Error | RuntimeException e) { - commitErr.compareAndSet(null, e); + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); fut0.onDone(e); throw e; } catch (IgniteCheckedException e) { - commitErr.compareAndSet(null, e); + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); fut0.onDone(e); } @@ -860,17 +877,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); - GridNearTxFinishFuture fut = rollbackFut.get(); + GridNearTxFinishFuture fut = rollbackFut; if (fut != null) return fut; - if (!rollbackFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, false))) - return rollbackFut.get(); + if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, false))) + return rollbackFut; cctx.mvcc().addFuture(fut, fut.futureId()); - IgniteInternalFuture<?> prepFut = this.prepFut.get(); + IgniteInternalFuture<?> prepFut = this.prepFut; if (prepFut == null || prepFut.isDone()) { try { @@ -897,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']'); } - GridNearTxFinishFuture fut0 = rollbackFut.get(); + GridNearTxFinishFuture fut0 = rollbackFut; fut0.finish(); } @@ -997,7 +1014,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (pessimistic()) prepareAsync(); - IgniteInternalFuture<?> prep = prepFut.get(); + IgniteInternalFuture<?> prep = prepFut; // Do not create finish future if there are no remote nodes. if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) { @@ -1070,7 +1087,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut, fut.futureId()); - IgniteInternalFuture<?> prep = prepFut.get(); + IgniteInternalFuture<?> prep = prepFut; if (prep == null || prep.isDone()) { try { @@ -1279,7 +1296,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() { - return prepFut.get(); + return prepFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index 52cad91..ce4d2e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -53,6 +54,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** Logger reference. */ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Error updater. */ + protected static final AtomicReferenceFieldUpdater<GridNearTxPrepareFutureAdapter, Throwable> ERR_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxPrepareFutureAdapter.class, Throwable.class, "err"); + /** */ private static final IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx> REDUCER = new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() { @@ -81,7 +86,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** Error. */ @GridToStringExclude - protected AtomicReference<Throwable> err = new AtomicReference<>(null); + protected volatile Throwable err; /** Trackable flag. */ protected boolean trackable = true; @@ -165,6 +170,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends * @param m Mapping. * @param res Response. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) { if (res == null) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 9f53c18..2e41f2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -53,6 +55,10 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Error updater. */ + private static final AtomicReferenceFieldUpdater<GridLocalLockFuture, Throwable> ERR_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridLocalLockFuture.class, Throwable.class, "err"); + /** Logger. */ private static IgniteLogger log; @@ -79,7 +85,8 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> private GridCacheVersion lockVer; /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); + @SuppressWarnings("UnusedDeclaration") + private volatile Throwable err; /** Timeout object. */ @GridToStringExclude @@ -274,7 +281,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> * @param t Error. */ void onError(Throwable t) { - if (err.compareAndSet(null, t)) + if (ERR_UPD.compareAndSet(this, null, t)) onFailed(); } @@ -392,7 +399,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> if (!success) undoLocks(); - if (onDone(success, err.get())) { + if (onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -409,8 +416,10 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> * @throws IgniteCheckedException If execution failed. */ private void checkError() throws IgniteCheckedException { - if (err.get() != null) - throw U.cast(err.get()); + Throwable err0 = err; + + if (err0 != null) + throw U.cast(err0); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 53f4f56..22e27c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -95,14 +96,17 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; /** * Managed transaction adapter. */ -public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter - implements IgniteInternalTx, Externalizable { +public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implements IgniteInternalTx, Externalizable { /** */ private static final long serialVersionUID = 0L; /** Static logger to avoid re-creation. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + /** Finalizing status updater. */ + private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, FinalizationStatus> FINALIZING_UPD = + AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing"); + /** Logger. */ protected static IgniteLogger log; @@ -191,8 +195,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** Commit version. */ private volatile GridCacheVersion commitVer; - /** */ - private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE); + /** Finalizing status. */ + private volatile FinalizationStatus finalizing = FinalizationStatus.NONE; /** Done marker. */ protected volatile boolean isDone; @@ -524,23 +528,23 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter switch (status) { case USER_FINISH: - res = finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.USER_FINISH); + res = FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.USER_FINISH); break; case RECOVERY_WAIT: - finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); + FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); - FinalizationStatus cur = finalizing.get(); + FinalizationStatus cur = finalizing; res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH; break; case RECOVERY_FINISH: - FinalizationStatus old = finalizing.get(); + FinalizationStatus old = finalizing; - res = old != FinalizationStatus.USER_FINISH && finalizing.compareAndSet(old, status); + res = old != FinalizationStatus.USER_FINISH && FINALIZING_UPD.compareAndSet(this, old, status); break; @@ -565,7 +569,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @return Finalization status. */ protected FinalizationStatus finalizationStatus() { - return finalizing.get(); + return finalizing; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 2c6c3df..c42bc7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.LinkedList; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -73,6 +73,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + /** Prepared flag updater. */ + private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = + AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); + /** Owning transaction. */ @GridToStringExclude @GridDirectTransient @@ -149,9 +153,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private GridCacheContext<?, ?> ctx; /** Prepared flag to prevent multiple candidate add. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) + @SuppressWarnings("UnusedDeclaration") @GridDirectTransient - private AtomicBoolean prepared = new AtomicBoolean(); + private transient volatile int prepared; /** Lock flag for collocated cache. */ @GridDirectTransient @@ -441,7 +445,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @return True if entry was marked prepared by this call. */ boolean markPrepared() { - return prepared.compareAndSet(false, true); + return PREPARED_UPD.compareAndSet(this, 0, 1); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 70c79a5..21ff0cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -29,8 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.Cache; import javax.cache.CacheException; import javax.cache.expiry.Duration; @@ -115,16 +115,24 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** * Transaction adapter for cache transactions. */ -public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter - implements IgniteTxLocalEx { +public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements IgniteTxLocalEx { /** */ private static final long serialVersionUID = 0L; + /** Commit error updater. */ + protected static final AtomicReferenceFieldUpdater<IgniteTxLocalAdapter, Throwable> COMMIT_ERR_UPD = + AtomicReferenceFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, Throwable.class, "commitErr"); + + /** Done flag updater. */ + protected static final AtomicIntegerFieldUpdater<IgniteTxLocalAdapter> DONE_FLAG_UPD = + AtomicIntegerFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, "doneFlag"); + /** Minimal version encountered (either explicit lock or XID of this transaction). */ protected GridCacheVersion minVer; /** Flag indicating with TM commit happened. */ - protected AtomicBoolean doneFlag = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + protected volatile int doneFlag; /** Committed versions, relative to base. */ private Collection<GridCacheVersion> committedVers = Collections.emptyList(); @@ -139,7 +147,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter private boolean sndTransformedVals; /** Commit error. */ - protected AtomicReference<Throwable> commitErr = new AtomicReference<>(); + protected volatile Throwable commitErr; /** Need return value. */ protected boolean needRetVal; @@ -248,12 +256,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public Throwable commitError() { - return commitErr.get(); + return commitErr; } /** {@inheritDoc} */ @Override public void commitError(Throwable e) { - commitErr.compareAndSet(null, e); + COMMIT_ERR_UPD.compareAndSet(this, null, e); } /** {@inheritDoc} */ @@ -1164,7 +1172,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter U.error(log, "Heuristic transaction failure.", err); - commitErr.compareAndSet(null, err); + COMMIT_ERR_UPD.compareAndSet(this, null, err); state(UNKNOWN); @@ -1194,7 +1202,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Do not unlock transaction entries if one-phase commit. if (!onePhaseCommit()) { - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { // Unlock all locks. cctx.tm().commitTx(this); @@ -1215,7 +1223,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter public void tmFinish(boolean commit) { assert onePhaseCommit(); - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { // Unlock all locks. if (commit) cctx.tm().commitTx(this); @@ -1272,8 +1280,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (state != ROLLING_BACK && state != ROLLED_BACK) { setRollbackOnly(); - throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']', - commitErr.get()); + throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + + ", tx=" + this + ']', commitErr); } if (near()) { @@ -1283,7 +1291,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter evictNearEntry(e, false); } - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { try { cctx.tm().rollbackTx(this); @@ -1302,7 +1310,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } catch (Error | IgniteCheckedException | RuntimeException e) { - U.addLastCause(e, commitErr.get(), log); + U.addLastCause(e, commitErr, log); throw e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cd5cd2ef/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java index 93fd916..afaa645 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java @@ -37,7 +37,7 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 30 * 1000; + return 120 * 1000; } /** {@inheritDoc} */
