This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 79a88953 IGNITE-20297 Fix k2i NPE (#228)
79a88953 is described below
commit 79a88953ac5fddc232002b54439b4b7b1f0d3d85
Author: Nikolay <[email protected]>
AuthorDate: Tue Aug 29 09:21:30 2023 +0300
IGNITE-20297 Fix k2i NPE (#228)
---
.../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 9 ++++++++-
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 6 +++++-
.../cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java | 21 ++++++++++++++++++++-
3 files changed, 33 insertions(+), 3 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index b8e8a9ad..6135b9db 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -38,6 +39,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
@@ -103,7 +105,12 @@ public class KafkaToIgniteMetadataUpdater implements
AutoCloseable, OffsetCommit
String metaTopic = streamerCfg.getMetadataTopic();
- parts = cnsmr.partitionsFor(metaTopic,
Duration.ofMillis(kafkaReqTimeout))
+ List<PartitionInfo> topicMeta = cnsmr.partitionsFor(metaTopic,
Duration.ofMillis(kafkaReqTimeout));
+
+ if (topicMeta == null)
+ throw new IgniteException("Unknown topic: " + metaTopic);
+
+ parts = topicMeta
.stream()
.map(pInfo -> new TopicPartition(metaTopic, pInfo.partition()))
.collect(Collectors.toSet());
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 403abdac..685c50b1 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -249,7 +249,11 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
EmbeddedKafkaCluster kafka = curKafka;
if (kafka == null) {
- kafka = new EmbeddedKafkaCluster(1);
+ Properties brokerProperties = new Properties();
+
+ brokerProperties.put("auto.create.topics.enable", "false");
+
+ kafka = new EmbeddedKafkaCluster(1, brokerProperties);
kafka.start();
}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
index fee428b9..6167503a 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc.kafka;
import java.util.Collections;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -41,6 +42,7 @@ import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
import static
org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.logging.log4j.Level.DEBUG;
@@ -96,6 +98,18 @@ public class KafkaToIgniteMetadataUpdaterTest extends
GridCommonAbstractTest {
POLL_SKIP_LISTENER.reset();
}
+ /** */
+ @Test
+ public void testThrowsForUnknownTopic() {
+ KafkaToIgniteCdcStreamerConfiguration cfg = streamerConfiguration();
+
+ String topic = "not-existing-topic";
+
+ cfg.setMetadataTopic(topic);
+
+ assertThrows(null, () -> metadataUpdater(cfg), IgniteException.class,
"Unknown topic: " + topic);
+ }
+
/** */
@Test
public void testUpdateMetadata() throws Exception {
@@ -147,6 +161,11 @@ public class KafkaToIgniteMetadataUpdaterTest extends
GridCommonAbstractTest {
/** */
private KafkaToIgniteMetadataUpdater metadataUpdater() {
+ return metadataUpdater(streamerConfiguration());
+ }
+
+ /** */
+ private KafkaToIgniteMetadataUpdater
metadataUpdater(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
BinaryContext noOpCtx = new
BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(),
log) {
@Override public boolean registerUserClassName(int typeId, String
clsName, boolean failIfUnregistered,
boolean onlyLocReg, byte platformId) {
@@ -154,7 +173,7 @@ public class KafkaToIgniteMetadataUpdaterTest extends
GridCommonAbstractTest {
}
};
- return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog,
kafkaProperties(kafka), streamerConfiguration());
+ return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog,
kafkaProperties(kafka), streamerCfg);
}
/** */