This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 288cfd5d945 IGNITE-27325 CdcMain clears corrupted cdc state files
(#12577)
288cfd5d945 is described below
commit 288cfd5d945f75017adac973c6a6f49bd81b7682
Author: Maksim Timonin <[email protected]>
AuthorDate: Wed Dec 17 12:03:59 2025 +0300
IGNITE-27325 CdcMain clears corrupted cdc state files (#12577)
---
.../ignite/internal/cdc/CdcConsumerState.java | 37 +++--
.../ignite/cdc/CorruptedCdcConsumerStateTest.java | 158 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 2 +
3 files changed, 188 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index 7b5b87734e3..e8fdbbd0990 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -58,7 +58,6 @@ import static
org.apache.ignite.internal.processors.cache.persistence.wal.WALPoi
* @see CdcMain
*/
public class CdcConsumerState {
-
/** Log. */
private final IgniteLogger log;
@@ -161,7 +160,7 @@ public class CdcConsumerState {
* @return Saved state.
*/
public Map<Integer, Long> loadCaches() {
- Map<Integer, Long> state = load(ft.cdcCachesState(), HashMap::new);
+ Map<Integer, Long> state =
loadOrDefaultIfCorrupted(ft.cdcCachesState(), HashMap::new, true);
log.info("Initial caches state loaded [cachesCnt=" + state.size() +
']');
@@ -187,7 +186,7 @@ public class CdcConsumerState {
* @return Saved state.
*/
public Set<T2<Integer, Byte>> loadMappingsState() {
- Set<T2<Integer, Byte>> state = load(ft.cdcMappingsState(),
HashSet::new);
+ Set<T2<Integer, Byte>> state =
loadOrDefaultIfCorrupted(ft.cdcMappingsState(), HashSet::new, true);
assert state != null;
@@ -207,7 +206,7 @@ public class CdcConsumerState {
* @return Saved state.
*/
public Map<Integer, Long> loadTypesState() {
- Map<Integer, Long> state = load(ft.cdcTypesState(), HashMap::new);
+ Map<Integer, Long> state =
loadOrDefaultIfCorrupted(ft.cdcTypesState(), HashMap::new, true);
assert state != null;
@@ -230,16 +229,36 @@ public class CdcConsumerState {
Files.move(tmp, file, ATOMIC_MOVE, REPLACE_EXISTING);
}
- /** Loads data from path. */
- private <D> D load(Path state, Supplier<D> dflt) {
+ /**
+ * Loads data from path.
+ * @param state Path to load data from.
+ * @param dflt Supplier of the default value, used if the given file does not exist or is corrupted.
+ * @param delIfCorrupted Whether to delete the given file if it is corrupted and return the {@code
dflt} value.
+ */
+ private <D> D loadOrDefaultIfCorrupted(Path state, Supplier<D> dflt,
boolean delIfCorrupted) {
if (!Files.exists(state))
return dflt.get();
try (ObjectInputStream ois = new
ObjectInputStream(Files.newInputStream(state))) {
-
return (D)ois.readObject();
}
- catch (IOException | ClassNotFoundException e) {
+ catch (IOException e) {
+ if (delIfCorrupted) {
+ try {
+ log.warning("State file was corrupted. Will remove the
file and restore state with default [file=" + state + ']');
+
+ Files.delete(state);
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(e);
+ }
+
+ return dflt.get();
+ }
+
+ throw new RuntimeException(e);
+ }
+ catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -250,7 +269,7 @@ public class CdcConsumerState {
* @return CDC mode state.
*/
public CdcMode loadCdcMode() {
- CdcMode state = load(ft.cdcModeState(), () ->
CdcMode.IGNITE_NODE_ACTIVE);
+ CdcMode state = loadOrDefaultIfCorrupted(ft.cdcModeState(), () ->
CdcMode.IGNITE_NODE_ACTIVE, false);
log.info("CDC mode loaded [" + state + ']');
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
new file mode 100644
index 00000000000..302a3041347
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** Checks that a restarted {@link CdcMain} keeps running after the saved caches state file is overwritten with random bytes. */
+public class CorruptedCdcConsumerStateTest extends AbstractCdcTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setWalForceArchiveTimeout(100)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setCdcEnabled(true)
+ ));
+
+ return cfg;
+ }
+
+ /** Runs CDC once, overwrites the caches state file with random bytes, then checks that a second CDC run is still running without an error. */
+ @Test
+ public void testCdcMainClearsCorruptedFiles() throws Exception {
+ try (Ignite ign = startGrid(0)) {
+ CountDownLatch sendCacheLatch = new CountDownLatch(1);
+
+ CdcMain cdc = createCdc(new TestCdcConsumer(sendCacheLatch),
ign.configuration());
+
+ IgniteInternalFuture<?> cdcFut = runAsync(cdc);
+
+ // Force writing consumer state to ./state directory.
+ try {
+ ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
+
+ U.await(sendCacheLatch);
+ }
+ finally {
+ cdcFut.cancel();
+ }
+
+ // Corrupt data: overwrite the caches state file with 10 random bytes.
+ NodeFileTree ft = GridTestUtils.getFieldValue(cdc, "ft");
+
+ Path cdcCacheState = ft.cdcCachesState();
+
+ byte[] corrupted = new byte[10];
+ ThreadLocalRandom.current().nextBytes(corrupted);
+
+ Files.write(cdcCacheState, corrupted);
+
+ sendCacheLatch = new CountDownLatch(1);
+
+ cdcFut = runAsync(createCdc(new TestCdcConsumer(sendCacheLatch),
ign.configuration()));
+ cdcFut.listen(sendCacheLatch::countDown);
+
+ // Force writing consumer state to ./state directory; the latch is also released if the CDC future completes.
+ try {
+ ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
+
+ U.await(sendCacheLatch);
+
+ assertFalse(cdcFut.isDone());
+ assertNull(cdcFut.error());
+ }
+ finally {
+ cdcFut.cancel();
+ }
+ }
+ }
+
+ /** Consumer that counts down a latch when a cache change event for {@code DEFAULT_CACHE_NAME} arrives. */
+ private static final class TestCdcConsumer implements CdcConsumer {
+ /** Latch counted down on a matching cache change event. */
+ private final CountDownLatch latch;
+
+ /** @param latch Latch to count down on a matching cache change event. */
+ TestCdcConsumer(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ /** Does nothing. */
+ @Override public void start(MetricRegistry mreg) {
+ // No-op.
+ }
+
+ /** Ignores the events and always returns {@code false}. */
+ @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+ return false;
+ }
+
+ /** Drains the iterator, ignoring every type. */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ types.forEachRemaining(t -> {});
+ }
+
+ /** Drains the iterator, ignoring every mapping. */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ mappings.forEachRemaining(t -> {});
+ }
+
+ /** Drains the iterator, counting down the latch for each event whose cache id matches {@code DEFAULT_CACHE_NAME}. */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts)
{
+ cacheEvts.forEachRemaining(e -> {
+ if (e.cacheId() == CU.cacheId(DEFAULT_CACHE_NAME))
+ latch.countDown();
+ });
+ }
+
+ /** Drains the iterator, ignoring every destroyed cache id. */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ caches.forEachRemaining(c -> {});
+ }
+
+ /** Does nothing. */
+ @Override public void stop() {
+ // No-op.
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 0d46f7390b1..de17138ef67 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cdc.CdcManagerTest;
import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
import org.apache.ignite.cdc.CdcPushMetricsExporterTest;
import org.apache.ignite.cdc.CdcSelfTest;
+import org.apache.ignite.cdc.CorruptedCdcConsumerStateTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
import org.apache.ignite.cdc.TransformedCdcSelfTest;
import org.apache.ignite.cdc.WalForCdcTest;
@@ -161,6 +162,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, TransformedCdcSelfTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CorruptedCdcConsumerStateTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CdcCacheConfigOnRestartTest.class, ignoredTests);