This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6229b46c277 IGNITE-18935 Fix late stopping of TTL workers during
deactivation leads to corrupted PDS (#10570)
6229b46c277 is described below
commit 6229b46c277707743da4f5fce881450a8a6ca151
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Tue Mar 7 11:03:29 2023 +0300
IGNITE-18935 Fix late stopping of TTL workers during deactivation leads to
corrupted PDS (#10570)
---
modules/compress/pom.xml | 7 +
.../processors/cache/GridCacheProcessor.java | 58 ++++--
...IgnitePdsWithTtlExpirationOnDeactivateTest.java | 222 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 2 +
4 files changed, 268 insertions(+), 21 deletions(-)
diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
index f9c4db51f42..c07bb0b7f8a 100644
--- a/modules/compress/pom.xml
+++ b/modules/compress/pom.xml
@@ -131,6 +131,13 @@
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons.lang3.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 53b29937c6f..6cb8c84b5b0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2744,9 +2744,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
// Wait until all evictions are finished.
grpsToStop.forEach(t ->
sharedCtx.evict().onCacheGroupStopped(t.get1()));
- if (!exchActions.cacheStopRequests().isEmpty())
- removeOffheapListenerAfterCheckpoint(grpsToStop);
-
Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop =
exchActions.cacheStopRequests().stream()
.collect(groupingBy(action -> action.descriptor().groupId()));
@@ -2763,31 +2760,50 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
CacheGroupContext gctx = cacheGrps.get(groupId);
- if (gctx != null)
- gctx.preloader().pause();
+ if (gctx != null) {
+ final String msg = "Failed to wait for topology
update, cache group is stopping.";
- try {
- if (gctx != null) {
- final String msg = "Failed to wait for topology
update, cache group is stopping.";
+ // If snapshot operation in progress we must throw
CacheStoppedException
+ // for correct cache proxy restart. For more details
see
+ // IgniteCacheProxy.cacheException()
+ gctx.affinity().cancelFutures(new
CacheStoppedException(msg));
+ }
- // If snapshot operation in progress we must throw
CacheStoppedException
- // for correct cache proxy restart. For more
details see
- // IgniteCacheProxy.cacheException()
- gctx.affinity().cancelFutures(new
CacheStoppedException(msg));
- }
+ for (ExchangeActions.CacheActionData action :
cachesToStopByGrp.getValue()) {
+
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
- for (ExchangeActions.CacheActionData action :
cachesToStopByGrp.getValue()) {
-
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+ stopGateway(action.request());
- stopGateway(action.request());
+ String cacheName = action.request().cacheName();
- String cacheName = action.request().cacheName();
+ GridCacheAdapter<?, ?> cache = caches.get(cacheName);
- // TTL manager has to be unregistered before the
checkpointReadLock is acquired.
- GridCacheAdapter<?, ?> cache =
caches.get(cacheName);
+ if (cache != null)
+ cache.context().ttl().unregister();
+ }
- if (cache != null)
- cache.context().ttl().unregister();
+ return null;
+ }
+ );
+
+ if (!exchActions.cacheStopRequests().isEmpty())
+ removeOffheapListenerAfterCheckpoint(grpsToStop);
+
+ doInParallel(
+ parallelismLvl,
+ sharedCtx.kernalContext().pools().getSystemExecutorService(),
+ cachesToStop.entrySet(),
+ cachesToStopByGrp -> {
+ Integer groupId = cachesToStopByGrp.getKey();
+
+ CacheGroupContext gctx = cacheGrps.get(groupId);
+
+ if (gctx != null)
+ gctx.preloader().pause();
+
+ try {
+ for (ExchangeActions.CacheActionData action :
cachesToStopByGrp.getValue()) {
+ String cacheName = action.request().cacheName();
sharedCtx.database().checkpointReadLock();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
new file mode 100644
index 00000000000..26a82def544
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlExpirationOnDeactivateTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+
+/**
+ * Tests that TTL workers are stopped in time on cluster deactivation so that PDS is not corrupted:
+ * after loading an expiring cache, deactivating the cluster and restarting the node, the failure
+ * handler must not be triggered.
+ */
+@WithSystemProperty(key =
IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
+public class IgnitePdsWithTtlExpirationOnDeactivateTest extends
GridCommonAbstractTest {
+ /** Name of the atomic cache configured with an accessed expiry policy and eager TTL. */
+ private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
+
+ /**
+ * Expiry duration in milliseconds. Also used as the load duration, as the sleep time of the
+ * tasks that occupy the system pool, and as the timeout of the final failure-handler wait.
+ */
+ private static final int EXPIRATION_TIMEOUT = 5_000;
+
+ /** Random 10000-character alphanumeric value written under every key. */
+ private static final String PAYLOAD =
RandomStringUtils.randomAlphanumeric(10000);
+
+ /** Number of loader threads and of sleeping tasks submitted to the system pool. */
+ private static final int WORKLOAD_THREADS_CNT =
Runtime.getRuntime().availableProcessors();
+
+ /** Set to {@code true} once the failure handler has been invoked. */
+ private volatile boolean failureHndTriggered;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
+
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ final IgniteConfiguration cfg =
super.getConfiguration(igniteInstanceName);
+
+ // Persistent default region: 512 MB max size, 64 MB checkpoint page buffer.
+ DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
+ .setMaxSize(512 * 1024 * 1024)
+ .setCheckpointPageBufferSize(64 * 1024 * 1024)
+ .setPersistenceEnabled(true);
+
+ // Setting MaxWalArchiveSize to a relatively small value leads to
frequent checkpoints (too many WAL segments).
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setWalSegmentSize(8 * 1024 * 1024)
+ .setMaxWalArchiveSize(16 * 1024 * 1024)
+ .setCheckpointFrequency(10_000)
+ .setDefaultDataRegionConfiguration(dfltRegion)
+ .setWalMode(WALMode.LOG_ONLY));
+
+ cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME_ATOMIC));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String
igniteInstanceName) {
+ return new NoOpFailureHandler() {
+ @Override protected boolean handle(Ignite ignite, FailureContext
failureCtx) {
+ // Remember that a failure was reported, then delegate to the no-op handler.
+ failureHndTriggered = true;
+
+ return super.handle(ignite, failureCtx);
+ }
+ };
+ }
+
+ /**
+ * Returns a new atomic cache configuration with the given name, an accessed expiry policy of
+ * {@link #EXPIRATION_TIMEOUT} milliseconds and eager TTL enabled.
+ *
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName(name);
+ ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new
Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
+ ccfg.setEagerTtl(true);
+
+ ccfg.setAtomicityMode(ATOMIC);
+
+ return ccfg;
+ }
+
+ /**
+ * Loads the cache for {@link #EXPIRATION_TIMEOUT} ms, registers a checkpoint listener that keeps
+ * the system pool busy after the "caches stop" checkpoint, then deactivates the cluster,
+ * restarts the node and checks that the failure handler has not been triggered.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartAfterDeactivateWithTtlExpiring() throws Exception {
+ IgniteEx srv = startGrid(0);
+
+ srv.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, String> cache = srv.cache(CACHE_NAME_ATOMIC);
+
+ AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+ AtomicInteger threadId = new AtomicInteger(0);
+
+ // Each loader thread writes keys id * 1_000_000 + i until the timeout flag is set
+ // (key ranges of different threads overlap only after 1_000_000 puts by one thread).
+ IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+ int id = threadId.getAndIncrement();
+
+ int i = 0;
+ while (!timeoutReached.get()) {
+ cache.put(id * 1_000_000 + i, PAYLOAD);
+ i++;
+ }
+ }, WORKLOAD_THREADS_CNT, "loader");
+
+ // Load for EXPIRATION_TIMEOUT ms, so the earliest written entries roughly reach their expiry time.
+ doSleep(EXPIRATION_TIMEOUT);
+ timeoutReached.set(true);
+ ldrFut.get();
+
+ // Add a checkpoint listener that, once the "caches stop" checkpoint ends, keeps the
+ // system pool workers busy for EXPIRATION_TIMEOUT ms to slow them down.
+ addCheckpointListener(srv, new CheckpointListener() {
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ @Override public void afterCheckpointEnd(Context ctx) {
+ // Only react to the checkpoint whose progress reason is "caches stop".
+ if ("caches stop".equals(ctx.progress().reason())) {
+ ExecutorService sysPool =
srv.context().pools().getSystemExecutorService();
+ try {
+ // Submit WORKLOAD_THREADS_CNT tasks that each sleep EXPIRATION_TIMEOUT ms;
+ // invokeAll blocks the calling thread until all of them complete.
+ sysPool.invokeAll(IntStream.range(0,
WORKLOAD_THREADS_CNT).mapToObj(i -> new Callable<Void>() {
+ @Override public Void call() {
+ doSleep(EXPIRATION_TIMEOUT);
+ return null;
+ }
+ }).collect(Collectors.toList()));
+ }
+ catch (InterruptedException e) {
+ // Restore the interrupt status.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ });
+
+ // Deactivate and restart.
+ srv.cluster().state(INACTIVE);
+ stopGrid(0);
+ startGrid(0);
+
+ // Give a failure up to EXPIRATION_TIMEOUT ms to be reported after restart. The return value
+ // of the wait is not used; the flag itself is asserted below.
+ GridTestUtils.waitForCondition(() -> failureHndTriggered,
EXPIRATION_TIMEOUT);
+
+ assertFalse(failureHndTriggered);
+ }
+
+ /**
+ * Registers a checkpoint listener on the {@link GridCacheDatabaseSharedManager} of the given node.
+ *
+ * @param grid Node whose database manager receives the listener.
+ * @param lsnr Checkpoint listener to add.
+ */
+ private void addCheckpointListener(IgniteEx grid, CheckpointListener lsnr)
{
+ GridCacheDatabaseSharedManager dbMgr =
(GridCacheDatabaseSharedManager)grid.context().cache().context()
+ .database();
+
+ dbMgr.addCheckpointListener(lsnr);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 83321a4d901..9bc0f91915a 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNo
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSporadicDataRecordsOnBackupTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest;
+import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlExpirationOnDeactivateTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest2;
import
org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest;
@@ -116,6 +117,7 @@ public class IgnitePdsTestSuite {
GridTestUtils.addTestIfNeeded(suite,
IgniteDbPutGetWithCacheStoreTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlTest2.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgnitePdsWithTtlExpirationOnDeactivateTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgnitePdsSporadicDataRecordsOnBackupTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteClusterActivateDeactivateTestWithPersistence.class, ignoredTests);