This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 288cfd5d945 IGNITE-27325 CdcMain clears corrupted cdc state files 
(#12577)
288cfd5d945 is described below

commit 288cfd5d945f75017adac973c6a6f49bd81b7682
Author: Maksim Timonin <[email protected]>
AuthorDate: Wed Dec 17 12:03:59 2025 +0300

    IGNITE-27325 CdcMain clears corrupted cdc state files (#12577)
---
 .../ignite/internal/cdc/CdcConsumerState.java      |  37 +++--
 .../ignite/cdc/CorruptedCdcConsumerStateTest.java  | 158 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   2 +
 3 files changed, 188 insertions(+), 9 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index 7b5b87734e3..e8fdbbd0990 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -58,7 +58,6 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.wal.WALPoi
  * @see CdcMain
  */
 public class CdcConsumerState {
-
     /** Log. */
     private final IgniteLogger log;
 
@@ -161,7 +160,7 @@ public class CdcConsumerState {
      * @return Saved state.
      */
     public Map<Integer, Long> loadCaches() {
-        Map<Integer, Long> state = load(ft.cdcCachesState(), HashMap::new);
+        Map<Integer, Long> state = 
loadOrDefaultIfCorrupted(ft.cdcCachesState(), HashMap::new, true);
 
         log.info("Initial caches state loaded [cachesCnt=" + state.size() + 
']');
 
@@ -187,7 +186,7 @@ public class CdcConsumerState {
      * @return Saved state.
      */
     public Set<T2<Integer, Byte>> loadMappingsState() {
-        Set<T2<Integer, Byte>> state = load(ft.cdcMappingsState(), 
HashSet::new);
+        Set<T2<Integer, Byte>> state = 
loadOrDefaultIfCorrupted(ft.cdcMappingsState(), HashSet::new, true);
 
         assert state != null;
 
@@ -207,7 +206,7 @@ public class CdcConsumerState {
      * @return Saved state.
      */
     public Map<Integer, Long> loadTypesState() {
-        Map<Integer, Long> state = load(ft.cdcTypesState(), HashMap::new);
+        Map<Integer, Long> state = 
loadOrDefaultIfCorrupted(ft.cdcTypesState(), HashMap::new, true);
 
         assert state != null;
 
@@ -230,16 +229,36 @@ public class CdcConsumerState {
         Files.move(tmp, file, ATOMIC_MOVE, REPLACE_EXISTING);
     }
 
-    /** Loads data from path. */
-    private <D> D load(Path state, Supplier<D> dflt) {
+    /**
+     * Loads data from path.
+     * @param state Path to load data from.
+     * @param dflt Default value, if given file not exist or corrupted.
+     * @param delIfCorrupted Delete given file if corrupted and return {@code 
dlft} value.
+     */
+    private <D> D loadOrDefaultIfCorrupted(Path state, Supplier<D> dflt, 
boolean delIfCorrupted) {
         if (!Files.exists(state))
             return dflt.get();
 
         try (ObjectInputStream ois = new 
ObjectInputStream(Files.newInputStream(state))) {
-
             return (D)ois.readObject();
         }
-        catch (IOException | ClassNotFoundException e) {
+        catch (IOException e) {
+            if (delIfCorrupted) {
+                try {
+                    log.warning("State file was corrupted. Will remove the 
file and restore state with default [file=" + state + ']');
+
+                    Files.delete(state);
+                }
+                catch (IOException ioe) {
+                    throw new RuntimeException(e);
+                }
+
+                return dflt.get();
+            }
+
+            throw new RuntimeException(e);
+        }
+        catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }
@@ -250,7 +269,7 @@ public class CdcConsumerState {
      * @return CDC mode state.
      */
     public CdcMode loadCdcMode() {
-        CdcMode state = load(ft.cdcModeState(), () -> 
CdcMode.IGNITE_NODE_ACTIVE);
+        CdcMode state = loadOrDefaultIfCorrupted(ft.cdcModeState(), () -> 
CdcMode.IGNITE_NODE_ACTIVE, false);
 
         log.info("CDC mode loaded [" + state + ']');
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
new file mode 100644
index 00000000000..302a3041347
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cdc/CorruptedCdcConsumerStateTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class CorruptedCdcConsumerStateTest extends AbstractCdcTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setWalForceArchiveTimeout(100)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setCdcEnabled(true)
+            ));
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testCdcMainClearsCorruptedFiles() throws Exception {
+        try (Ignite ign = startGrid(0)) {
+            CountDownLatch sendCacheLatch = new CountDownLatch(1);
+
+            CdcMain cdc = createCdc(new TestCdcConsumer(sendCacheLatch), 
ign.configuration());
+
+            IgniteInternalFuture<?> cdcFut = runAsync(cdc);
+
+            // Force writing consumer state to ./state directory.
+            try {
+                ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
+
+                U.await(sendCacheLatch);
+            }
+            finally {
+                cdcFut.cancel();
+            }
+
+            // Corrupt data.
+            NodeFileTree ft = GridTestUtils.getFieldValue(cdc, "ft");
+
+            Path cdcCacheState = ft.cdcCachesState();
+
+            byte[] corrupted = new byte[10];
+            ThreadLocalRandom.current().nextBytes(corrupted);
+
+            Files.write(cdcCacheState, corrupted);
+
+            sendCacheLatch = new CountDownLatch(1);
+
+            cdcFut = runAsync(createCdc(new TestCdcConsumer(sendCacheLatch), 
ign.configuration()));
+            cdcFut.listen(sendCacheLatch::countDown);
+
+            // Force writing consumer state to ./state directory.
+            try {
+                ign.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);
+
+                U.await(sendCacheLatch);
+
+                assertFalse(cdcFut.isDone());
+                assertNull(cdcFut.error());
+            }
+            finally {
+                cdcFut.cancel();
+            }
+        }
+    }
+
+    /** */
+    private static final class TestCdcConsumer implements CdcConsumer {
+        /** */
+        private final CountDownLatch latch;
+
+        /** */
+        TestCdcConsumer(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /** */
+        @Override public void start(MetricRegistry mreg) {
+            // No-op.
+        }
+
+        /** */
+        @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+            return false;
+        }
+
+        /** */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            types.forEachRemaining(t -> {});
+        }
+
+        /** */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            mappings.forEachRemaining(t -> {});
+        }
+
+        /** */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts) 
{
+            cacheEvts.forEachRemaining(e -> {
+                if (e.cacheId() == CU.cacheId(DEFAULT_CACHE_NAME))
+                    latch.countDown();
+            });
+        }
+
+        /** */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            caches.forEachRemaining(c -> {});
+        }
+
+        /** */
+        @Override public void stop() {
+            // No-op.
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 0d46f7390b1..de17138ef67 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cdc.CdcManagerTest;
 import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
 import org.apache.ignite.cdc.CdcPushMetricsExporterTest;
 import org.apache.ignite.cdc.CdcSelfTest;
+import org.apache.ignite.cdc.CorruptedCdcConsumerStateTest;
 import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
 import org.apache.ignite.cdc.TransformedCdcSelfTest;
 import org.apache.ignite.cdc.WalForCdcTest;
@@ -161,6 +162,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, TransformedCdcSelfTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CorruptedCdcConsumerStateTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CdcCacheConfigOnRestartTest.class, ignoredTests);

Reply via email to