Repository: ignite
Updated Branches:
  refs/heads/ignite-6149 085a32190 -> c6f894817


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6f89481
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6f89481
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6f89481

Branch: refs/heads/ignite-6149
Commit: c6f894817ef063984cee1ea886313eecc8da3be0
Parents: 085a321
Author: sboikov <[email protected]>
Authored: Mon Sep 11 16:51:11 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Sep 11 17:31:51 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 39 +++++++++-----------
 .../cache/GridCacheUpdateTxResult.java          | 22 +++++------
 .../distributed/dht/GridDhtTxFinishFuture.java  | 14 ++++++-
 .../near/GridNearTxFinishFuture.java            | 13 +++++++
 .../mvcc/CacheCoordinatorsSharedManager.java    |  4 +-
 .../transactions/IgniteTxLocalAdapter.java      | 22 ++++++++++-
 .../processors/cache/GridCacheTestEntryEx.java  | 11 +++---
 7 files changed, 80 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 22754d7..db4b88b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
@@ -914,11 +915,11 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject old;
 
-        boolean valid = valid(tx != null ? tx.topologyVersion() : topVer);
+        final boolean valid = valid(tx != null ? tx.topologyVersion() : 
topVer);
 
         // Lock should be held by now.
         if (!cctx.isAll(this, filter))
-            return new GridCacheUpdateTxResult(false, null);
+            return new GridCacheUpdateTxResult(false);
 
         final GridCacheVersion newVer;
 
@@ -931,6 +932,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         ensureFreeSpace();
 
+        GridLongList mvccWaitTxs = null;
+
         synchronized (this) {
             checkObsolete();
 
@@ -939,7 +942,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 // It is possible that 'get' could load more recent value.
                 if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
-                    return new GridCacheUpdateTxResult(false, null);
+                    return new GridCacheUpdateTxResult(false);
             }
 
             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || 
tx.ownsLock(this) :
@@ -975,7 +978,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 key0 = e.key();
 
                 if (interceptorVal == null)
-                    return new GridCacheUpdateTxResult(false, 
(CacheObject)cctx.unwrapTemporary(old));
+                    return new GridCacheUpdateTxResult(false);
                 else if (interceptorVal != val0)
                     val0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -1010,7 +1013,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (cctx.mvccEnabled()) {
                 assert mvccVer != null;
 
-                cctx.offheap().mvccUpdate(this, val, newVer, mvccVer);
+                mvccWaitTxs = cctx.offheap().mvccUpdate(this, val, newVer, 
mvccVer);
             }
             else
                 storeValue(val, expireTime, newVer, null);
@@ -1080,8 +1083,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (intercept)
             cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, 
key, key0, val, val0, keepBinary, updateCntr0));
 
-        return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, 
updateCntr0) :
-            new GridCacheUpdateTxResult(false, null);
+        return valid ? new GridCacheUpdateTxResult(true, updateCntr0, 
mvccWaitTxs) :
+            new GridCacheUpdateTxResult(false);
     }
 
     /**
@@ -1119,11 +1122,11 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         GridCacheVersion newVer;
 
-        boolean valid = valid(tx != null ? tx.topologyVersion() : topVer);
+        final boolean valid = valid(tx != null ? tx.topologyVersion() : 
topVer);
 
         // Lock should be held by now.
         if (!cctx.isAll(this, filter))
-            return new GridCacheUpdateTxResult(false, null);
+            return new GridCacheUpdateTxResult(false);
 
         GridCacheVersion obsoleteVer = null;
 
@@ -1147,7 +1150,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 // It is possible that 'get' could load more recent value.
                 if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
-                    return new GridCacheUpdateTxResult(false, null);
+                    return new GridCacheUpdateTxResult(false);
             }
 
             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || 
tx.ownsLock(this) :
@@ -1175,7 +1178,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 if (cctx.cancelRemove(interceptRes)) {
                     CacheObject ret = 
cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
 
-                    return new GridCacheUpdateTxResult(false, ret);
+                    return new GridCacheUpdateTxResult(false);
                 }
             }
 
@@ -1289,18 +1292,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (intercept)
             cctx.config().getInterceptor().onAfterRemove(entry0);
 
-        if (valid) {
-            CacheObject ret;
-
-            if (interceptRes != null)
-                ret = 
cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
-            else
-                ret = old;
-
-            return new GridCacheUpdateTxResult(true, ret, updateCntr0);
-        }
+        if (valid)
+            return new GridCacheUpdateTxResult(true, updateCntr0, null);
         else
-            return new GridCacheUpdateTxResult(false, null);
+            return new GridCacheUpdateTxResult(false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index 461baa7..951d02c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -28,34 +28,30 @@ public class GridCacheUpdateTxResult {
     /** Success flag.*/
     private final boolean success;
 
-    /** Old value. */
-    @GridToStringInclude
-    private final CacheObject oldVal;
-
     /** Partition idx. */
     private long updateCntr;
 
