This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 378f0643117 IGNITE-19286 Fixed NPE in case of simultaneous cache 
destroy and transaction rollback (#10647)
378f0643117 is described below

commit 378f0643117eca00f6081a08a783b9683f3adc19
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Apr 17 19:41:59 2023 +0300

    IGNITE-19286 Fixed NPE in case of simultaneous cache destroy and 
transaction rollback (#10647)
---
 .../processors/cache/GridCacheProcessor.java       |  4 +
 .../StartImplicitlyTxOnStopCacheTest.java          | 92 +++++++++++++++++++++-
 2 files changed, 94 insertions(+), 2 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3ad724da2c1..85b4bcef3d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2777,10 +2777,14 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     }
 
                     for (ExchangeActions.CacheActionData action : 
cachesToStopByGrp.getValue()) {
+                        // Rollback tx started before gateway blocked to avoid 
deadlock with gateway stop.
                         
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
 
                         stopGateway(action.request());
 
+                        // Rollback tx started after gateway blocked but not 
stopped.
+                        
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+
                         String cacheName = action.request().cacheName();
 
                         GridCacheAdapter<?, ?> cache = caches.get(cacheName);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
index 3305d84286b..ea19f4214e1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
@@ -18,32 +18,60 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheGateway;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 
 /**
  * The test starts an implicit transaction during the cache is stopping.
  * The transaction has to be completed, and the cache is stopped.
  */
 public class StartImplicitlyTxOnStopCacheTest extends GridCommonAbstractTest {
+    /** */
+    private static final String GROUP = "test-group";
+
+    /** Set to {@code true} once the failure handler has been invoked. */
+    private final AtomicBoolean failure = new AtomicBoolean();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
             .setConsistentId(igniteInstanceName)
             .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setFailureHandler(new AbstractFailureHandler() {
+                @Override protected boolean handle(Ignite ignite, 
FailureContext failureCtx) {
+                    failure.set(true);
+
+                    return true;
+                }
+            })
             .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setGroupName(GROUP)
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
 
         return cfg;
@@ -87,9 +115,9 @@ public class StartImplicitlyTxOnStopCacheTest extends 
GridCommonAbstractTest {
             }
         });
 
-        IgniteInternalFuture runTxFut = GridTestUtils.runAsync(() -> 
cache.put(100, 100));
+        IgniteInternalFuture runTxFut = runAsync(() -> cache.put(100, 100));
 
-        IgniteInternalFuture destroyCacheFut = GridTestUtils.runAsync(() ->
+        IgniteInternalFuture destroyCacheFut = runAsync(() ->
             client.destroyCache(DEFAULT_CACHE_NAME));
 
         exchnageStartedBarrier.await();
@@ -100,4 +128,64 @@ public class StartImplicitlyTxOnStopCacheTest extends 
GridCommonAbstractTest {
 
         assertNull(client.cache(DEFAULT_CACHE_NAME));
     }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testTxStartAfterGatewayBlockedOnCacheDestroy() throws 
Exception {
+        IgniteEx crd = (IgniteEx)startGridsMultiThreaded(2);
+
+        // A cache group with multiple caches is important here: the partition 
topology will not be stopped on cache destroy.
+        crd.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME + "_1")
+            .setGroupName(GROUP)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        GridCacheSharedContext<Object, Object> sharedCtx = 
crd.context().cache().context();
+        GridCacheContext<Object, Object> cacheCtx = 
sharedCtx.cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+        CountDownLatch txStarted = new CountDownLatch(1);
+        CountDownLatch gatewayStopped = new CountDownLatch(1);
+
+        IgniteTxManager tm = Mockito.spy(sharedCtx.tm());
+        sharedCtx.setTxManager(tm);
+
+        Mockito.doAnswer(m -> {
+            // Create tx after gateway was blocked (but not stopped).
+            txStarted.countDown();
+            gatewayStopped.await();
+
+            return m.callRealMethod();
+        }).when(tm).onCreated(Mockito.any(), Mockito.any());
+
+        GridCacheGateway gate = Mockito.spy(cacheCtx.gate());
+        setFieldValue(cacheCtx, "gate", gate);
+
+        Mockito.doAnswer(m -> {
+            // Gateway is ready to block.
+            gatewayStopped.countDown();
+
+            return m.callRealMethod();
+        }).when(gate).onStopped();
+
+        IgniteInternalFuture<Object> fut = runAsync(() -> {
+            txStarted.await();
+
+            grid(1).destroyCache(DEFAULT_CACHE_NAME);
+        });
+
+        try {
+            Integer remoteKey = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+            crd.cache(DEFAULT_CACHE_NAME).putAsync(remoteKey, "val").get();
+        }
+        catch (CacheException e) {
+            // No-op.
+        }
+
+        fut.get();
+
+        // Make sure exchange worker processed previous task to get potential 
failure.
+        crd.getOrCreateCache("new-cache");
+
+        assertFalse(failure.get());
+    }
 }

Reply via email to