Repository: ignite
Updated Branches:
  refs/heads/ignite-6083 [created] cbf65cb44


IGNITE-6083 Null value has appeared in entry processor, but entry exists


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/038249d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/038249d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/038249d7

Branch: refs/heads/ignite-6083
Commit: 038249d718b44a602efa5f8fdae39c2757c09f99
Parents: 9d9d8a8
Author: voipp <[email protected]>
Authored: Tue Feb 27 13:57:51 2018 +0300
Committer: voipp <[email protected]>
Committed: Tue Mar 27 18:29:56 2018 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |   3 +
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 ...teCacheEntryProcessorSequentialCallTest.java | 311 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 4 files changed, 318 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a3fddaf..4d9bfe3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1271,6 +1271,9 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
                         keepBinary,
                         CU.isNearEnabled(cacheCtx));
 
+                    if (op == TRANSFORM && txEntry.value() == null && old != 
null)
+                        txEntry.value(cacheCtx.toCacheObject(old), false, 
false);
+
                     if (enlisted != null)
                         enlisted.add(cacheKey);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8dec59a..8613600 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1112,6 +1112,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
 
                         if (txEntry.op() == TRANSFORM) {
                             if (computeInvoke) {
+                                txEntry.readValue(v);
+
                                 GridCacheVersion ver;
 
                                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
new file mode 100644
index 0000000..592449d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
+/**
+ */
+public class IgniteCacheEntryProcessorSequentialCallTest extends 
GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration("cache");
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setMaxConcurrentAsyncOperations(0);
+        cacheCfg.setBackups(0);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    public void testOptimisticSerializableTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.SERIALIZABLE);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     *
+     */
+    public void testOptimisticRepeatableReadTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ);
+    }
+
+    /**
+     *
+     */
+    public void testOptimisticReadCommittedTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
+    }
+
+    /**
+     *
+     */
+    public void testPessimisticSerializableTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC,
 TransactionIsolation.SERIALIZABLE);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     *
+     */
+    public void testPessimisticRepeatableReadTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC,
 TransactionIsolation.REPEATABLE_READ);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.REPEATABLE_READ);
+    }
+
+    /**
+     *
+     */
+    public void testPessimisticReadCommittedTxInvokeSequentialCall() throws 
Exception {
+        
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC,
 TransactionIsolation.READ_COMMITTED);
+
+        
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.READ_COMMITTED);
+    }
+
+    /**
+     * Test for sequential entry processor invoking not null value on primary 
cache.
+     * In this test entry processor gets value from local node.
+     *
+     * @param transactionConcurrency Transaction concurrency.
+     * @param transactionIsolation Transaction isolation.
+     */
+    public void 
transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency 
transactionConcurrency,
+        TransactionIsolation transactionIsolation) throws Exception {
+        TestKey key = new TestKey(1L);
+        TestValue val = new TestValue();
+        val.value("1");
+
+        Ignite primaryIgnite;
+
+        if 
(ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key))
+            primaryIgnite = ignite(0);
+        else
+            primaryIgnite = ignite(1);
+
+        IgniteCache<TestKey, TestValue> cache = primaryIgnite.cache("cache");
+
+        cache.put(key, val);
+
+        NotNullCacheEntryProcessor cacheEntryProcessor = new 
NotNullCacheEntryProcessor();
+
+        try (Transaction transaction = 
primaryIgnite.transactions().txStart(transactionConcurrency,
+            transactionIsolation)) {
+
+            cache.invoke(key, cacheEntryProcessor);
+            cache.invoke(key, cacheEntryProcessor);
+
+            transaction.commit();
+        }
+
+        cache.remove(key);
+    }
+
+    /**
+     * Test for sequential entry processor invoking not null value on near 
cache.
+     * In this test entry processor fetches value from remote node.
+     *
+     * @param transactionConcurrency Transaction concurrency.
+     * @param transactionIsolation Transaction isolation.
+     */
+    public void 
transactionInvokeSequentialCallOnNearNode(TransactionConcurrency 
transactionConcurrency,
+        TransactionIsolation transactionIsolation) throws Exception {
+        TestKey key = new TestKey(1L);
+        TestValue val = new TestValue();
+        val.value("1");
+
+        Ignite nearIgnite;
+        Ignite primaryIgnite;
+
+        if 
(ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key)) {
+            primaryIgnite = ignite(0);
+
+            nearIgnite = ignite(1);
+        }
+        else {
+            primaryIgnite = ignite(1);
+
+            nearIgnite = ignite(0);
+        }
+
+        primaryIgnite.cache("cache").put(key, val);
+
+        IgniteCache<TestKey, TestValue> nearCache = nearIgnite.cache("cache");
+
+        NotNullCacheEntryProcessor cacheEntryProcessor = new 
NotNullCacheEntryProcessor();
+
+        try (Transaction transaction = 
nearIgnite.transactions().txStart(transactionConcurrency,
+            transactionIsolation)) {
+
+            nearCache.invoke(key, cacheEntryProcessor);
+            nearCache.invoke(key, cacheEntryProcessor);
+
+            transaction.commit();
+        }
+
+        primaryIgnite.cache("cache").remove(key);
+    }
+
+    /**
+     * Test for sequential entry processor invocation. During transaction 
value is changed externally, which leads to
+     * optimistic conflict exception.
+     */
+    public void testTxInvokeSequentialOptimisticConflict() throws Exception {
+        TestKey key = new TestKey(1L);
+
+        IgniteCache<TestKey, TestValue> cache = ignite(0).cache("cache");
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        cache.put(key, new TestValue("1"));
+
+        multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException e) {
+                    fail();
+                }
+
+                cache.put(key, new TestValue("2"));
+            }
+        }, 1);
+
+        Transaction tx = 
ignite(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.SERIALIZABLE);
+
+        cache.invoke(key, new NotNullCacheEntryProcessor());
+
+        latch.countDown();
+
+        Thread.sleep(1_000);
+
+        cache.invoke(key, new NotNullCacheEntryProcessor());
+
+        GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                tx.commit();
+
+                return null;
+            }
+        }, TransactionOptimisticException.class);
+
+        cache.remove(key);
+    }
+
+    /**
+     * Cache entry processor checking whether entry has got non-null value.
+     */
+    public static class NotNullCacheEntryProcessor implements 
CacheEntryProcessor<TestKey, TestValue, Object> {
+        /** {@inheritDoc} */
+        public Object process(MutableEntry entry, Object... arguments) throws 
EntryProcessorException {
+            assertNotNull(entry.getValue());
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestKey {
+        /** Value. */
+        private final Long val;
+
+        /**
+         * @param val Value.
+         */
+        public TestKey(Long val) {
+            this.val = val;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestValue {
+        /** Value. */
+        private String val;
+
+        /**
+         * Default constructor.
+         */
+        public TestValue() {
+        }
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(String val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public String value() {
+            return val;
+        }
+
+        /**
+         * @param val New value.
+         */
+        public void value(String val) {
+            this.val = val;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..21bd13e 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import java.util.Set;
 import junit.framework.TestSuite;
+import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest;
 import org.apache.ignite.cache.IgniteWarmupClosureSelfTest;
 import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest;
 import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
@@ -189,6 +190,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
         suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
         suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);
+        suite.addTestSuite(IgniteCacheEntryProcessorSequentialCallTest.class);
 
         // TODO GG-11148: include test when implemented.
         // Test fails due to incorrect handling of 
CacheConfiguration#getCopyOnRead() and

Reply via email to