Repository: ignite
Updated Branches:
  refs/heads/master 493e0aa9e -> 395811ddc


IGNITE-10277 Fixed prepare-commit WAL ordering for one-phase commit - Fixes 
#5448.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/395811dd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/395811dd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/395811dd

Branch: refs/heads/master
Commit: 395811ddc11ed9d8bfb8972b110a4b7f0555f294
Parents: 493e0aa
Author: Anton Kalashnikov <kaa....@yandex.ru>
Authored: Mon Dec 3 11:02:51 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Dec 3 11:02:51 2018 +0300

----------------------------------------------------------------------
 .../dht/GridDhtTransactionalCacheAdapter.java   |  4 ++-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  7 ++++-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 19 +++++++++++---
 ...arOptimisticSerializableTxPrepareFuture.java | 11 +++++---
 .../near/GridNearOptimisticTxPrepareFuture.java |  5 ++--
 .../GridNearPessimisticTxPrepareFuture.java     |  5 ++--
 .../cache/transactions/IgniteTxAdapter.java     | 27 ++++++++++++++++++--
 .../cache/transactions/IgniteTxHandler.java     | 24 ++++++++++++++---
 8 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4693232..dcd7be5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1122,7 +1122,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
                             null,
                             req.subjectId(),
                             req.taskNameHash(),
-                            req.txLabel());
+                            req.txLabel(),
+                            null);
 
                         if (req.syncCommit())
                             tx.syncMode(FULL_SYNC);
@@ -2158,6 +2159,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
                     null,
                     txSubjectId,
                     txTaskNameHash,
+                    null,
                     null);
 
                 // if (req.syncCommit())

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6bad093..53c4942 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -122,6 +123,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
      * @param txSize Expected transaction size.
      * @param txNodes Transaction nodes mapping.
      * @param lb Transaction label.
+     * @param parentTx Transaction from which this transaction was copied 
by(if it was).
      */
     public GridDhtTxLocal(
         GridCacheSharedContext cctx,
@@ -146,7 +148,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
         Map<UUID, Collection<UUID>> txNodes,
         UUID subjId,
         int taskNameHash,
-        @Nullable String lb
+        @Nullable String lb,
+        GridNearTxLocal parentTx
     ) {
         super(
             cctx,
@@ -180,6 +183,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
 
         threadId = nearThreadId;
 
+        setParentTx(parentTx);
+
         assert !F.eq(xidVer, nearXidVer);
 
         initResult();

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 92e0ce8..58edc9d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1003,7 +1003,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
      * @return {@code True} if {@code done} flag was changed as a result of 
this call.
      */
     private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
-        if ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local()))
+        if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && 
!(tx.near() && tx.local())))
             tx.state(PREPARED);
 
         if (super.onDone(res, res == null ? err : null)) {
@@ -1311,8 +1311,14 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
             if (isDone())
                 return;
 
-            if (last)
+            if (last) {
+                recheckOnePhaseCommit();
+
+                if (tx.onePhaseCommit())
+                    tx.chainState(PREPARED);
+
                 sendPrepareRequests();
+            }
         }
         finally {
             markInitialized();
@@ -1320,9 +1326,9 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
     }
 
     /**
-     *
+     * Checking that one phase commit for transaction still actual.
      */
-    private void sendPrepareRequests() {
+    private void recheckOnePhaseCommit() {
         if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
             for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) 
{
                 if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
@@ -1332,7 +1338,12 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 }
             }
         }
+    }
 
+    /**
+     *
+     */
+    private void sendPrepareRequests() {
         assert !tx.txState().mvccEnabled() || !tx.onePhaseCommit() || 
tx.mvccSnapshot() != null;
 
         int miniId = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 2280619..46bab86 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -269,7 +269,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
     private boolean onComplete() {
         Throwable err0 = err;
 
-        if (err0 == null || tx.needCheckBackup())
+        if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == 
null) &&
+            (err0 == null || tx.needCheckBackup()))
             tx.state(PREPARED);
 
         if (super.onDone(tx, err0)) {
@@ -350,9 +351,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
                 hasNearCache = true;
         }
 
-        for (IgniteTxEntry read : reads)
+        for (IgniteTxEntry read : reads) {
             map(read, topVer, mappings, txMapping, remap, topLocked);
 
+            if (read.context().isNear())
+                hasNearCache = true;
+        }
+
         if (keyLockFut != null)
             keyLockFut.onAllKeysAdded();
 
