This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 6419fd36 IGNITE-20954 Prohibit the same group for event and metadata consumers (#246)
6419fd36 is described below

commit 6419fd36d9bb9400699311b5665afc4c5d33d38a
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Nov 24 12:42:03 2023 +0300

    IGNITE-20954 Prohibit the same group for event and metadata consumers (#246)
---
 .../kafka/AbstractKafkaToIgniteCdcStreamer.java    |  8 +++-
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  |  7 ++++
 .../same-consumers-group/kafka-to-ignite.xml       | 48 ++++++++++++++++++++++
 .../loader/same-consumers-group/kafka.properties   |  2 +
 4 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 27de5274..2df4d204 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -107,8 +107,12 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
         appliers = new ArrayList<>(streamerCfg.getThreadCount());
         runners = new ArrayList<>(streamerCfg.getThreadCount());
 
-        if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
-            throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+        String grp = kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+
+        A.ensure(grp != null, "Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+        A.ensure(
+            !grp.equals(streamerCfg.getMetadataConsumerGroup()),
+            "The group of event and metadata consumers must be different.");
 
         kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
         kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index cb2d385f..6938448b 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -44,6 +44,13 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
             "Spring bean with provided name doesn't exist"
         );
 
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/same-consumers-group/kafka-to-ignite.xml"),
+            IllegalArgumentException.class,
+            "The group of event and metadata consumers must be different."
+        );
+
         KafkaToIgniteCdcStreamer streamer = loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml");
 
         assertNotNull(streamer);
diff --git a/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml
new file mode 100644
index 00000000..55a6a9e0
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="metadataConsumerGroup" value="group1" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/same-consumers-group/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties
new file mode 100644
index 00000000..346e21a7
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=127.0.0.1
+group.id=group1

Reply via email to