This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new e932e702 IGNITE-22670 Added KafkaToIgnite loader tests (#279)
e932e702 is described below

commit e932e7025eb6f793fda616b6e2d5fad9beb9dfe0
Author: Maksim Davydov <[email protected]>
AuthorDate: Wed Jul 10 18:00:45 2024 +0300

    IGNITE-22670 Added KafkaToIgnite loader tests (#279)
---
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  | 56 ++++++++++++++++++++++
 ...gnite-with-incorrect-partition-distribution.xml | 52 ++++++++++++++++++++
 ...kafka-to-ignite-with-incorrect-thread-count.xml | 53 ++++++++++++++++++++
 ...afka-to-ignite-with-negative-partition-from.xml | 52 ++++++++++++++++++++
 .../kafka-to-ignite-with-negative-partition-to.xml | 52 ++++++++++++++++++++
 .../kafka-to-ignite-with-negative-thread-count.xml | 53 ++++++++++++++++++++
 .../kafka-to-ignite-without-metadata-topic.xml     | 51 ++++++++++++++++++++
 .../loader/kafka-to-ignite-without-topic.xml       | 51 ++++++++++++++++++++
 ...lient-with-incorrect-partition-distribution.xml | 47 ++++++++++++++++++
 ...o-ignite-client-with-incorrect-thread-count.xml | 48 +++++++++++++++++++
 ...-ignite-client-with-negative-partition-from.xml | 47 ++++++++++++++++++
 ...to-ignite-client-with-negative-partition-to.xml | 47 ++++++++++++++++++
 ...to-ignite-client-with-negative-thread-count.xml | 48 +++++++++++++++++++
 ...fka-to-ignite-client-without-metadata-topic.xml | 46 ++++++++++++++++++
 .../thin/kafka-to-ignite-client-without-topic.xml  | 46 ++++++++++++++++++
 15 files changed, 749 insertions(+)

diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 918c0f8a..5753270a 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -123,6 +123,62 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
         });
     }
 
+    /** Tests setting kafka properties of kafka to ignite loaders. */
+    @Test
+    public void testKafkaProperties() {
+        Stream.of(
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-without-topic.xml",
+                "Ouch! Argument cannot be null: Kafka topic"},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-without-metadata-topic.xml",
+                "Ouch! Argument cannot be null: Kafka metadata topic"},
+            new String[] {
+                "loader/kafka-to-ignite-without-topic.xml",
+                "Ouch! Argument cannot be null: Kafka topic"},
+            new String[] {
+                "loader/kafka-to-ignite-without-metadata-topic.xml",
+                "Ouch! Argument cannot be null: Kafka metadata topic"}
+        ).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), NullPointerException.class, args[1]));
+
+        Stream.of(
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml",
+                "Ouch! Argument is invalid: The Kafka partitions lower bound must be explicitly set to a value greater" +
+                    " than or equals to zero."},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml",
+                "Ouch! Argument is invalid: The Kafka partitions upper bound must be explicitly set to a value greater" +
+                    " than zero."},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml",
+                "Ouch! Argument is invalid: The Kafka partitions upper bound must be greater than lower bound."},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml",
+                "Ouch! Argument is invalid: Threads count value must me greater than zero."},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml",
+                "Ouch! Argument is invalid: Threads count must be less or equals to the total Kafka partitions count."},
+            new String[] {
+                "loader/kafka-to-ignite-with-negative-partition-from.xml",
+                "Ouch! Argument is invalid: The Kafka partitions lower bound must be explicitly set to a value greater" +
+                    " than or equals to zero."},
+            new String[] {
+                "loader/kafka-to-ignite-with-negative-partition-to.xml",
+                "Ouch! Argument is invalid: The Kafka partitions upper bound must be explicitly set to a value greater" +
+                    " than zero."},
+            new String[] {
+                "loader/kafka-to-ignite-with-incorrect-partition-distribution.xml",
+                "Ouch! Argument is invalid: The Kafka partitions upper bound must be greater than lower bound."},
+            new String[] {
+                "loader/kafka-to-ignite-with-negative-thread-count.xml",
+                "Ouch! Argument is invalid: Threads count value must me greater than zero."},
+            new String[] {
+                "loader/kafka-to-ignite-with-incorrect-thread-count.xml",
+                "Ouch! Argument is invalid: Threads count must be less or equals to the total Kafka partitions count."}
+        ).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));
+    }
+
     /** */
     @Test
     public void testInitSpringContextOnce() throws Exception {
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml
new file mode 100644
index 00000000..694daeeb
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-partition-distribution.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="5" />
+        <property name="kafkaPartsTo" value="4" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml
new file mode 100644
index 00000000..184a043a
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-incorrect-thread-count.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+        <property name="threadCount" value="20" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml
new file mode 100644
index 00000000..11f139dd
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-from.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="-1" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml
new file mode 100644
index 00000000..47073ff5
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-partition-to.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="-1" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml
new file mode 100644
index 00000000..2d0167a9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-with-negative-thread-count.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+        <property name="threadCount" value="-4" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml
new file mode 100644
index 00000000..b5599f72
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-metadata-topic.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml
new file mode 100644
index 00000000..29787f64
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-topic.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean 
class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean 
class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml
new file mode 100644
index 00000000..7ea64fa5
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-partition-distribution.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" 
class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
 id="consumerPollTimeout" />
+    <util:constant 
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
 id="requestTimeout" />
+
+    <bean id="streamer.cfg" 
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="5" />
+        <property name="kafkaPartsTo" value="4" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml
new file mode 100644
index 00000000..883159c2
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-incorrect-thread-count.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+        <property name="threadCount" value="20" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml
new file mode 100644
index 00000000..a6bd67de
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-from.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="-1" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml
new file mode 100644
index 00000000..71fb5164
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-partition-to.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="-2" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml
new file mode 100644
index 00000000..8e6b7e3b
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-negative-thread-count.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+        <property name="threadCount" value="-4" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml
new file mode 100644
index 00000000..72ac224c
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-metadata-topic.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="topic" value="ignite" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml
new file mode 100644
index 00000000..fe40265a
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-topic.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="metadataTopic" value="ignite-metadata" />
+        <property name="kafkaPartsFrom" value="0" />
+        <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
+    </bean>
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>


Reply via email to