IGNITE-1702 - Merged fix.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6e117fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6e117fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6e117fe

Branch: refs/heads/ignite-1282
Commit: c6e117fe63a1d1ad7a2b247c658ec479ba0ba5fc
Parents: 784ea7c
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
Authored: Sat Nov 28 16:28:22 2015 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Sat Nov 28 16:28:22 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  37 +++-
 .../dht/atomic/GridDhtAtomicCache.java          |   5 +-
 .../distributed/near/GridNearLockFuture.java    |   7 -
 .../cache/transactions/IgniteTxHandler.java     |   6 +-
 .../CachePutEventListenerErrorSelfTest.java     | 180 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +
 8 files changed, 232 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index c4a90b1..9a0d778 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -201,9 +201,17 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
             Throwable e = this.err.get();
 
-            if (super.onDone(tx, e != null ? e : err)) {
+            if (e == null && commit)
+                e = this.tx.commitError();
+
+            Throwable finishErr = e != null ? e : err;
+
+            if (super.onDone(tx, finishErr)) {
+                if (finishErr == null)
+                    finishErr = this.tx.commitError();
+
                 // Always send finish reply.
-                this.tx.sendFinishReply(commit, error());
+                this.tx.sendFinishReply(commit, finishErr);
 
                 // Don't forget to clean up.
                 cctx.mvcc().removeFuture(futId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 2330a95..534a560 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -822,7 +822,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @return {@code True} if transaction is finished on prepare step.
      */
-    protected final boolean commitOnPrepare() {
+    public final boolean commitOnPrepare() {
         return onePhaseCommit() && !near() && !nearOnOriginatingNode;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1d418ea..34addfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
@@ -602,13 +603,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
                     IgniteInternalFuture<IgniteInternalTx> fut = null;
 
-                    if (prepErr == null)
-                        fut = tx.commitAsync();
-                    else if (!cctx.kernalContext().isStopping())
-                        fut = tx.rollbackAsync();
-
-                    if (fut != null) {
-                        fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
+                    CIX1<IgniteInternalFuture<IgniteInternalTx>> responseClo =
+                        new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                             @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
                                 try {
                                     if (replied.compareAndSet(false, true))
@@ -618,8 +614,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                     U.error(log, "Failed to send prepare response for transaction: " + tx, e);
                                 }
                             }
-                        });
+                        };
+
+                    if (prepErr == null) {
+                        try {
+                            fut = tx.commitAsync();
+                        }
+                        catch (RuntimeException | Error e) {
+                            Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " +
+                                "exception: " + CU.txString(tx), e);
+
+                            res.error(hEx);
+
+                            tx.systemInvalidate(true);
+
+                            fut = tx.rollbackAsync();
+
+                            fut.listen(responseClo);
+
+                            throw e;
+                        }
+
                     }
+                    else if (!cctx.kernalContext().isStopping())
+                        fut = tx.rollbackAsync();
+
+                    if (fut != null)
+                        fut.listen(responseClo);
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8fe1b3a..a49341b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1393,7 +1393,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             remap = true;
         }
-        catch (Exception e) {
+        catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
@@ -1402,6 +1402,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
 
+            if (e instanceof Error)
+                throw e;
+
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 417303b..4cb7248 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -263,13 +263,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     }
 
     /**
-     * @return {@code True} if commit is synchronous.
-     */
-    private boolean syncCommit() {
-        return tx != null && tx.syncCommit();
-    }
-
-    /**
      * @return {@code True} if rollback is synchronous.
      */
     private boolean syncRollback() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 63a4cbe..61a9bed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -413,7 +413,7 @@ public class IgniteTxHandler {
                 req.transactionNodes(),
                 req.last());
 
-            if (tx.isRollbackOnly()) {
+            if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
                 try {
                     if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
                         tx.rollback();
@@ -713,6 +713,10 @@ public class IgniteTxHandler {
             }
         }
         catch (Throwable e) {
+            tx.commitError(e);
+
+            tx.systemInvalidate(true);
+
             U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
 
             IgniteInternalFuture<IgniteInternalTx> res = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java
new file mode 100644
index 0000000..0e0e521
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CachePutEventListenerErrorSelfTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for cache put with error in event listener.
+ */
+public class CachePutEventListenerErrorSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        Ignition.setClientMode(true);
+
+        Ignite ignite = startGrid("client");
+
+        ignite.events().remoteListen(
+            new IgniteBiPredicate<UUID, Event>() {
+                @Override public boolean apply(UUID uuid, Event evt) {
+                    return true;
+                }
+            },
+            new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    throw new NoClassDefFoundError("XXX");
+                }
+            },
+            EventType.EVT_CACHE_OBJECT_PUT
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedAtomicOnHeap() throws Exception {
+        doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedAtomicOffHeap() throws Exception {
+        doTest(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedTransactionalOnHeap() throws Exception {
+        doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedTransactionalOffHeap() throws Exception {
+        doTest(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAtomicOnHeap() throws Exception {
+        doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.ONHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAtomicOffHeap() throws Exception {
+        doTest(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CacheMemoryMode.OFFHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTransactionalOnHeap() throws Exception {
+        doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTransactionalOffHeap() throws Exception {
+        doTest(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param memMode Memory mode.
+     * @throws Exception If failed.
+     */
+    private void doTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, CacheMemoryMode memMode)
+        throws Exception {
+        Ignite ignite = grid("client");
+
+        try {
+            CacheConfiguration<Integer, Integer> cfg = defaultCacheConfiguration();
+
+            cfg.setName("cache");
+            cfg.setCacheMode(cacheMode);
+            cfg.setAtomicityMode(atomicityMode);
+            cfg.setMemoryMode(memMode);
+
+            IgniteCache<Integer, Integer> cache = ignite.createCache(cfg).withAsync();
+
+            cache.put(0, 0);
+
+            try {
+                cache.future().get(2000);
+
+                assert false : "Exception was not thrown";
+            }
+            catch (CacheException e) {
+                info("Caught expected exception: " + e);
+            }
+        }
+        finally {
+            ignite.destroyCache("cache");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6e117fe/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 4797ff1..ca31c28 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest;
+import org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheNamesSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityApiSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityMapperSelfTest;
@@ -276,6 +277,8 @@ public class IgniteCacheTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheNearLockValueSelfTest.class);
 
+        suite.addTestSuite(CachePutEventListenerErrorSelfTest.class);
+
         return suite;
     }
 }

Reply via email to