This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 378f0643117 IGNITE-19286 Fixed NPE in case of simultaneous cache
destroy and transaction rollback (#10647)
378f0643117 is described below
commit 378f0643117eca00f6081a08a783b9683f3adc19
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Apr 17 19:41:59 2023 +0300
IGNITE-19286 Fixed NPE in case of simultaneous cache destroy and
transaction rollback (#10647)
---
.../processors/cache/GridCacheProcessor.java | 4 +
.../StartImplicitlyTxOnStopCacheTest.java | 92 +++++++++++++++++++++-
2 files changed, 94 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3ad724da2c1..85b4bcef3d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2777,10 +2777,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) {
+ // Rollback tx started before gateway blocked to avoid deadlock with gateway stop.
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
stopGateway(action.request());
+ // Rollback tx started after gateway blocked but not stopped.
+ context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+
String cacheName = action.request().cacheName();
GridCacheAdapter<?, ?> cache = caches.get(cacheName);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
index 3305d84286b..ea19f4214e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/StartImplicitlyTxOnStopCacheTest.java
@@ -18,32 +18,60 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheGateway;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
/**
* The test starts an implicit transaction during the cache is stopping.
* The transaction has to be completed, and the cache is stopped.
*/
public class StartImplicitlyTxOnStopCacheTest extends GridCommonAbstractTest {
+ /** */
+ private static final String GROUP = "test-group";
+
+ /** Node failure occurs. */
+ private final AtomicBoolean failure = new AtomicBoolean();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
.setConsistentId(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setFailureHandler(new AbstractFailureHandler() {
+ @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+ failure.set(true);
+
+ return true;
+ }
+ })
.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setGroupName(GROUP)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
return cfg;
@@ -87,9 +115,9 @@ public class StartImplicitlyTxOnStopCacheTest extends GridCommonAbstractTest {
}
});
- IgniteInternalFuture runTxFut = GridTestUtils.runAsync(() -> cache.put(100, 100));
+ IgniteInternalFuture runTxFut = runAsync(() -> cache.put(100, 100));
- IgniteInternalFuture destroyCacheFut = GridTestUtils.runAsync(() ->
+ IgniteInternalFuture destroyCacheFut = runAsync(() ->
client.destroyCache(DEFAULT_CACHE_NAME));
exchnageStartedBarrier.await();
@@ -100,4 +128,64 @@ public class StartImplicitlyTxOnStopCacheTest extends GridCommonAbstractTest {
assertNull(client.cache(DEFAULT_CACHE_NAME));
}
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testTxStartAfterGatewayBlockedOnCacheDestroy() throws Exception {
+ IgniteEx crd = (IgniteEx)startGridsMultiThreaded(2);
+
+ // Cache group with multiple caches are important here, partition topology will not be stopped on cache destroy.
+ crd.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME + "_1")
+ .setGroupName(GROUP)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ GridCacheSharedContext<Object, Object> sharedCtx = crd.context().cache().context();
+ GridCacheContext<Object, Object> cacheCtx = sharedCtx.cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+ CountDownLatch txStarted = new CountDownLatch(1);
+ CountDownLatch gatewayStopped = new CountDownLatch(1);
+
+ IgniteTxManager tm = Mockito.spy(sharedCtx.tm());
+ sharedCtx.setTxManager(tm);
+
+ Mockito.doAnswer(m -> {
+ // Create tx after gateway was stopped (but not blocked).
+ txStarted.countDown();
+ gatewayStopped.await();
+
+ return m.callRealMethod();
+ }).when(tm).onCreated(Mockito.any(), Mockito.any());
+
+ GridCacheGateway gate = Mockito.spy(cacheCtx.gate());
+ setFieldValue(cacheCtx, "gate", gate);
+
+ Mockito.doAnswer(m -> {
+ // Gateway is ready to block.
+ gatewayStopped.countDown();
+
+ return m.callRealMethod();
+ }).when(gate).onStopped();
+
+ IgniteInternalFuture<Object> fut = runAsync(() -> {
+ txStarted.await();
+
+ grid(1).destroyCache(DEFAULT_CACHE_NAME);
+ });
+
+ try {
+ Integer remoteKey = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+ crd.cache(DEFAULT_CACHE_NAME).putAsync(remoteKey, "val").get();
+ }
+ catch (CacheException e) {
+ // No-op.
+ }
+
+ fut.get();
+
+ // Make sure exchange worker processed previous task to get potential failure.
+ crd.getOrCreateCache("new-cache");
+
+ assertFalse(failure.get());
+ }
}