ignite-4929 Fixed incorrect return value on backup for one-phase tx invoke
(the old value is sent to backups anyway when the topology has changed, so the
backup now uses that value for invoke)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4c39707
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4c39707
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4c39707

Branch: refs/heads/ignite-4535
Commit: a4c397076954d10cfb200fe30060ed0d118a3fc1
Parents: a508826
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Apr 20 10:26:09 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Apr 20 10:26:09 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        |   9 -
 .../distributed/GridDistributedLockRequest.java |  14 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  26 +--
 .../near/GridNearSingleGetRequest.java          |   4 +-
 .../cache/transactions/IgniteTxAdapter.java     |  16 +-
 .../cache/transactions/IgniteTxEntry.java       |  39 ++--
 .../IgfsMetaDirectoryListingAddProcessor.java   |   5 +-
 .../internal/TestRecordingCommunicationSpi.java |   9 +
 .../cache/IgniteOnePhaseCommitInvokeTest.java   | 213 +++++++++++++++++++
 .../IgniteCachePutRetryAbstractSelfTest.java    |  25 +--
 ...gniteCachePutRetryTransactionalSelfTest.java |   2 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |  16 +-
 .../testframework/junits/GridAbstractTest.java  |   9 +-
 .../junits/common/GridCommonAbstractTest.java   |  79 +++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 15 files changed, 360 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 51a95a6..5abb6de 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -196,15 +196,6 @@ public class GridCacheUtils {
     /** Expire time: must be calculated based on TTL value. */
     public static final long EXPIRE_TIME_CALCULATE = -1L;
 
-    /** Skip store flag bit mask. */
-    public static final int SKIP_STORE_FLAG_MASK = 0x1;
-
-    /** Keep serialized flag. */
-    public static final int KEEP_BINARY_FLAG_MASK = 0x2;
-
-    /** Flag indicating that old value for 'invoke' operation was non null on 
primary node. */
-    public static final int OLD_VAL_ON_PRIMARY = 0x4;
-
     /** Empty predicate array. */
     private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index b1c2c27..74f34a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -38,9 +38,6 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
-
 /**
  * Lock request message.
  */
@@ -48,6 +45,12 @@ public class GridDistributedLockRequest extends 
GridDistributedBaseMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Skip store flag bit mask. */
+    private static final int SKIP_STORE_FLAG_MASK = 0x01;
+
+    /** Keep binary flag. */
+    private static final int KEEP_BINARY_FLAG_MASK = 0x02;
+
     /** Sender node ID. */
     private UUID nodeId;
 
@@ -90,10 +93,7 @@ public class GridDistributedLockRequest extends 
GridDistributedBaseMessage {
     /** Key count. */
     private int txSize;
 
-    /**
-     * Additional flags.
-     * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
-     */
+    /** Additional flags. */
     private byte flags;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 464df6e..e2b7803 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -67,7 +67,6 @@ import 
org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import 
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -373,8 +372,6 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                     tx.nearOnOriginatingNode() || tx.hasInterceptor();
 
                 if (readOld) {
-                    cached.unswap(retVal);
-
                     boolean readThrough = !txEntry.skipStore() &&
                         (txEntry.op() == TRANSFORM || ((retVal || hasFilters) 
&& cacheCtx.config().isLoadPreviousValue()));
 
@@ -482,7 +479,8 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 }
 
                 // Send old value in case if rebalancing is not finished.
-                final boolean sndOldVal = !cacheCtx.isLocal() && 
!cacheCtx.topology().rebalanceFinished(tx.topologyVersion());
+                final boolean sndOldVal = !cacheCtx.isLocal() &&
+                    
!cacheCtx.topology().rebalanceFinished(tx.topologyVersion());
 
                 if (sndOldVal) {
                     if (oldVal == null && !readOld) {
@@ -499,11 +497,10 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                             /*keepBinary*/true);
                     }
 
-                    if (oldVal != null) {
+                    if (oldVal != null)
                         oldVal.prepareMarshal(cacheCtx.cacheObjectContext());
 
-                        txEntry.oldValue(oldVal, true);
-                    }
+                    txEntry.oldValue(oldVal);
                 }
             }
             catch (IgniteCheckedException e) {
@@ -1532,21 +1529,6 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
     ) {
         GridDistributedTxMapping global = globalMap.get(n.id());
 
-        if (!F.isEmpty(entry.entryProcessors())) {
-            GridDhtPartitionState state = 
entry.context().topology().partitionState(n.id(),
-                entry.cached().partition());
-
-            if (state != GridDhtPartitionState.OWNING && state != 
GridDhtPartitionState.EVICTED) {
-                T2<GridCacheOperation, CacheObject> procVal = 
entry.entryProcessorCalculatedValue();
-
-                assert procVal != null : entry;
-
-                entry.op(procVal.get1());
-                entry.value(procVal.get2(), true, false);
-                entry.entryProcessors(null);
-            }
-        }
-
         if (global == null)
             globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index ab0afb1..0faa8ec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -32,8 +32,6 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
-
 /**
  *
  */
@@ -212,7 +210,7 @@ public class GridNearSingleGetRequest extends 
GridCacheMessage implements GridCa
      * @return Read through flag.
      */
     public boolean readThrough() {
-        return (flags & SKIP_STORE_FLAG_MASK) != 0;
+        return (flags & READ_THROUGH_FLAG_MASK) != 0;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index d3b39bd..5cba0cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -83,6 +83,7 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CRE
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -1381,6 +1382,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         IgniteTxEntry txEntry,
         boolean metrics,
         @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, 
IgniteCheckedException {
+        assert txEntry.op() != TRANSFORM || 
!F.isEmpty(txEntry.entryProcessors()) : txEntry;
+
         GridCacheContext cacheCtx = txEntry.context();
 
         assert cacheCtx != null;
@@ -1404,18 +1407,25 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
 
             final boolean keepBinary = txEntry.keepBinary();
 
-            CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
-                txEntry.cached().innerGet(
+            CacheObject cacheVal;
+
+            if (txEntry.hasValue())
+                cacheVal = txEntry.value();
+            else if (txEntry.hasOldValue())
+                cacheVal = txEntry.oldValue();
+            else {
+                cacheVal = txEntry.cached().innerGet(
                     null,
                     this,
                     /*read through*/false,
                     /*metrics*/metrics,
                     /*event*/recordEvt,
                     /*subjId*/subjId,
-                    /**closure name */recordEvt ? 
F.first(txEntry.entryProcessors()).get1() : null,
+                    /*closure name */recordEvt ? 
F.first(txEntry.entryProcessors()).get1() : null,
                     resolveTaskName(),
                     null,
                     keepBinary);
+            }
 
             boolean modified = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 1f8a107..163ed99 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -37,7 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -57,9 +56,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
 
 /**
  * Transaction entry. Note that it is essential that this class does not 
override
@@ -83,6 +79,15 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     /** */
     public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new 
GridCacheVersion(0, 0, 3);
 
+    /** Skip store flag bit mask. */
+    private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+
+    /** Keep binary flag. */
+    private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+
+    /** Flag indicating that old value for 'invoke' operation was non null on 
primary node. */
+    private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD 
=
         AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -194,13 +199,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     /** Expiry policy bytes. */
     private byte[] expiryPlcBytes;
 
-    /**
-     * Additional flags:
-     * <ul>
-     * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag 
value.</li>
-     * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary 
flag.</li>
-     * </ul>
-     */
+    /** Additional flags. */
     private byte flags;
 
     /** Partition update counter. */
@@ -484,28 +483,28 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
      * @param skipStore Skip store flag.
      */
     public void skipStore(boolean skipStore) {
-        setFlag(skipStore, SKIP_STORE_FLAG_MASK);
+        setFlag(skipStore, TX_ENTRY_SKIP_STORE_FLAG_MASK);
     }
 
     /**
      * @return Skip store flag.
      */
     public boolean skipStore() {
-        return isFlag(SKIP_STORE_FLAG_MASK);
+        return isFlag(TX_ENTRY_SKIP_STORE_FLAG_MASK);
     }
 
     /**
      * @param oldValOnPrimary {@code True} If old value for was non null on 
primary node.
      */
     public void oldValueOnPrimary(boolean oldValOnPrimary) {
-        setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
+        setFlag(oldValOnPrimary, TX_ENTRY_OLD_VAL_ON_PRIMARY);
     }
 
     /**
      * @return {@code True} If old value for 'invoke' operation was non null 
on primary node.
      */
-    public boolean oldValueOnPrimary() {
-        return isFlag(OLD_VAL_ON_PRIMARY);
+    boolean oldValueOnPrimary() {
+        return isFlag(TX_ENTRY_OLD_VAL_ON_PRIMARY);
     }
 
     /**
@@ -514,14 +513,14 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
      * @param keepBinary Keep binary flag value.
      */
     public void keepBinary(boolean keepBinary) {
-        setFlag(keepBinary, KEEP_BINARY_FLAG_MASK);
+        setFlag(keepBinary, TX_ENTRY_KEEP_BINARY_FLAG_MASK);
     }
 
     /**
      * @return Keep binary flag value.
      */
     public boolean keepBinary() {
-        return isFlag(KEEP_BINARY_FLAG_MASK);
+        return isFlag(TX_ENTRY_KEEP_BINARY_FLAG_MASK);
     }
 
     /**
@@ -588,11 +587,11 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
     /**
      * @param oldVal Old value.
      */
-    public void oldValue(CacheObject oldVal, boolean hasOldVal) {
+    public void oldValue(CacheObject oldVal) {
         if (this.oldVal == null)
             this.oldVal = new TxEntryValueHolder();
 
-        this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal);
+        this.oldVal.value(op(), oldVal, true, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
index 380d997..2e7ecae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
@@ -80,17 +80,18 @@ public final class IgfsMetaDirectoryListingAddProcessor 
implements EntryProcesso
     @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, 
Object... args) {
         IgfsEntryInfo fileInfo = e.getValue();
 
-        assert fileInfo.isDirectory();
+        assert fileInfo != null && fileInfo.isDirectory() : fileInfo;
 
         Map<String, IgfsListingEntry> listing = new 
HashMap<>(fileInfo.listing());
 
         // Modify listing in-place.
         IgfsListingEntry oldEntry = listing.put(fileName, entry);
 
-        if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
+        if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId())) {
             throw new IgniteException("Directory listing contains unexpected 
file" +
                 " [listing=" + listing + ", fileName=" + fileName + ", entry=" 
+ entry +
                 ", oldEntry=" + oldEntry + ']');
+        }
 
         e.setValue(fileInfo.listing(listing));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index c3d26b7..aa0cc09 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -60,6 +61,14 @@ public class TestRecordingCommunicationSpi extends 
TcpCommunicationSpi {
     /** */
     private IgnitePredicate<GridIoMessage> blockP;
 
+    /**
+     * @param node Node.
+     * @return Test SPI.
+     */
+    public static TestRecordingCommunicationSpi spi(Ignite node) {
+        return 
(TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi();
+    }
+
     /** {@inheritDoc} */
     @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
         throws IgniteSpiException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
new file mode 100644
index 0000000..601c067
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Callable;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteOnePhaseCommitInvokeTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final String CACHE_NAME = "testCache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(1);
+        ccfg.setRebalanceMode(ASYNC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOnePhaseInvoke() throws Exception {
+        boolean flags[] = {true, false};
+
+        for (boolean withOldVal : flags) {
+            for (boolean setVal : flags) {
+                for (boolean retPrev : flags) {
+                    onePhaseInvoke(withOldVal, setVal, retPrev);
+
+                    stopAllGrids();
+                }
+            }
+        }
+    }
+
+    /**
+     * @param withOldVal If {@code true}
+     * @param setVal Flag whether set value from entry processor.
+     * @param retPrev Flag whether entry processor should return previous 
value.
+     * @throws Exception If failed.
+     */
+    private void onePhaseInvoke(final boolean withOldVal,
+        final boolean setVal,
+        final boolean retPrev)
+        throws Exception
+    {
+        log.info("Test onePhaseInvoke [withOldVal=" + withOldVal + ", setVal=" 
+ setVal + ", retPrev=" + retPrev + ']');
+
+        Ignite srv0 = startGrid(0);
+
+        if (withOldVal)
+            srv0.cache(CACHE_NAME).put(1, 1);
+
+        client = true;
+
+        final Ignite clientNode = startGrid(1);
+
+        TestRecordingCommunicationSpi.spi(srv0).blockMessages(new 
IgnitePredicate<GridIoMessage>() {
+            @Override public boolean apply(GridIoMessage msg0) {
+                Message msg = msg0.message();
+
+                return msg instanceof GridDhtPartitionSupplyMessage &&
+                    ((GridDhtPartitionSupplyMessage)msg).cacheId() == 
CU.cacheId(CACHE_NAME);
+            }
+        });
+
+        client = false;
+
+        Ignite srv1 = startGrid(2);
+
+        
TestRecordingCommunicationSpi.spi(srv1).blockMessages(GridDhtTxPrepareResponse.class,
 srv0.name());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Object res = clientNode.cache(CACHE_NAME).invoke(1, new 
TestEntryProcessor(setVal, retPrev));
+
+                Object expRes;
+
+                if (retPrev)
+                    expRes = withOldVal ? 1 : null;
+                else
+                    expRes = null;
+
+                assertEquals(expRes, res);
+
+                return null;
+            }
+        });
+
+        U.sleep(1000);
+
+        stopGrid(0);
+
+        fut.get();
+
+        if (!setVal)
+            checkCacheData(F.asMap(1, null), CACHE_NAME);
+        else {
+            Object expVal;
+
+            if (setVal)
+                expVal = 2;
+            else
+                expVal = withOldVal ? 1 : null;
+
+            checkCacheData(F.asMap(1, expVal), CACHE_NAME);
+        }
+
+        checkOnePhaseCommitReturnValuesCleaned(-1);
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor {
+        /** */
+        private final boolean setVal;
+
+        /** */
+        private final boolean retPrev;
+
+        /**
+         * @param setVal Set value flag.
+         * @param retPrev Return previous value flag.
+         */
+        TestEntryProcessor(boolean setVal, boolean retPrev) {
+            this.setVal = setVal;
+            this.retPrev = retPrev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry e, Object... args) {
+            Object val = e.getValue();
+
+            if (setVal)
+                e.setValue(2);
+
+            return retPrev ? val : null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index ca55a47..abec33c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -443,13 +443,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest 
extends GridCommonAbst
     private void checkInternalCleanup() throws Exception{
         checkNoAtomicFutures();
 
-        checkOnePhaseCommitReturnValuesCleaned();
+        checkOnePhaseCommitReturnValuesCleaned(GRID_CNT);
     }
 
     /**
      * @throws Exception If failed.
      */
-    void checkNoAtomicFutures() throws Exception {
+    private void checkNoAtomicFutures() throws Exception {
         for (int i = 0; i < GRID_CNT; i++) {
             final IgniteKernal ignite = (IgniteKernal)grid(i);
 
@@ -468,27 +468,6 @@ public abstract class IgniteCachePutRetryAbstractSelfTest 
extends GridCommonAbst
     /**
      * @throws Exception If failed.
      */
-    void checkOnePhaseCommitReturnValuesCleaned() throws Exception {
-        U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            IgniteKernal ignite = (IgniteKernal)grid(i);
-
-            IgniteTxManager tm = ignite.context().cache().context().tm();
-
-            Map completedVersHashMap = U.field(tm, "completedVersHashMap");
-
-            for (Object o : completedVersHashMap.values()) {
-                assertTrue("completedVersHashMap contains" + o.getClass() + " 
instead of boolean. " +
-                    "These values should be replaced by boolean after 
onePhaseCommit finished. " +
-                    "[node=" + i + "]", o instanceof Boolean);
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testFailsWithNoRetries() throws Exception {
         checkFailsWithNoRetries(false);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index b439bcc..8e4b3a4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -260,7 +260,7 @@ public class IgniteCachePutRetryTransactionalSelfTest 
extends IgniteCachePutRetr
             fut2.get();
         }
 
-        checkOnePhaseCommitReturnValuesCleaned();
+        checkOnePhaseCommitReturnValuesCleaned(GRID_CNT);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index dfb3f65..29d67e2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -763,21 +763,7 @@ public class IgniteCacheAtomicProtocolTest extends 
GridCommonAbstractTest {
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {
-        assert !expData.isEmpty();
-
-        List<Ignite> nodes = G.allGrids();
-
-        assertFalse(nodes.isEmpty());
-
-        for (Ignite node : nodes) {
-            IgniteCache<Integer, Integer> cache = node.cache(TEST_CACHE);
-
-            for (Map.Entry<Integer, Integer> e : expData.entrySet()) {
-                assertEquals("Invalid value [key=" + e.getKey() + ", node=" + 
node.name() + ']',
-                    e.getValue(),
-                    cache.get(e.getKey()));
-            }
-        }
+        checkCacheData(expData, TEST_CACHE);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index af623da..8a7150d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -2024,10 +2024,9 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
-     *
-     * @throws IgniteInterruptedCheckedException
+     * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void awaitTopologyChange() throws IgniteInterruptedCheckedException 
{
+    private void awaitTopologyChange() throws 
IgniteInterruptedCheckedException {
         for (Ignite g : G.allGrids()) {
             final GridKernalContext ctx = ((IgniteKernal)g).context();
 
@@ -2038,7 +2037,9 @@ public abstract class GridAbstractTest extends TestCase {
             AffinityTopologyVersion exchVer = 
ctx.cache().context().exchange().readyAffinityVersion();
 
             if (! topVer.equals(exchVer)) {
-                info("topology version mismatch: node "  + g.name() + " " + 
exchVer + ", " + topVer);
+                info("Topology version mismatch [node="  + g.name() +
+                    ", exchVer=" + exchVer +
+                    ", topVer=" + topVer + ']');
 
                 GridTestUtils.waitForCondition(new GridAbsPredicate() {
                     @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index cef35e5..c76c83e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -80,7 +80,9 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -1577,4 +1579,81 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
             .setEnforceJoinOrder(qry.isEnforceJoinOrder()))
             .getAll().get(0).get(0);
     }
+
+    /**
+     * Asserts that the given cache returns the expected value for every expected key on every node
+     * reported by {@code G.allGrids()}. Each value is read with {@code cache.get(key)} on the node
+     * being checked, and the failure message names the key and the node.
+     * <p>
+     * The expected data must be non-empty (checked with a Java {@code assert}, so only enforced when
+     * assertions are enabled) and at least one node must be present.
+     *
+     * @param expData Expected cache data (key to expected value), must not be empty.
+     * @param cacheName Cache name passed to {@code Ignite.cache(String)} on each node.
+     */
+    protected final void checkCacheData(Map<?, ?> expData, String cacheName) {
+        assert !expData.isEmpty();
+
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        for (Ignite node : nodes) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+            for (Map.Entry<?, ?> e : expData.entrySet()) {
+                assertEquals("Invalid value [key=" + e.getKey() + ", node=" + 
node.name() + ']',
+                    e.getValue(),
+                    cache.get(e.getKey()));
+            }
+        }
+    }
+
+    /**
+     * Checks that the completed transactions map of each selected node holds only {@code Boolean}
+     * values, i.e. that no other kind of object is left in it.
+     * <p>
+     * Node selection: when {@code nodesCnt} is {@code -1} all nodes from {@code G.allGrids()} are
+     * used (and at least one must exist); otherwise nodes {@code grid(0)} .. {@code grid(nodesCnt - 1)}
+     * are used.
+     * <p>
+     * The method first polls through {@code GridTestUtils.waitForCondition} (timeout argument
+     * {@code 5000}, presumably milliseconds - confirm against {@code GridTestUtils}) until every map
+     * value is a {@code Boolean}. The result of the wait is not inspected; instead the maps are
+     * re-read afterwards and each value is asserted, so a failure reports the offending value class
+     * and node name.
+     *
+     * @param nodesCnt Expected nodes number or {@code -1} to use all nodes.
+     * @throws Exception If failed.
+     */
+    protected final void checkOnePhaseCommitReturnValuesCleaned(final int 
nodesCnt) throws Exception {
+        final List<Ignite> nodes;
+
+        if (nodesCnt == -1) {
+            nodes = G.allGrids();
+
+            assertTrue(nodes.size() > 0);
+        }
+        else {
+            nodes = new ArrayList<>(nodesCnt);
+
+            for (int i = 0; i < nodesCnt; i++)
+                nodes.add(grid(i));
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Ignite node : nodes) {
+                    Map completedVersHashMap = completedTxsMap(node);
+
+                    for (Object o : completedVersHashMap.values()) {
+                        if (!(o instanceof Boolean))
+                            return false;
+                    }
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        for (Ignite node : nodes) {
+            Map completedVersHashMap = completedTxsMap(node);
+
+            for (Object o : completedVersHashMap.values()) {
+                assertTrue("completedVersHashMap contains " + 
o.getClass().getName() + " instead of boolean. " +
+                    "These values should be replaced by boolean after 
onePhaseCommit finished. " +
+                    "[node=" + node.name() + "]", o instanceof Boolean);
+            }
+        }
+    }
+
+    /**
+     * Returns the {@code completedVersHashMap} field of the node's transaction manager.
+     * <p>
+     * The node is cast to {@code IgniteKernal} to reach the shared cache context and its
+     * {@code IgniteTxManager}; the field is then obtained by name through {@code U.field}
+     * (presumably reflective access to a non-public field - confirm against {@code IgniteUtils}).
+     * The map is returned as a raw {@code Map} because its key/value types are not visible here.
+     *
+     * @param ignite Node, must be an {@code IgniteKernal} instance.
+     * @return Completed txs map.
+     */
+    private Map completedTxsMap(Ignite ignite) {
+        IgniteTxManager tm = 
((IgniteKernal)ignite).context().cache().context().tm();
+
+        return U.field(tm, "completedVersHashMap");
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 62e6b78..89e8f01 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNode
 import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
+import 
org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
 import 
org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite;
@@ -262,6 +263,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class));
 
+        suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class));
+
         return suite;
     }
 }

Reply via email to