This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 728b1de4ab2 IGNITE-25871 Fix CdcCommandLineStartup interruption (#12179)
728b1de4ab2 is described below

commit 728b1de4ab2987a63be632abd6d9767424986a5a
Author: Maksim Davydov <[email protected]>
AuthorDate: Fri Jul 18 12:07:42 2025 +0300

    IGNITE-25871 Fix CdcCommandLineStartup interruption (#12179)
---
 .../startup/cmdline/CdcCommandLineStartup.java     |  16 ++-
 .../test/config/cdc/cdc-command-line-consumer.xml  |  46 ++++++
 .../ignite/cdc/CdcCommandLineStartupTest.java      | 157 +++++++++++++++++++++
 .../ignite/testsuites/IgniteSpringTestSuite.java   |   2 +
 4 files changed, 220 insertions(+), 1 deletion(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
index 4a69d0641c3..63d645122ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
@@ -75,6 +75,7 @@ public class CdcCommandLineStartup {
             exit("Invalid arguments: " + args[0], true, -1);
 
         AtomicReference<CdcMain> cdc = new AtomicReference<>();
+        Thread appThread = null;
 
         try {
             cdc.set(CdcLoader.loadCdc(args[0]));
@@ -87,7 +88,7 @@ public class CdcCommandLineStartup {
                 });
             }
 
-            Thread appThread = new Thread(cdc.get());
+            appThread = new Thread(cdc.get());
 
             appThread.start();
 
@@ -95,6 +96,19 @@ public class CdcCommandLineStartup {
         }
         catch (InterruptedException ignore) {
             X.error("CDC was interrupted.");
+
+            if (appThread != null) {
+                // In unit tests, CDC is started and stopped within the same JVM. Since JVM shutdown hooks are not
+                // triggered in this scenario, we explicitly interrupt the thread to ensure the CDC shuts down cleanly.
+                appThread.interrupt();
+
+                try {
+                    appThread.join();
+                }
+                catch (InterruptedException e) {
+                    // No-op 
+                }
+            }
         }
         catch (Throwable e) {
             e.printStackTrace();
diff --git a/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml b/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml
new file mode 100644
index 00000000000..f1f6434c305
--- /dev/null
+++ b/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+            http://www.springframework.org/schema/beans
+            http://www.springframework.org/schema/beans/spring-beans.xsd
+            http://www.springframework.org/schema/util
+            http://www.springframework.org/schema/util/spring-util.xsd">
+
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+        <property name="consumer">
+            <bean class="org.apache.ignite.cdc.CdcCommandLineStartupTest$CdcCommandLineConsumer"/>
+        </property>
+    </bean>
+</beans>
diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java
new file mode 100644
index 00000000000..0a0050b9ff6
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class CdcCommandLineStartupTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setCdcEnabled(true)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testCdcCommandLineStartup() throws Exception {
+        try (IgniteEx ign = startGrid(0)) {
+            IgniteInternalFuture<?> fut = runAsync(() ->
+                CdcCommandLineStartup.main(new String[] {"modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml"}),
+                "cmdline-consumer"
+            );
+
+            assertTrue(waitForCondition(CdcCommandLineConsumer::started, getTestTimeout()));
+
+            fut.cancel();
+
+            assertTrue(fut.isDone());
+
+            assertTrue(CdcCommandLineConsumer.interrupted());
+        }
+    }
+
+    /** */
+    public static class CdcCommandLineConsumer implements CdcConsumer {
+        /** */
+        private static final AtomicBoolean started = new AtomicBoolean(false);
+
+        /** */
+        private static final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        /** {@inheritDoc} */
+        @Override public void start(MetricRegistry mreg) {
+            try {
+                started.compareAndSet(false, true);
+
+                while (true)
+                    Thread.sleep(100);
+            }
+            catch (InterruptedException e) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException ex) {
+                    // No-op.
+                }
+
+                interrupted.compareAndSet(false, true);
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onEvents(Iterator<CdcEvent> events) {
+            events.forEachRemaining(evt -> {
+                // No-op.
+            });
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            types.forEachRemaining(type -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            mappings.forEachRemaining(mapping -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+            cacheEvents.forEachRemaining(cacheEvt -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            caches.forEachRemaining(cache -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() {
+            started.compareAndSet(true, false);
+        }
+
+        /** */
+        public static boolean started() {
+            return started.get();
+        }
+
+        /** */
+        public static boolean interrupted() {
+            return interrupted.get();
+        }
+    }
+}
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 36e7b4c2937..c012cc43542 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactorySelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactorySelfTest;
 import org.apache.ignite.cache.store.jdbc.CachePojoStoreXmlSelfTest;
 import org.apache.ignite.cache.store.jdbc.CachePojoStoreXmlWithSqlEscapeSelfTest;
+import org.apache.ignite.cdc.CdcCommandLineStartupTest;
 import org.apache.ignite.cdc.CdcConfigurationTest;
 import org.apache.ignite.cluster.ClusterStateXmlPropertiesTest;
 import org.apache.ignite.encryption.SpringEncryptedCacheRestartClientTest;
@@ -87,6 +88,7 @@ import org.junit.runners.Suite;
 
     // CDC tests.
     CdcConfigurationTest.class,
+    CdcCommandLineStartupTest.class,
 
     SqlPlanHistoryConfigTest.class,
 })

Reply via email to