This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 79a88953 IGNITE-20297 Fix k2i NPE (#228)
79a88953 is described below

commit 79a88953ac5fddc232002b54439b4b7b1f0d3d85
Author: Nikolay <[email protected]>
AuthorDate: Tue Aug 29 09:21:30 2023 +0300

    IGNITE-20297 Fix k2i NPE (#228)
---
 .../cdc/kafka/KafkaToIgniteMetadataUpdater.java     |  9 ++++++++-
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java   |  6 +++++-
 .../cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java | 21 ++++++++++++++++++++-
 3 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index b8e8a9ad..6135b9db 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.kafka;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -38,6 +39,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.VoidDeserializer;
@@ -103,7 +105,12 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
 
         String metaTopic = streamerCfg.getMetadataTopic();
 
-        parts = cnsmr.partitionsFor(metaTopic, Duration.ofMillis(kafkaReqTimeout))
+        List<PartitionInfo> topicMeta = cnsmr.partitionsFor(metaTopic, Duration.ofMillis(kafkaReqTimeout));
+
+        if (topicMeta == null)
+            throw new IgniteException("Unknown topic: " + metaTopic);
+
+        parts = topicMeta
             .stream()
             .map(pInfo -> new TopicPartition(metaTopic, pInfo.partition()))
             .collect(Collectors.toSet());
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 403abdac..685c50b1 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -249,7 +249,11 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
         EmbeddedKafkaCluster kafka = curKafka;
 
         if (kafka == null) {
-            kafka = new EmbeddedKafkaCluster(1);
+            Properties brokerProperties = new Properties();
+
+            brokerProperties.put("auto.create.topics.enable", "false");
+
+            kafka = new EmbeddedKafkaCluster(1, brokerProperties);
 
             kafka.start();
         }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
index fee428b9..6167503a 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cdc.kafka;
 
 import java.util.Collections;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cdc.CdcConsumer;
 import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -41,6 +42,7 @@ import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
 import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
 import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
 import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.apache.logging.log4j.Level.DEBUG;
 
@@ -96,6 +98,18 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
         POLL_SKIP_LISTENER.reset();
     }
 
+    /** */
+    @Test
+    public void testThrowsForUnknownTopic() {
+        KafkaToIgniteCdcStreamerConfiguration cfg = streamerConfiguration();
+
+        String topic = "not-existing-topic";
+
+        cfg.setMetadataTopic(topic);
+
+        assertThrows(null, () -> metadataUpdater(cfg), IgniteException.class, "Unknown topic: " + topic);
+    }
+
     /** */
     @Test
     public void testUpdateMetadata() throws Exception {
@@ -147,6 +161,11 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
 
     /** */
     private KafkaToIgniteMetadataUpdater metadataUpdater() {
+        return metadataUpdater(streamerConfiguration());
+    }
+
+    /** */
+    private KafkaToIgniteMetadataUpdater metadataUpdater(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
         BinaryContext noOpCtx = new BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(), log) {
             @Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered,
                 boolean onlyLocReg, byte platformId) {
@@ -154,7 +173,7 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
             }
         };
 
-        return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerConfiguration());
+        return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerCfg);
     }
 
     /** */

Reply via email to