This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f54b9bb8dd7 [HUDI-6191] Improve passing the debezium checkpoint values 
to start job from offset (#11686)
f54b9bb8dd7 is described below

commit f54b9bb8dd7c77ba159f71feaca6f8475c15b535
Author: Vova Kolmakov <[email protected]>
AuthorDate: Fri Jul 26 08:25:56 2024 +0700

    [HUDI-6191] Improve passing the debezium checkpoint values to start job 
from offset (#11686)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../hudi/utilities/config/KafkaSourceConfig.java   | 14 ++++-
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 45 +++++++++++----
 .../sources/helpers/TestKafkaOffsetGen.java        | 66 ++++++++++++++++++----
 3 files changed, 102 insertions(+), 23 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 6215e99d665..92f1f1cc507 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -41,15 +41,25 @@ import static 
org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
     description = "Configurations controlling the behavior of Kafka source in 
Hudi Streamer.")
 public class KafkaSourceConfig extends HoodieConfig {
 
+  public static final String KAFKA_CHECKPOINT_TYPE_STRING = "string";
+  public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
+  public static final String KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET = 
"single_offset";
+
   private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kafka.";
   private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + 
"source.kafka.";
 
   public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = 
ConfigProperty
       .key(PREFIX + "checkpoint.type")
-      .defaultValue("string")
+      .defaultValue(KAFKA_CHECKPOINT_TYPE_STRING)
       .withAlternatives(OLD_PREFIX + "checkpoint.type")
       .markAdvanced()
-      .withDocumentation("Kafka checkpoint type.");
+      .withDocumentation("Kafka checkpoint type. Value must be one of the 
following: "
+          + KAFKA_CHECKPOINT_TYPE_STRING + ", " + 
KAFKA_CHECKPOINT_TYPE_TIMESTAMP + ", " + KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET
+          + ". Default type is " + KAFKA_CHECKPOINT_TYPE_STRING + ". "
+          + "For type " + KAFKA_CHECKPOINT_TYPE_STRING + ", checkpoint should 
be provided as: topicName,0:offset0,1:offset1,2:offset2. "
+          + "For type " + KAFKA_CHECKPOINT_TYPE_TIMESTAMP + ", checkpoint 
should be provided as long value of desired timestamp. "
+          + "For type " + KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET + ", we assume 
that topic consists of a single partition, "
+          + "so checkpoint should be provided as long value of desired 
offset.");
 
   public static final ConfigProperty<String> 
KAFKA_AVRO_VALUE_DESERIALIZER_CLASS = ConfigProperty
       .key(PREFIX + "value.deserializer.class")
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 6274f838f84..028e44bbe50 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -60,6 +60,8 @@ import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET;
+import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_TIMESTAMP;
 import static 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils.checkTopicCheckpoint;
 
 /**
@@ -70,7 +72,6 @@ public class KafkaOffsetGen {
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetGen.class);
   private static final String METRIC_NAME_KAFKA_DELAY_COUNT = 
"kafkaDelayCount";
   private static final Comparator<OffsetRange> SORT_BY_PARTITION = 
Comparator.comparing(OffsetRange::partition);
-  public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
 
   public static class CheckpointUtils {
     /**
@@ -258,13 +259,13 @@ public class KafkaOffsetGen {
     long numEvents;
     if (sourceLimit == Long.MAX_VALUE) {
       numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " 
+ maxEventsToReadFromKafka);
+      LOG.info("SourceLimit not configured, set numEvents to default value : 
{}", maxEventsToReadFromKafka);
     } else {
       numEvents = sourceLimit;
     }
 
     long minPartitions = getLongWithAltKeys(props, 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+    LOG.info("getNextOffsetRanges set config {} to {}", 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), minPartitions);
 
     return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, 
metrics);
   }
@@ -281,8 +282,14 @@ public class KafkaOffsetGen {
       Set<TopicPartition> topicPartitions = partitionInfoList.stream()
               .map(x -> new TopicPartition(x.topic(), 
x.partition())).collect(Collectors.toSet());
 
-      if (KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && 
isValidTimestampCheckpointType(lastCheckpointStr)) {
+      if 
(KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equalsIgnoreCase(kafkaCheckpointType) && 
isValidTimestampCheckpointType(lastCheckpointStr)) {
         lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, 
topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      } else if 
(KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET.equalsIgnoreCase(kafkaCheckpointType) && 
partitionInfoList.size() != 1) {
+        throw new HoodieException("Kafka topic " + topicName + " has " + 
partitionInfoList.size()
+            + " partitions (more than 1). single_offset checkpoint type is not 
applicable.");
+      } else if 
(KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET.equalsIgnoreCase(kafkaCheckpointType)
+          && partitionInfoList.size() == 1 && 
isValidOffsetCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = Option.of(topicName + ",0:" + 
lastCheckpointStr.get());
       }
       // Determine the offset ranges to read from
       if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() 
&& checkTopicCheckpoint(lastCheckpointStr)) {
@@ -369,8 +376,7 @@ public class KafkaOffsetGen {
       if (getBooleanWithAltKeys(this.props, 
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
         throw new HoodieStreamerException(message);
       } else {
-        LOG.warn(message
-            + " If you want Hudi Streamer to fail on such cases, set \"" + 
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
+        LOG.warn("{} If you want Hudi Streamer to fail on such cases, set 
\"{}\" to \"true\".", message, 
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key());
       }
     }
     return isCheckpointOutOfBounds ? earliestOffsets : checkpointOffsets;
@@ -390,6 +396,24 @@ public class KafkaOffsetGen {
     return isNum.matches() && (lastCheckpointStr.get().length() == 13 || 
lastCheckpointStr.get().length() == 10);
   }
 
+  /**
+   * Check if checkpoint is a single offset
+   * @param lastCheckpointStr
+   * @return
+   */
+  private Boolean isValidOffsetCheckpointType(Option<String> 
lastCheckpointStr) {
+    if (!lastCheckpointStr.isPresent()) {
+      return false;
+    }
+    try {
+      Long.parseUnsignedLong(lastCheckpointStr.get());
+      return true;
+    } catch (NumberFormatException ex) {
+      LOG.warn("Checkpoint type is set to single_offset, but provided value of 
checkpoint=\"{}\" is not a valid number", lastCheckpointStr.get());
+      return false;
+    }
+  }
+
   private Long delayOffsetCalculation(Option<String> lastCheckpointStr, 
Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
     Long delayCount = 0L;
     Map<TopicPartition, Long> checkpointOffsets = 
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
@@ -424,16 +448,15 @@ public class KafkaOffsetGen {
     Map<TopicPartition, Long> earliestOffsets = 
consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = 
consumer.offsetsForTimes(topicPartitionsTimestamp);
 
-    StringBuilder sb = new StringBuilder();
-    sb.append(topicName + ",");
+    StringBuilder sb = new StringBuilder(topicName);
     for (Map.Entry<TopicPartition, OffsetAndTimestamp> map : 
offsetAndTimestamp.entrySet()) {
       if (map.getValue() != null) {
-        
sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(",");
+        
sb.append(",").append(map.getKey().partition()).append(":").append(map.getValue().offset());
       } else {
-        
sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
+        
sb.append(",").append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey()));
       }
     }
-    return Option.of(sb.deleteCharAt(sb.length() - 1).toString());
+    return Option.of(sb.toString());
   }
 
   /**
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index ba85f04ebcb..db8bcab42b0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -37,6 +38,9 @@ import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
 
+import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET;
+import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_STRING;
+import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_TIMESTAMP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,7 +85,7 @@ public class TestKafkaOffsetGen {
     testUtils.createTopic(testTopicName, 1);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
 
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
     OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -98,7 +102,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 1);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_STRING));
     OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(1000, nextOffsetRanges[0].fromOffset());
@@ -111,7 +115,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 1);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_STRING));
 
     OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, 
metrics);
     assertEquals(1, nextOffsetRanges.length);
@@ -125,7 +129,7 @@ public class TestKafkaOffsetGen {
     testUtils.createTopic(testTopicName, 1);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
 
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_TIMESTAMP));
 
     OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis()
 - 100000)), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
@@ -133,12 +137,54 @@ public class TestKafkaOffsetGen {
     assertEquals(500, nextOffsetRanges[0].untilOffset());
   }
 
+  @Test
+  public void testGetNextOffsetRangesFromSingleOffsetCheckpoint() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", 
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+
+    // long positive value of offset => get it
+    String lastCheckpointString = "250";
+    OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, 
metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(250, nextOffsetRanges[0].fromOffset());
+    assertEquals(750, nextOffsetRanges[0].untilOffset());
+
+    // negative offset value => get by autoOffsetReset config
+    lastCheckpointString = "-2";
+    nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, 
metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(1000, nextOffsetRanges[0].fromOffset());
+    assertEquals(1000, nextOffsetRanges[0].untilOffset());
+
+    // incorrect offset value => get by autoOffsetReset config
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", 
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+    lastCheckpointString = "garbage";
+    nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 5000, 
metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(1000, nextOffsetRanges[0].untilOffset());
+  }
+
+  @Test
+  public void testGetNextOffsetRangesFromSingleOffsetCheckpointNotApplicable() 
{
+    testUtils.createTopic(testTopicName, 2);
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("latest", 
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+
+    // incorrect number of partitions => exception (number of partitions is 
more than 1)
+    String lastCheckpointString = "250";
+    Exception exception = assertThrows(HoodieException.class,
+        () -> 
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, 
metrics));
+    assertTrue(exception.getMessage().startsWith("Kafka topic " + 
testTopicName + " has 2 partitions (more than 1)"));
+  }
+
   @Test
   public void testGetNextOffsetRangesFromMultiplePartitions() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 2);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
     OffsetRange[] nextOffsetRanges = 
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
     assertEquals(3, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -154,7 +200,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 2);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInserts("000", 1000), 
2));
-    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("group", "string"));
+    KafkaOffsetGen kafkaOffsetGen = new 
KafkaOffsetGen(getConsumerConfigs("group", KAFKA_CHECKPOINT_TYPE_STRING));
     String lastCheckpointString = testTopicName + ",0:250,1:249";
     kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
     // don't pass lastCheckpointString as we want to read from group committed 
offset
@@ -191,7 +237,7 @@ public class TestKafkaOffsetGen {
     assertEquals(1, nextOffsetRanges[1].partition());
 
     // committed offsets are not present for the consumer group
-    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", 
KAFKA_CHECKPOINT_TYPE_STRING));
     nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, 
metrics);
     assertEquals(500, nextOffsetRanges[0].fromOffset());
     assertEquals(500, nextOffsetRanges[0].untilOffset());
@@ -204,7 +250,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 1);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    TypedProperties props = getConsumerConfigs("earliest", "string");
+    TypedProperties props = getConsumerConfigs("earliest", 
KAFKA_CHECKPOINT_TYPE_STRING);
 
     // default no minPartition set
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
@@ -226,7 +272,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(testTopicName, 2);
     testUtils.sendMessages(testTopicName, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    TypedProperties props = getConsumerConfigs("earliest", "string");
+    TypedProperties props = getConsumerConfigs("earliest", 
KAFKA_CHECKPOINT_TYPE_STRING);
 
     // default no minPartition or minPartition less than TopicPartitions
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
@@ -271,7 +317,7 @@ public class TestKafkaOffsetGen {
 
   @Test
   public void testCheckTopicExists() {
-    TypedProperties props = getConsumerConfigs("latest", "string");
+    TypedProperties props = getConsumerConfigs("latest", 
KAFKA_CHECKPOINT_TYPE_STRING);
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
     testUtils.createTopic(testTopicName, 1);
     boolean topicExists = kafkaOffsetGen.checkTopicExists(new 
KafkaConsumer(props));

Reply via email to