This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f54b9bb8dd7 [HUDI-6191] Improve passing the debezium checkpoint values
to start job from offset (#11686)
f54b9bb8dd7 is described below
commit f54b9bb8dd7c77ba159f71feaca6f8475c15b535
Author: Vova Kolmakov <[email protected]>
AuthorDate: Fri Jul 26 08:25:56 2024 +0700
[HUDI-6191] Improve passing the debezium checkpoint values to start job
from offset (#11686)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../hudi/utilities/config/KafkaSourceConfig.java | 14 ++++-
.../utilities/sources/helpers/KafkaOffsetGen.java | 45 +++++++++++----
.../sources/helpers/TestKafkaOffsetGen.java | 66 ++++++++++++++++++----
3 files changed, 102 insertions(+), 23 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 6215e99d665..92f1f1cc507 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -41,15 +41,25 @@ import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
description = "Configurations controlling the behavior of Kafka source in
Hudi Streamer.")
public class KafkaSourceConfig extends HoodieConfig {
+ public static final String KAFKA_CHECKPOINT_TYPE_STRING = "string";
+ public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
+ public static final String KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET =
"single_offset";
+
private static final String PREFIX = STREAMER_CONFIG_PREFIX +
"source.kafka.";
private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX +
"source.kafka.";
public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE =
ConfigProperty
.key(PREFIX + "checkpoint.type")
- .defaultValue("string")
+ .defaultValue(KAFKA_CHECKPOINT_TYPE_STRING)
.withAlternatives(OLD_PREFIX + "checkpoint.type")
.markAdvanced()
- .withDocumentation("Kafka checkpoint type.");
+ .withDocumentation("Kafka checkpoint type. Value must be one of the
following: "
+ + KAFKA_CHECKPOINT_TYPE_STRING + ", " +
KAFKA_CHECKPOINT_TYPE_TIMESTAMP + ", " + KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET
+ + ". Default type is " + KAFKA_CHECKPOINT_TYPE_STRING + ". "
+ + "For type " + KAFKA_CHECKPOINT_TYPE_STRING + ", checkpoint should
be provided as: topicName,0:offset0,1:offset1,2:offset2. "
+ + "For type " + KAFKA_CHECKPOINT_TYPE_TIMESTAMP + ", checkpoint
should be provided as long value of desired timestamp. "
+ + "For type " + KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET + ", we assume
that topic consists of a single partition, "
+ + "so checkpoint should be provided as long value of desired
offset.");
public static final ConfigProperty<String>
KAFKA_AVRO_VALUE_DESERIALIZER_CLASS = ConfigProperty
.key(PREFIX + "value.deserializer.class")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 6274f838f84..028e44bbe50 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -60,6 +60,8 @@ import static org.apache.hudi.common.util.ConfigUtils.checkRequiredProperties;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_TIMESTAMP;
import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils.checkTopicCheckpoint;
/**
@@ -70,7 +72,6 @@ public class KafkaOffsetGen {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaOffsetGen.class);
private static final String METRIC_NAME_KAFKA_DELAY_COUNT =
"kafkaDelayCount";
private static final Comparator<OffsetRange> SORT_BY_PARTITION =
Comparator.comparing(OffsetRange::partition);
- public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
public static class CheckpointUtils {
/**
@@ -258,13 +259,13 @@ public class KafkaOffsetGen {
long numEvents;
if (sourceLimit == Long.MAX_VALUE) {
numEvents = maxEventsToReadFromKafka;
- LOG.info("SourceLimit not configured, set numEvents to default value : "
+ maxEventsToReadFromKafka);
+ LOG.info("SourceLimit not configured, set numEvents to default value :
{}", maxEventsToReadFromKafka);
} else {
numEvents = sourceLimit;
}
long minPartitions = getLongWithAltKeys(props,
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
- LOG.info("getNextOffsetRanges set config " +
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+ LOG.info("getNextOffsetRanges set config {} to {}",
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), minPartitions);
return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions,
metrics);
}
@@ -281,8 +282,14 @@ public class KafkaOffsetGen {
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
.map(x -> new TopicPartition(x.topic(),
x.partition())).collect(Collectors.toSet());
- if (KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) &&
isValidTimestampCheckpointType(lastCheckpointStr)) {
+ if
(KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equalsIgnoreCase(kafkaCheckpointType) &&
isValidTimestampCheckpointType(lastCheckpointStr)) {
lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList,
topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+ } else if
(KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET.equalsIgnoreCase(kafkaCheckpointType) &&
partitionInfoList.size() != 1) {
+ throw new HoodieException("Kafka topic " + topicName + " has " +
partitionInfoList.size()
+ + " partitions (more than 1). single_offset checkpoint type is not
applicable.");
+ } else if
(KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET.equalsIgnoreCase(kafkaCheckpointType)
+ && partitionInfoList.size() == 1 &&
isValidOffsetCheckpointType(lastCheckpointStr)) {
+ lastCheckpointStr = Option.of(topicName + ",0:" +
lastCheckpointStr.get());
}
// Determine the offset ranges to read from
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()
&& checkTopicCheckpoint(lastCheckpointStr)) {
@@ -369,8 +376,7 @@ public class KafkaOffsetGen {
if (getBooleanWithAltKeys(this.props,
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS)) {
throw new HoodieStreamerException(message);
} else {
- LOG.warn(message
- + " If you want Hudi Streamer to fail on such cases, set \"" +
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
+ LOG.warn("{} If you want Hudi Streamer to fail on such cases, set
\"{}\" to \"true\".", message,
KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key());
}
}
return isCheckpointOutOfBounds ? earliestOffsets : checkpointOffsets;
@@ -390,6 +396,24 @@ public class KafkaOffsetGen {
return isNum.matches() && (lastCheckpointStr.get().length() == 13 ||
lastCheckpointStr.get().length() == 10);
}
+ /**
+ * Check if checkpoint is a single offset
+ * @param lastCheckpointStr
+ * @return
+ */
+ private Boolean isValidOffsetCheckpointType(Option<String>
lastCheckpointStr) {
+ if (!lastCheckpointStr.isPresent()) {
+ return false;
+ }
+ try {
+ Long.parseUnsignedLong(lastCheckpointStr.get());
+ return true;
+ } catch (NumberFormatException ex) {
+ LOG.warn("Checkpoint type is set to single_offset, but provided value of
checkpoint=\"{}\" is not a valid number", lastCheckpointStr.get());
+ return false;
+ }
+ }
+
private Long delayOffsetCalculation(Option<String> lastCheckpointStr,
Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
Long delayCount = 0L;
Map<TopicPartition, Long> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
@@ -424,16 +448,15 @@ public class KafkaOffsetGen {
Map<TopicPartition, Long> earliestOffsets =
consumer.beginningOffsets(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp =
consumer.offsetsForTimes(topicPartitionsTimestamp);
- StringBuilder sb = new StringBuilder();
- sb.append(topicName + ",");
+ StringBuilder sb = new StringBuilder(topicName);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> map :
offsetAndTimestamp.entrySet()) {
if (map.getValue() != null) {
-
sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(",");
+
sb.append(",").append(map.getKey().partition()).append(":").append(map.getValue().offset());
} else {
-
sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
+
sb.append(",").append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey()));
}
}
- return Option.of(sb.deleteCharAt(sb.length() - 1).toString());
+ return Option.of(sb.toString());
}
/**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index ba85f04ebcb..db8bcab42b0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -37,6 +38,9 @@ import org.junit.jupiter.api.Test;
import java.util.UUID;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_STRING;
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE_TIMESTAMP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,7 +85,7 @@ public class TestKafkaOffsetGen {
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -98,7 +102,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_STRING));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
assertEquals(1000, nextOffsetRanges[0].fromOffset());
@@ -111,7 +115,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_STRING));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics);
assertEquals(1, nextOffsetRanges.length);
@@ -125,7 +129,7 @@ public class TestKafkaOffsetGen {
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", KAFKA_CHECKPOINT_TYPE_TIMESTAMP));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis()
- 100000)), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
@@ -133,12 +137,54 @@ public class TestKafkaOffsetGen {
assertEquals(500, nextOffsetRanges[0].untilOffset());
}
+ @Test
+ public void testGetNextOffsetRangesFromSingleOffsetCheckpoint() {
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ testUtils.createTopic(testTopicName, 1);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest",
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+
+ // long positive value of offset => get it
+ String lastCheckpointString = "250";
+ OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics);
+ assertEquals(1, nextOffsetRanges.length);
+ assertEquals(250, nextOffsetRanges[0].fromOffset());
+ assertEquals(750, nextOffsetRanges[0].untilOffset());
+
+ // negative offset value => get by autoOffsetReset config
+ lastCheckpointString = "-2";
+ nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics);
+ assertEquals(1, nextOffsetRanges.length);
+ assertEquals(1000, nextOffsetRanges[0].fromOffset());
+ assertEquals(1000, nextOffsetRanges[0].untilOffset());
+
+ // incorrect offset value => get by autoOffsetReset config
+ kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest",
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+ lastCheckpointString = "garbage";
+ nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 5000,
metrics);
+ assertEquals(1, nextOffsetRanges.length);
+ assertEquals(0, nextOffsetRanges[0].fromOffset());
+ assertEquals(1000, nextOffsetRanges[0].untilOffset());
+ }
+
+ @Test
+ public void testGetNextOffsetRangesFromSingleOffsetCheckpointNotApplicable()
{
+ testUtils.createTopic(testTopicName, 2);
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest",
KAFKA_CHECKPOINT_TYPE_SINGLE_OFFSET));
+
+ // incorrect number of partitions => exception (number of partitions is
more than 1)
+ String lastCheckpointString = "250";
+ Exception exception = assertThrows(HoodieException.class,
+ () ->
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics));
+ assertTrue(exception.getMessage().startsWith("Kafka topic " +
testTopicName + " has 2 partitions (more than 1)"));
+ }
+
@Test
public void testGetNextOffsetRangesFromMultiplePartitions() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 2);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
assertEquals(3, nextOffsetRanges.length);
assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -154,7 +200,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 2);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecordsByPartitions(dataGenerator.generateInserts("000", 1000),
2));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("group", "string"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("group", KAFKA_CHECKPOINT_TYPE_STRING));
String lastCheckpointString = testTopicName + ",0:250,1:249";
kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
// don't pass lastCheckpointString as we want to read from group committed
offset
@@ -191,7 +237,7 @@ public class TestKafkaOffsetGen {
assertEquals(1, nextOffsetRanges[1].partition());
// committed offsets are not present for the consumer group
- kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
+ kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group",
KAFKA_CHECKPOINT_TYPE_STRING));
nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300,
metrics);
assertEquals(500, nextOffsetRanges[0].fromOffset());
assertEquals(500, nextOffsetRanges[0].untilOffset());
@@ -204,7 +250,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 1);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- TypedProperties props = getConsumerConfigs("earliest", "string");
+ TypedProperties props = getConsumerConfigs("earliest",
KAFKA_CHECKPOINT_TYPE_STRING);
// default no minPartition set
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
@@ -226,7 +272,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(testTopicName, 2);
testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- TypedProperties props = getConsumerConfigs("earliest", "string");
+ TypedProperties props = getConsumerConfigs("earliest",
KAFKA_CHECKPOINT_TYPE_STRING);
// default no minPartition or minPartition less than TopicPartitions
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
@@ -271,7 +317,7 @@ public class TestKafkaOffsetGen {
@Test
public void testCheckTopicExists() {
- TypedProperties props = getConsumerConfigs("latest", "string");
+ TypedProperties props = getConsumerConfigs("latest",
KAFKA_CHECKPOINT_TYPE_STRING);
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
testUtils.createTopic(testTopicName, 1);
boolean topicExists = kafkaOffsetGen.checkTopicExists(new
KafkaConsumer(props));