Repository: ignite Updated Branches: refs/heads/ignite-6083 [created] cbf65cb44
IGNITE-6083 Null value has appeared in entry processor, but entry exists Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/038249d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/038249d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/038249d7 Branch: refs/heads/ignite-6083 Commit: 038249d718b44a602efa5f8fdae39c2757c09f99 Parents: 9d9d8a8 Author: voipp <[email protected]> Authored: Tue Feb 27 13:57:51 2018 +0300 Committer: voipp <[email protected]> Committed: Tue Mar 27 18:29:56 2018 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 3 + .../transactions/IgniteTxLocalAdapter.java | 2 + ...teCacheEntryProcessorSequentialCallTest.java | 311 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 4 files changed, 318 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index a3fddaf..4d9bfe3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1271,6 +1271,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou keepBinary, CU.isNearEnabled(cacheCtx)); + if (op == TRANSFORM && txEntry.value() == null && old != null) + txEntry.value(cacheCtx.toCacheObject(old), false, false); + if (enlisted != null) enlisted.add(cacheKey); http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8dec59a..8613600 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1112,6 +1112,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (txEntry.op() == TRANSFORM) { if (computeInvoke) { + txEntry.readValue(v); + GridCacheVersion ver; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java new file mode 100644 index 0000000..592449d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/IgniteCacheEntryProcessorSequentialCallTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.transactions.TransactionOptimisticException; + +/** + */ +public class IgniteCacheEntryProcessorSequentialCallTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cacheCfg = new CacheConfiguration("cache"); + + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setMaxConcurrentAsyncOperations(0); + cacheCfg.setBackups(0); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * + */ + public void testOptimisticSerializableTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE); + } + + /** + * + */ + public void testOptimisticRepeatableReadTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ); + } + + /** + * + */ + public void testOptimisticReadCommittedTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + } + + /** + * + */ + public void testPessimisticSerializableTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE); + } + + /** + * + */ + public void testPessimisticRepeatableReadTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); + } + + /** + * + */ + public void testPessimisticReadCommittedTxInvokeSequentialCall() throws Exception { + transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); + + transactionInvokeSequentialCallOnNearNode(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); + } + + /** + * Test for sequential entry processor invoking not null value on primary cache. + * In this test entry processor gets value from local node. + * + * @param transactionConcurrency Transaction concurrency. + * @param transactionIsolation Transaction isolation. + */ + public void transactionInvokeSequentialCallOnPrimaryNode(TransactionConcurrency transactionConcurrency, + TransactionIsolation transactionIsolation) throws Exception { + TestKey key = new TestKey(1L); + TestValue val = new TestValue(); + val.value("1"); + + Ignite primaryIgnite; + + if (ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key)) + primaryIgnite = ignite(0); + else + primaryIgnite = ignite(1); + + IgniteCache<TestKey, TestValue> cache = primaryIgnite.cache("cache"); + + cache.put(key, val); + + NotNullCacheEntryProcessor cacheEntryProcessor = new NotNullCacheEntryProcessor(); + + try (Transaction transaction = primaryIgnite.transactions().txStart(transactionConcurrency, + transactionIsolation)) { + + cache.invoke(key, cacheEntryProcessor); + cache.invoke(key, cacheEntryProcessor); + + transaction.commit(); + } + + cache.remove(key); + } + + /** + * Test for sequential entry processor invoking not null value on near cache. + * In this test entry processor fetches value from remote node. + * + * @param transactionConcurrency Transaction concurrency. + * @param transactionIsolation Transaction isolation. + */ + public void transactionInvokeSequentialCallOnNearNode(TransactionConcurrency transactionConcurrency, + TransactionIsolation transactionIsolation) throws Exception { + TestKey key = new TestKey(1L); + TestValue val = new TestValue(); + val.value("1"); + + Ignite nearIgnite; + Ignite primaryIgnite; + + if (ignite(0).affinity("cache").isPrimary(ignite(0).cluster().localNode(), key)) { + primaryIgnite = ignite(0); + + nearIgnite = ignite(1); + } + else { + primaryIgnite = ignite(1); + + nearIgnite = ignite(0); + } + + primaryIgnite.cache("cache").put(key, val); + + IgniteCache<TestKey, TestValue> nearCache = nearIgnite.cache("cache"); + + NotNullCacheEntryProcessor cacheEntryProcessor = new NotNullCacheEntryProcessor(); + + try (Transaction transaction = nearIgnite.transactions().txStart(transactionConcurrency, + transactionIsolation)) { + + nearCache.invoke(key, cacheEntryProcessor); + nearCache.invoke(key, cacheEntryProcessor); + + transaction.commit(); + } + + primaryIgnite.cache("cache").remove(key); + } + + /** + * Test for sequential entry processor invocation. During transaction value is changed externally, which leads to + * optimistic conflict exception. + */ + public void testTxInvokeSequentialOptimisticConflict() throws Exception { + TestKey key = new TestKey(1L); + + IgniteCache<TestKey, TestValue> cache = ignite(0).cache("cache"); + + CountDownLatch latch = new CountDownLatch(1); + + cache.put(key, new TestValue("1")); + + multithreadedAsync(new Runnable() { + @Override public void run() { + try { + latch.await(); + } + catch (InterruptedException e) { + fail(); + } + + cache.put(key, new TestValue("2")); + } + }, 1); + + Transaction tx = ignite(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE); + + cache.invoke(key, new NotNullCacheEntryProcessor()); + + latch.countDown(); + + Thread.sleep(1_000); + + cache.invoke(key, new NotNullCacheEntryProcessor()); + + GridTestUtils.assertThrowsWithCause(new Callable<Object>() { + @Override public Object call() throws Exception { + tx.commit(); + + return null; + } + }, TransactionOptimisticException.class); + + cache.remove(key); + } + + /** + * Cache entry processor checking whether entry has got non-null value. + */ + public static class NotNullCacheEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, Object> { + /** {@inheritDoc} */ + public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException { + assertNotNull(entry.getValue()); + + return null; + } + } + + /** + * + */ + public static class TestKey { + /** Value. */ + private final Long val; + + /** + * @param val Value. + */ + public TestKey(Long val) { + this.val = val; + } + } + + /** + * + */ + public static class TestValue { + /** Value. */ + private String val; + + /** + * Default constructor. + */ + public TestValue() { + } + + /** + * @param val Value. + */ + public TestValue(String val) { + this.val = val; + } + + /** + * @return Value. + */ + public String value() { + return val; + } + + /** + * @param val New value. + */ + public void value(String val) { + this.val = val; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/038249d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index bb397f7..21bd13e 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import java.util.Set; import junit.framework.TestSuite; +import org.apache.ignite.cache.IgniteCacheEntryProcessorSequentialCallTest; import org.apache.ignite.cache.IgniteWarmupClosureSelfTest; import org.apache.ignite.cache.store.CacheStoreReadFromBackupTest; import org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest; @@ -189,6 +190,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class); suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class); suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class); + suite.addTestSuite(IgniteCacheEntryProcessorSequentialCallTest.class); // TODO GG-11148: include test when implemented. // Test fails due to incorrect handling of CacheConfiguration#getCopyOnRead() and
