IGNITE-1702 - Merged fix.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6e117fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6e117fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6e117fe Branch: refs/heads/ignite-1282 Commit: c6e117fe63a1d1ad7a2b247c658ec479ba0ba5fc Parents: 784ea7c Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Sat Nov 28 16:28:22 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Sat Nov 28 16:28:22 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxFinishFuture.java | 12 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 37 +++- .../dht/atomic/GridDhtAtomicCache.java | 5 +- .../distributed/near/GridNearLockFuture.java | 7 - .../cache/transactions/IgniteTxHandler.java | 6 +- .../CachePutEventListenerErrorSelfTest.java | 180 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 8 files changed, 232 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index c4a90b1..9a0d778 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -201,9 +201,17 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur Throwable e = this.err.get(); - if (super.onDone(tx, e != null ? e : err)) { + if (e == null && commit) + e = this.tx.commitError(); + + Throwable finishErr = e != null ? e : err; + + if (super.onDone(tx, finishErr)) { + if (finishErr == null) + finishErr = this.tx.commitError(); + // Always send finish reply. - this.tx.sendFinishReply(commit, error()); + this.tx.sendFinishReply(commit, finishErr); // Don't forget to clean up. cctx.mvcc().removeFuture(futId); http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 2330a95..534a560 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -822,7 +822,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return {@code True} if transaction is finished on prepare step. */ - protected final boolean commitOnPrepare() { + public final boolean commitOnPrepare() { return onePhaseCommit() && !near() && !nearOnOriginatingNode; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1d418ea..34addfa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; @@ -602,13 +603,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { IgniteInternalFuture<IgniteInternalTx> fut = null; - if (prepErr == null) - fut = tx.commitAsync(); - else if (!cctx.kernalContext().isStopping()) - fut = tx.rollbackAsync(); - - if (fut != null) { - fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { + CIX1<IgniteInternalFuture<IgniteInternalTx>> responseClo = + new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { try { if (replied.compareAndSet(false, true)) @@ -618,8 +614,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter U.error(log, "Failed to send prepare response for transaction: " + tx, e); } } - }); + }; + + if (prepErr == null) { + try { + fut = tx.commitAsync(); + } + catch (RuntimeException | Error e) { + Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " + + "exception: " + CU.txString(tx), e); + + res.error(hEx); + + tx.systemInvalidate(true); + + fut = tx.rollbackAsync(); + + fut.listen(responseClo); + + throw e; + } + } + else if (!cctx.kernalContext().isStopping()) + fut = tx.rollbackAsync(); + + if (fut != null) + fut.listen(responseClo); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8fe1b3a..a49341b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1393,7 +1393,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { remap = true; } - catch (Exception e) { + catch (Throwable e) { // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is // an attempt to use cleaned resources. U.error(log, "Unexpected exception during cache update", e); @@ -1402,6 +1402,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); + if (e instanceof Error) + throw e; + return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 417303b..4cb7248 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -263,13 +263,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** - * @return {@code True} if commit is synchronous. - */ - private boolean syncCommit() { - return tx != null && tx.syncCommit(); - } - - /** * @return {@code True} if rollback is synchronous. */ private boolean syncRollback() { http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 63a4cbe..61a9bed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -413,7 +413,7 @@ public class IgniteTxHandler { req.transactionNodes(), req.last()); - if (tx.isRollbackOnly()) { + if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { try { if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) tx.rollback(); @@ -713,6 +713,10 @@ public class IgniteTxHandler { } } catch (Throwable e) { + tx.commitError(e); + + tx.systemInvalidate(true); + U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); IgniteInternalFuture<IgniteInternalTx> res = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java new file mode 100644 index 0000000..0e0e521 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for cache put with error in event listener. + */ +public class CachePutEventListenerErrorSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + Ignite ignite = startGrid("client"); + + ignite.events().remoteListen( + new IgniteBiPredicate<UUID, Event>() { + @Override public boolean apply(UUID uuid, Event evt) { + return true; + } + }, + new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + throw new NoClassDefFoundError("XXX"); + } + }, + EventType.EVT_CACHE_OBJECT_PUT + ); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedAtomicOnHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedAtomicOffHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactionalOnHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactionalOffHeap() throws Exception { + doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedAtomicOnHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedAtomicOffHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactionalOnHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactionalOffHeap() throws Exception { + doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param memMode Memory mode. + * @throws Exception If failed. + */ + private void doTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, CacheMemoryMode memMode) + throws Exception { + Ignite ignite = grid("client"); + + try { + CacheConfiguration<Integer, Integer> cfg = defaultCacheConfiguration(); + + cfg.setName("cache"); + cfg.setCacheMode(cacheMode); + cfg.setAtomicityMode(atomicityMode); + cfg.setMemoryMode(memMode); + + IgniteCache<Integer, Integer> cache = ignite.createCache(cfg).withAsync(); + + cache.put(0, 0); + + try { + cache.future().get(2000); + + assert false : "Exception was not thrown"; + } + catch (CacheException e) { + info("Caught expected exception: " + e); + } + } + finally { + ignite.destroyCache("cache"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 4797ff1..ca31c28 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -38,6 +38,7 @@ import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest; import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest; +import org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest; import org.apache.ignite.internal.processors.cache.CacheNamesSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityApiSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAffinityMapperSelfTest; @@ -276,6 +277,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheNearLockValueSelfTest.class); + suite.addTestSuite(CachePutEventListenerErrorSelfTest.class); + return suite; } }