+    /** */
+    private GridLongList mvccWaitTxs;
+
     /**
      * Constructor.
      *
      * @param success Success flag.
-     * @param oldVal Old value (if any),
      */
-    GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal) {
+    GridCacheUpdateTxResult(boolean success) {
         this.success = success;
-        this.oldVal = oldVal;
     }
 
     /**
      * Constructor.
      *
      * @param success Success flag.
-     * @param oldVal Old value (if any),
      */
-    GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, 
long updateCntr) {
+    GridCacheUpdateTxResult(boolean success, long updateCntr, @Nullable 
GridLongList mvccWaitTxs) {
         this.success = success;
-        this.oldVal = oldVal;
         this.updateCntr = updateCntr;
+        this.mvccWaitTxs = mvccWaitTxs;
     }
 
     /**
@@ -75,8 +71,8 @@ public class GridCacheUpdateTxResult {
     /**
-     * @return Old value.
+     * @return MVCC transactions to wait for, or {@code null} if not set.
      */
-    @Nullable public CacheObject oldValue() {
-        return oldVal;
+    @Nullable public GridLongList mvccWaitTransactions() {
+        return mvccWaitTxs;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 55078cd..6858c82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,10 +35,10 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -292,6 +292,18 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
             // No backup or near nodes to send commit message to (just 
complete then).
             sync = false;
 
+        GridLongList waitTxs = tx.mvccWaitTransactions();
+
+        if (waitTxs != null) {
+            ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+
+            assert crd != null;
+
+            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, 
waitTxs);
+
+            add(fut);
+        }
+
         markInitialized();
 
         if (!sync)

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 7a90ec4..e57976b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -414,6 +415,18 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
         try {
             if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) {
+                GridLongList waitTxs = tx.mvccWaitTransactions();
+
+                if (waitTxs != null) {
+                    ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+
+                    assert crd != null;
+
+                    IgniteInternalFuture fut = 
cctx.coordinators().waitTxsFuture(crd, waitTxs);
+
+                    add(fut);
+                }
+
                 if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || 
(!tx.onePhaseCommit() && mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = 
mappings.singleMapping();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 7034aca..d19af59 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -388,7 +388,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorTxAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
+    private void processCoordinatorAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
         WaitAckFuture fut = ackFuts.remove(msg.futureId());
 
         if (fut != null)
@@ -706,7 +706,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             else if (msg instanceof CoordinatorTxAckRequest)
                 processCoordinatorTxAckRequest(nodeId, 
(CoordinatorTxAckRequest)msg);
             else if (msg instanceof CoordinatorFutureResponse)
-                processCoordinatorTxAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
+                processCoordinatorAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
             else if (msg instanceof CoordinatorQueryAckRequest)
                 
processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f785e2b..1b386d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
 import 
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -148,6 +149,9 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     /** */
     protected CacheWriteSynchronizationMode syncMode;
 
+    /** */
+    private GridLongList mvccWaitTxs;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -208,6 +212,10 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
         txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new 
IgniteTxStateImpl();
     }
 
+    public GridLongList mvccWaitTransactions() {
+        return mvccWaitTxs;
+    }
+
     /**
      * @return Transaction write synchronization mode.
      */
@@ -472,7 +480,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass"})
-    @Override public void userCommit() throws IgniteCheckedException {
+    @Override public final void userCommit() throws IgniteCheckedException {
         TransactionState state = state();
 
         if (state != COMMITTING) {
@@ -689,9 +697,19 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                             null,
                                             mvccVer);
 
-                                        if (updRes.success())
+                                        if (updRes.success()) {
                                             
txEntry.updateCounter(updRes.updatePartitionCounter());
 
+                                            GridLongList waitTxs = 
updRes.mvccWaitTransactions();
+
+                                            if (waitTxs != null) {
+                                                if (this.mvccWaitTxs == null)
+                                                    this.mvccWaitTxs = waitTxs;
+                                                else
+                                                    
this.mvccWaitTxs.addAll(waitTxs);
+                                            }
+                                        }
+
                                         if (nearCached != null && 
updRes.success()) {
                                             nearCached.innerSet(
                                                 null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6f89481/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 3afbb35..f5309e5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -463,8 +463,11 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         @Nullable Long updateCntr,
         MvccCoordinatorVersion mvccVer
     )
-        throws IgniteCheckedException, GridCacheEntryRemovedException {
-        return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
+        throws IgniteCheckedException, GridCacheEntryRemovedException
+    {
+        rawPut(val, ttl);
+
+        return new GridCacheUpdateTxResult(true);
     }
 
     /** {@inheritDoc} */
@@ -547,11 +550,9 @@ public class GridCacheTestEntryEx extends 
GridMetadataAwareAdapter implements Gr
         ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;
 
-        CacheObject old = val;
-
         val = null;
 
-        return new GridCacheUpdateTxResult(true, old);
+        return new GridCacheUpdateTxResult(true);
     }
 
     /** @inheritDoc */

Reply via email to