@@ -575,7 +580,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
         final MiniFuture fut,
         final boolean nearEntries) {
         IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
-            cctx.tm().txHandler().prepareNearTxLocal(req) :
+            cctx.tm().txHandler().prepareNearTxLocal(tx, req) :
             cctx.tm().txHandler().prepareColocatedTx(tx, req);
 
         prepFut.listen(new 
CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index d3639c9..140f593 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -297,7 +297,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
     private boolean onComplete() {
         Throwable err0 = err;
 
-        if (err0 == null || tx.needCheckBackup())
+        if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == 
null) &&
+                (err0 == null || tx.needCheckBackup()))
             tx.state(PREPARED);
 
         if (super.onDone(tx, err0)) {
@@ -567,7 +568,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
                     assert !(m.hasColocatedCacheEntries() && 
m.hasNearCacheEntries()) : m;
 
                     IgniteInternalFuture<GridNearTxPrepareResponse> prepFut =
-                        m.hasNearCacheEntries() ? 
cctx.tm().txHandler().prepareNearTxLocal(req)
+                        m.hasNearCacheEntries() ? 
cctx.tm().txHandler().prepareNearTxLocal(tx, req)
                         : cctx.tm().txHandler().prepareColocatedTx(tx, req);
 
                     prepFut.listen(new 
CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index a480a99..d2e1586 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -254,7 +254,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
         add((IgniteInternalFuture)fut);
 
         IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ?
-            cctx.tm().txHandler().prepareNearTxLocal(req) :
+            cctx.tm().txHandler().prepareNearTxLocal(tx, req) :
             cctx.tm().txHandler().prepareColocatedTx(tx, req);
 
         prepFut.listen(new 
CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
@@ -438,7 +438,8 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
 
         err = this.err;
 
-        if (err == null || tx.needCheckBackup())
+        if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == 
null) &&
+            (err == null || tx.needCheckBackup()))
             tx.state(PREPARED);
 
         if (super.onDone(tx, err)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ec1b646..c8b0459 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -37,8 +39,6 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -287,6 +287,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     @GridToStringExclude
     private volatile TxCounters txCounters;
 
+    /** Transaction from which this transaction was copied by(if it was). */
+    private GridNearTxLocal parentTx;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -406,6 +409,13 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     }
 
     /**
+     * @param parentTx Transaction from which this transaction was copied by.
+     */
+    public void setParentTx(GridNearTxLocal parentTx) {
+        this.parentTx = parentTx;
+    }
+
+    /**
      * @return Mvcc info.
      */
     @Override @Nullable public MvccSnapshot mvccSnapshot() {
@@ -1044,6 +1054,19 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         return state(state, false);
     }
 
+    /**
+     * Changing state for this transaction as well as chained(parent) 
transactions.
+     *
+     * @param state Transaction state.
+     * @return {@code True} if transition was valid, {@code false} otherwise.
+     */
+    public boolean chainState(TransactionState state) {
+        if (parentTx != null)
+            parentTx.state(state);
+
+        return state(state);
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() {
         GridFutureAdapter<IgniteInternalTx> fut = finFut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index aefd54c..319073b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -342,14 +342,17 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param originTx Transaction for copy.
      * @param req Request.
      * @return Prepare future.
      */
-    public IgniteInternalFuture<GridNearTxPrepareResponse> 
prepareNearTxLocal(final GridNearTxPrepareRequest req) {
+    public IgniteInternalFuture<GridNearTxPrepareResponse>  prepareNearTxLocal(
+        final GridNearTxLocal originTx,
+        final GridNearTxPrepareRequest req) {
         // Make sure not to provide Near entries to DHT cache.
         req.cloneEntries();
 
-        return prepareNearTx(ctx.localNode(), req);
+        return prepareNearTx(originTx, ctx.localNode(), req);
     }
 
     /**
@@ -361,6 +364,20 @@ public class IgniteTxHandler {
         final ClusterNode nearNode,
         final GridNearTxPrepareRequest req
     ) {
+        return prepareNearTx(null, nearNode, req);
+    }
+
+    /**
+     * @param originTx Transaction for copy.
+     * @param nearNode Node that initiated transaction.
+     * @param req Near prepare request.
+     * @return Prepare future or {@code null} if need retry operation.
+     */
+    @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> 
prepareNearTx(
+        final GridNearTxLocal originTx,
+        final ClusterNode nearNode,
+        final GridNearTxPrepareRequest req
+    ) {
         IgniteTxEntry firstEntry;
 
         try {
@@ -509,7 +526,8 @@ public class IgniteTxHandler {
                     req.transactionNodes(),
                     req.subjectId(),
                     req.taskNameHash(),
-                    req.txLabel()
+                    req.txLabel(),
+                    originTx
                 );
 
                 tx = ctx.tm().onCreated(null, tx);

Reply via email to