This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new e932e702 IGNITE-22670 Added KafkaToIgnite loader tests (#279)
e932e702 is described below
commit e932e7025eb6f793fda616b6e2d5fad9beb9dfe0
Author: Maksim Davydov <[email protected]>
AuthorDate: Wed Jul 10 18:00:45 2024 +0300
IGNITE-22670 Added KafkaToIgnite loader tests (#279)
---
.../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java | 56 ++++++++++++++++++++++
...gnite-with-incorrect-partition-distribution.xml | 52 ++++++++++++++++++++
...kafka-to-ignite-with-incorrect-thread-count.xml | 53 ++++++++++++++++++++
...afka-to-ignite-with-negative-partition-from.xml | 52 ++++++++++++++++++++
.../kafka-to-ignite-with-negative-partition-to.xml | 52 ++++++++++++++++++++
.../kafka-to-ignite-with-negative-thread-count.xml | 53 ++++++++++++++++++++
.../kafka-to-ignite-without-metadata-topic.xml | 51 ++++++++++++++++++++
.../loader/kafka-to-ignite-without-topic.xml | 51 ++++++++++++++++++++
...lient-with-incorrect-partition-distribution.xml | 47 ++++++++++++++++++
...o-ignite-client-with-incorrect-thread-count.xml | 48 +++++++++++++++++++
...-ignite-client-with-negative-partition-from.xml | 47 ++++++++++++++++++
...to-ignite-client-with-negative-partition-to.xml | 47 ++++++++++++++++++
...to-ignite-client-with-negative-thread-count.xml | 48 +++++++++++++++++++
...fka-to-ignite-client-without-metadata-topic.xml | 46 ++++++++++++++++++
.../thin/kafka-to-ignite-client-without-topic.xml | 46 ++++++++++++++++++
15 files changed, 749 insertions(+)
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 918c0f8a..5753270a 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -123,6 +123,62 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
});
}
+ /** Tests that loaders reject configs with missing topics (NPE) or invalid partition bounds / thread counts (IAE). */
+ @Test
+ public void testKafkaProperties() {
+ Stream.of(
+ new String[] {
+ "loader/thin/kafka-to-ignite-client-without-topic.xml",
+ "Ouch! Argument cannot be null: Kafka topic"},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-without-metadata-topic.xml",
+ "Ouch! Argument cannot be null: Kafka metadata topic"},
+ new String[] {
+ "loader/kafka-to-ignite-without-topic.xml",
+ "Ouch! Argument cannot be null: Kafka topic"},
+ new String[] {
+ "loader/kafka-to-ignite-without-metadata-topic.xml",
+ "Ouch! Argument cannot be null: Kafka metadata topic"}
+ ).forEach(args -> assertThrows(null, () ->
loadKafkaToIgniteStreamer(args[0]), NullPointerException.class, args[1]));
+
+ Stream.of(
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml",
+ "Ouch! Argument is invalid: The Kafka partitions lower bound
must be explicitly set to a value greater" +
+ " than or equals to zero."},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml",
+ "Ouch! Argument is invalid: The Kafka partitions upper bound
must be explicitly set to a value greater" +
+ " than zero."},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml",
+ "Ouch! Argument is invalid: The Kafka partitions upper bound
must be greater than lower bound."},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml",
+ "Ouch! Argument is invalid: Threads count value must me
greater than zero."},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml",
+ "Ouch! Argument is invalid: Threads count must be less or
equals to the total Kafka partitions count."},
+ new String[] {
+ "loader/kafka-to-ignite-with-negative-partition-from.xml",
+ "Ouch! Argument is invalid: The Kafka partitions lower bound
must be explicitly set to a value greater" +
+ " than or equals to zero."},
+ new String[] {
+ "loader/kafka-to-ignite-with-negative-partition-to.xml",
+ "Ouch! Argument is invalid: The Kafka partitions upper bound
must be explicitly set to a value greater" +
+ " than zero."},
+ new String[] {
+
"loader/kafka-to-ignite-with-incorrect-partition-distribution.xml",
+ "Ouch! Argument is invalid: The Kafka partitions upper bound
must be greater than lower bound."},
+ new String[] {
+ "loader/kafka-to-ignite-with-negative-thread-count.xml",
+ "Ouch! Argument is invalid: Threads count value must me
greater than zero."},
+ new String[] {
+ "loader/kafka-to-ignite-with-incorrect-thread-count.xml",
+ "Ouch! Argument is invalid: Threads count must be less or
equals to the total Kafka partitions count."}
+ ).forEach(args -> assertThrows(null, () ->
loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));
+ }
+
/** */
@Test
public void testInitSpringContextOnce() throws Exception {
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml
new file mode 100644
index 00000000..694daeeb
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="5" />
+ <property name="kafkaPartsTo" value="4" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml
new file mode 100644
index 00000000..184a043a
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ <property name="threadCount" value="20" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml
new file mode 100644
index 00000000..11f139dd
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="-1" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml
new file mode 100644
index 00000000..47073ff5
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="-1" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml
new file mode 100644
index 00000000..2d0167a9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ <property name="threadCount" value="-4" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml
new file mode 100644
index 00000000..b5599f72
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml
new file mode 100644
index 00000000..29787f64
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml
new file mode 100644
index 00000000..7ea64fa5
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg"
class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="5" />
+ <property name="kafkaPartsTo" value="4" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml
new file mode 100644
index 00000000..883159c2
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg"
class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ <property name="threadCount" value="20" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml
new file mode 100644
index 00000000..a6bd67de
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg"
class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="-1" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml
new file mode 100644
index 00000000..71fb5164
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="-2" /> <!-- Negative upper partition bound: the invalid setting this fixture is named after. -->
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml
new file mode 100644
index 00000000..8e6b7e3b
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="ignite" />
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ <property name="threadCount" value="-4" /> <!-- Negative thread count: the invalid setting this fixture is named after. -->
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml
new file mode 100644
index 00000000..72ac224c
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration"> <!-- No "metadataTopic" property is set: the omission this fixture is named after. -->
+ <property name="topic" value="ignite" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml
new file mode 100644
index 00000000..fe40265a
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+ <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration"> <!-- No "topic" property is set: the omission this fixture is named after. -->
+ <property name="metadataTopic" value="ignite-metadata" />
+ <property name="kafkaPartsFrom" value="0" />
+ <property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+ </bean>
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>