Repository: ignite Updated Branches: refs/heads/ignite-1702 [created] f49067fd2
IGNITE-1702 - Fixed error handling for cache updates Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee76924f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee76924f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee76924f Branch: refs/heads/ignite-1702 Commit: ee76924fa800a8b16fb0e5859bbf67d93f23e12e Parents: 02b59e4 Author: Valentin Kulichenko <[email protected]> Authored: Fri Oct 16 11:35:41 2015 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Fri Oct 16 11:35:41 2015 -0700 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 24 +-- .../distributed/dht/GridDhtTxPrepareFuture.java | 34 +++- .../dht/atomic/GridDhtAtomicCache.java | 7 +- .../distributed/near/GridNearLockFuture.java | 7 - .../cache/distributed/near/GridNearTxLocal.java | 12 +- .../cache/transactions/IgniteTxHandler.java | 8 +- .../CachePutEventListenerErrorSelfTest.java | 179 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 9 files changed, 245 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 79bccc2..177d5b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -223,6 +222,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur Throwable e = this.err.get(); + if (e == null) + e = this.tx.commitError(); + if (super.onDone(tx, e != null ? e : err)) { // Always send finish reply. this.tx.sendFinishReply(commit, error()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8c7d985..84e1d72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -141,20 +141,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { int taskNameHash ) { super( - cctx, - xidVer, - implicit, - implicitSingle, - sys, - plc, - concurrency, - isolation, - timeout, + cctx, + xidVer, + implicit, + implicitSingle, + sys, + plc, + concurrency, + isolation, + timeout, invalidate, storeEnabled, onePhaseCommit, - txSize, - subjId, + txSize, + subjId, taskNameHash ); @@ -889,7 +889,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return {@code True} if transaction is finished on prepare step. */ - protected final boolean commitOnPrepare() { + public final boolean commitOnPrepare() { return onePhaseCommit() && !near() && !nearOnOriginatingNode; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 761bbb0..4e946c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridLeanSet; @@ -575,10 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.commitOnPrepare()) { if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { - IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? - tx.commitAsync() : tx.rollbackAsync(); - - fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { + CIX1<IgniteInternalFuture<IgniteInternalTx>> responseClo = new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { try { if (replied.compareAndSet(false, true)) @@ -588,7 +586,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter U.error(log, "Failed to send prepare response for transaction: " + tx, e); } } - }); + }; + + IgniteInternalFuture<IgniteInternalTx> fut; + + if (this.err.get() == null) { + try { + fut = tx.commitAsync(); + } + catch (RuntimeException | Error e) { + Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " + + "exception: " + CU.txString(tx), e); + + res.error(hEx); + + tx.systemInvalidate(true); + + fut = tx.rollbackAsync(); + + fut.listen(responseClo); + + throw e; + } + } + else + fut = tx.rollbackAsync(); + + fut.listen(responseClo); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 854a83d..93c0586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1246,7 +1246,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { remap = true; } - catch (Exception e) { + catch (Throwable e) { // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is // an attempt to use cleaned resources. U.error(log, "Unexpected exception during cache update", e); @@ -1255,6 +1255,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); + if (e instanceof Error) + throw e; + return; } @@ -2966,4 +2969,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { pendingResponses.remove(nodeId, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index e6b1e02..808e3f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -276,13 +276,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** - * @return {@code True} if commit is synchronous. - */ - private boolean syncCommit() { - return tx != null && tx.syncCommit(); - } - - /** * @return {@code True} if rollback is synchronous. */ private boolean syncRollback() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ea96649..ee3fe54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -71,6 +72,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARING; @@ -216,7 +218,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> addReader( - long msgId, + long msgId, GridDhtCacheEntry cached, IgniteTxEntry entry, AffinityTopologyVersion topVer @@ -284,7 +286,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return true; for (int cacheId : activeCacheIds()) { - if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) + CacheWriteSynchronizationMode mode = cctx.cacheContext(cacheId).config().getWriteSynchronizationMode(); + + if (mode == FULL_SYNC || mode == PRIMARY_SYNC) return true; } @@ -1143,8 +1147,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected GridCacheEntryEx entryEx( - GridCacheContext cacheCtx, - IgniteTxKey key, + GridCacheContext cacheCtx, + IgniteTxKey key, AffinityTopologyVersion topVer ) { if (cacheCtx.isColocated()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c2cc629..35ea25b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -413,7 +413,7 @@ public class IgniteTxHandler { req.last(), req.lastBackups()); - if (tx.isRollbackOnly()) { + if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { try { tx.rollback(); } @@ -714,6 +714,10 @@ public class IgniteTxHandler { } } catch (Throwable e) { + tx.commitError(e); + + tx.systemInvalidate(true); + U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); IgniteInternalFuture<IgniteInternalTx> res = null; @@ -1382,4 +1386,4 @@ public class IgniteTxHandler { fut.onResult(nodeId, res); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java new file mode 100644 index 0000000..f8c6d37 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for cache put with error in event listener. + */ +public class CachePutEventListenerErrorSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + Ignite ignite = startGrid("client"); + + ignite.events().remoteListen( + new IgniteBiPredicate<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event evt) { + return true; + } + }, + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + throw new NoClassDefFoundError("XXX"); + } + }, + EventType.EVT_CACHE_OBJECT_PUT + ); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedAtomicOnHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedAtomicOffHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactionalOnHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactionalOffHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedAtomicOnHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedAtomicOffHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactionalOnHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactionalOffHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param memMode Memory mode. + * @throws Exception If failed. + */ + private void doTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, CacheMemoryMode memMode) + throws Exception { + Ignite ignite = grid("client"); + + try { + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("cache"); + + cfg.setCacheMode(cacheMode); + cfg.setAtomicityMode(atomicityMode); + cfg.setMemoryMode(memMode); + + IgniteCache<Integer, Integer> cache = ignite.createCache(cfg).withAsync(); + + cache.put(0, 0); + + try { + cache.future().get(2000); + + assert false : "Exception was not thrown"; + } + catch (CacheException e) { + info("Caught expected exception: " + e); + } + } + finally { + ignite.destroyCache("cache"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee76924f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 1deb3bc..3e6d335 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSel import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest; +import org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityApiSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityMapperSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityRoutingSelfTest; @@ -266,6 +267,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheNearLockValueSelfTest.class); + suite.addTestSuite(CachePutEventListenerErrorSelfTest.class); + return suite; } }
