This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 728b1de4ab2 IGNITE-25871 Fix CdcCommandLineStartup interruption (#12179)
728b1de4ab2 is described below
commit 728b1de4ab2987a63be632abd6d9767424986a5a
Author: Maksim Davydov <[email protected]>
AuthorDate: Fri Jul 18 12:07:42 2025 +0300
IGNITE-25871 Fix CdcCommandLineStartup interruption (#12179)
---
.../startup/cmdline/CdcCommandLineStartup.java | 16 ++-
.../test/config/cdc/cdc-command-line-consumer.xml | 46 ++++++
.../ignite/cdc/CdcCommandLineStartupTest.java | 157 +++++++++++++++++++++
.../ignite/testsuites/IgniteSpringTestSuite.java | 2 +
4 files changed, 220 insertions(+), 1 deletion(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
index 4a69d0641c3..63d645122ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CdcCommandLineStartup.java
@@ -75,6 +75,7 @@ public class CdcCommandLineStartup {
exit("Invalid arguments: " + args[0], true, -1);
AtomicReference<CdcMain> cdc = new AtomicReference<>();
+ Thread appThread = null;
try {
cdc.set(CdcLoader.loadCdc(args[0]));
@@ -87,7 +88,7 @@ public class CdcCommandLineStartup {
});
}
- Thread appThread = new Thread(cdc.get());
+ appThread = new Thread(cdc.get());
appThread.start();
@@ -95,6 +96,19 @@ public class CdcCommandLineStartup {
}
catch (InterruptedException ignore) {
X.error("CDC was interrupted.");
+
+ if (appThread != null) {
+ // In unit tests, CDC is started and stopped within the same JVM. Since JVM shutdown hooks are not
+ // triggered in this scenario, we explicitly interrupt the thread to ensure the CDC shuts down cleanly.
+ appThread.interrupt();
+
+ try {
+ appThread.join();
+ }
+ catch (InterruptedException e) {
+ // No-op
+ }
+ }
}
catch (Throwable e) {
e.printStackTrace();
diff --git a/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml b/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml
new file mode 100644
index 00000000000..f1f6434c305
--- /dev/null
+++ b/modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+ <property name="consumer">
+ <bean class="org.apache.ignite.cdc.CdcCommandLineStartupTest$CdcCommandLineConsumer"/>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java
new file mode 100644
index 00000000000..0a0050b9ff6
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcCommandLineStartupTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Checks that cancelling a {@link CdcCommandLineStartup#main} run started inside the test JVM interrupts the CDC consumer. */
+public class CdcCommandLineStartupTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setCdcEnabled(true)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** Runs CDC via the command-line entry point, cancels it after the consumer starts and checks the consumer saw the interrupt. */
+    @Test
+    public void testCdcCommandLineStartup() throws Exception {
+        try (IgniteEx ign = startGrid(0)) {
+            IgniteInternalFuture<?> fut = runAsync(() ->
+                    CdcCommandLineStartup.main(new String[]
{"modules/spring/src/test/config/cdc/cdc-command-line-consumer.xml"}),
+                "cmdline-consumer"
+            );
+
+            assertTrue(waitForCondition(CdcCommandLineConsumer::started,
getTestTimeout()));
+
+            fut.cancel();
+
+            assertTrue(fut.isDone());
+
+            assertTrue(CdcCommandLineConsumer.interrupted());
+        }
+    }
+
+    /** Test consumer whose {@link #start(MetricRegistry)} blocks until interrupted; progress is exposed through static flags. */
+    public static class CdcCommandLineConsumer implements CdcConsumer {
+        /** Set on entry to {@link #start(MetricRegistry)}, cleared by {@link #stop()}. */
+        private static final AtomicBoolean started = new AtomicBoolean(false);
+
+        /** Set after {@link #start(MetricRegistry)} has caught an {@link InterruptedException}. */
+        private static final AtomicBoolean interrupted = new
AtomicBoolean(false);
+
+        /** {@inheritDoc} */
+        @Override public void start(MetricRegistry mreg) {
+            try {
+                started.compareAndSet(false, true);
+
+                while (true)
+                    Thread.sleep(100);
+            }
+            catch (InterruptedException e) {
+                try {
+                    Thread.sleep(100); // Presumably emulates a slow shutdown so the test needs main() to wait for it; confirm.
+                }
+                catch (InterruptedException ex) {
+                    // No-op.
+                }
+
+                interrupted.compareAndSet(false, true);
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onEvents(Iterator<CdcEvent> events) {
+            events.forEachRemaining(evt -> {
+                // No-op.
+            });
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            types.forEachRemaining(type -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            mappings.forEachRemaining(mapping -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+            cacheEvents.forEachRemaining(cacheEvt -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            caches.forEachRemaining(cache -> {
+                // No-op.
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() {
+            started.compareAndSet(true, false);
+        }
+
+        /** @return Current value of the started flag. */
+        public static boolean started() {
+            return started.get();
+        }
+
+        /** @return {@code true} once the consumer has been interrupted inside {@link #start(MetricRegistry)}. */
+        public static boolean interrupted() {
+            return interrupted.get();
+        }
+    }
+}
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 36e7b4c2937..c012cc43542 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactorySelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactorySelfTest;
import org.apache.ignite.cache.store.jdbc.CachePojoStoreXmlSelfTest;
import org.apache.ignite.cache.store.jdbc.CachePojoStoreXmlWithSqlEscapeSelfTest;
+import org.apache.ignite.cdc.CdcCommandLineStartupTest;
import org.apache.ignite.cdc.CdcConfigurationTest;
import org.apache.ignite.cluster.ClusterStateXmlPropertiesTest;
import org.apache.ignite.encryption.SpringEncryptedCacheRestartClientTest;
@@ -87,6 +88,7 @@ import org.junit.runners.Suite;
// CDC tests.
CdcConfigurationTest.class,
+ CdcCommandLineStartupTest.class,
SqlPlanHistoryConfigTest.class,
})