This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 6419fd36 IGNITE-20954 Prohibit the same group for event and metadata consumers (#246)
6419fd36 is described below
commit 6419fd36d9bb9400699311b5665afc4c5d33d38a
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Nov 24 12:42:03 2023 +0300
IGNITE-20954 Prohibit the same group for event and metadata consumers (#246)
---
.../kafka/AbstractKafkaToIgniteCdcStreamer.java | 8 +++-
.../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java | 7 ++++
.../same-consumers-group/kafka-to-ignite.xml | 48 ++++++++++++++++++++++
.../loader/same-consumers-group/kafka.properties | 2 +
4 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 27de5274..2df4d204 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -107,8 +107,12 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements
Runnable {
appliers = new ArrayList<>(streamerCfg.getThreadCount());
runners = new ArrayList<>(streamerCfg.getThreadCount());
- if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
- throw new IllegalArgumentException("Kafka properties don't
contains " + ConsumerConfig.GROUP_ID_CONFIG);
+ String grp = kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+
+ A.ensure(grp != null, "Kafka properties don't contains " +
ConsumerConfig.GROUP_ID_CONFIG);
+ A.ensure(
+ !grp.equals(streamerCfg.getMetadataConsumerGroup()),
+ "The group of event and metadata consumers must be different.");
kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class.getName());
kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index cb2d385f..6938448b 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -44,6 +44,13 @@ public class KafkaToIgniteLoaderTest extends
GridCommonAbstractTest {
"Spring bean with provided name doesn't exist"
);
+ assertThrows(
+ null,
+ () ->
loadKafkaToIgniteStreamer("loader/same-consumers-group/kafka-to-ignite.xml"),
+ IllegalArgumentException.class,
+ "The group of event and metadata consumers must be different."
+ );
+
KafkaToIgniteCdcStreamer streamer =
loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml");
assertNotNull(streamer);
diff --git
a/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml
b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml
new file mode 100644
index 00000000..55a6a9e0
--- /dev/null
+++
b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka-to-ignite.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="metadataConsumerGroup" value="group1" />
+ </bean>
+
+ <util:properties id="kafkaProperties"
location="loader/same-consumers-group/kafka.properties" />
+</beans>
diff --git
a/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties
b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties
new file mode 100644
index 00000000..346e21a7
--- /dev/null
+++
b/modules/cdc-ext/src/test/resources/loader/same-consumers-group/kafka.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=127.0.0.1
+group.id=group1