This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new af837d2 [HUDI-1447] DeltaStreamer kafka source supports consuming
from specified timestamp (#2438)
af837d2 is described below
commit af837d2f1825d14ae8403b2290cf5eab39780343
Author: liujinhui <[email protected]>
AuthorDate: Sat Jul 17 12:31:06 2021 +0800
[HUDI-1447] DeltaStreamer kafka source supports consuming from specified
timestamp (#2438)
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 15 ++-
.../hudi/utilities/sources/AvroKafkaSource.java | 5 +-
.../hudi/utilities/sources/JsonKafkaSource.java | 5 +-
.../utilities/sources/helpers/KafkaOffsetGen.java | 103 ++++++++++++++++++---
.../functional/TestHoodieDeltaStreamer.java | 56 ++++++++---
.../hudi/utilities/sources/TestKafkaSource.java | 15 +--
.../sources/helpers/TestKafkaOffsetGen.java | 35 +++++--
7 files changed, 172 insertions(+), 62 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7742e8e..9d445dc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -59,6 +60,7 @@ import
org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
@@ -318,13 +320,12 @@ public class DeltaSync implements Serializable {
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(),
HoodieCommitMetadata.class);
- if (cfg.checkpoint != null &&
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+ if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+ ||
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
- } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
+ } else if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
//if previous checkpoint is an empty string, skip resume use
Option.empty()
- if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
- resumeCheckpointStr =
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
- }
+ resumeCheckpointStr =
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else if (commitMetadata.getOperationType() ==
WriteOperationType.CLUSTER) {
// incase of CLUSTER commit, no checkpoint will be available in
metadata.
resumeCheckpointStr = Option.empty();
@@ -336,6 +337,10 @@ public class DeltaSync implements Serializable {
+
commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ",
CommitMetadata="
+ commitMetadata.toJsonString());
}
+ // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
+ if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+ props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
+ }
}
} else {
String partitionColumns =
HoodieWriterUtils.getPartitionColumns(keyGenerator);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 4cea13d..500c412 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -40,9 +40,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
-import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
@@ -104,7 +101,7 @@ public class AvroKafkaSource extends AvroSource {
@Override
public void onCommit(String lastCkptStr) {
- if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET,
DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+ if
(this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index c1e2e3d..cf9e905 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -35,9 +35,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
-import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-import static
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-
/**
* Read json kafka data.
*/
@@ -77,7 +74,7 @@ public class JsonKafkaSource extends JsonSource {
@Override
public void onCommit(String lastCkptStr) {
- if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET,
DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+ if
(this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
offsetGen.commitOffsetToKafka(lastCkptStr);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 4378cb1..a7b983a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
@@ -30,6 +31,7 @@ import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -46,6 +48,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -160,28 +163,48 @@ public class KafkaOffsetGen {
*/
public static class Config {
- private static final String KAFKA_TOPIC_NAME =
"hoodie.deltastreamer.source.kafka.topic";
- private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP =
"hoodie.deltastreamer.kafka.source.maxEvents";
- public static final String ENABLE_KAFKA_COMMIT_OFFSET =
"hoodie.deltastreamer.source.kafka.enable.commit.offset";
- public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false;
+ private static final ConfigProperty<String> KAFKA_TOPIC_NAME =
ConfigProperty
+ .key("hoodie.deltastreamer.source.kafka.topic")
+ .noDefaultValue()
+ .withDocumentation("Kafka topic name.");
+
+ public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE =
ConfigProperty
+ .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
+ .defaultValue("string")
+      .withDocumentation("Kafka checkpoint type.");
+
+ public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET =
ConfigProperty
+ .key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
+ .defaultValue(false)
+      .withDocumentation("Automatically submits offset to Kafka.");
+
+ public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP
= ConfigProperty
+ .key("hoodie.deltastreamer.kafka.source.maxEvents")
+ .defaultValue(5000000L)
+ .withDocumentation("Maximum number of records obtained in each
batch.");
+
// "auto.offset.reset" is kafka native config param. Do not change the
config param name.
- public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
- private static final KafkaResetOffsetStrategies
DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
- public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
- public static long maxEventsFromKafkaSource =
DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
+ private static final ConfigProperty<KafkaResetOffsetStrategies>
KAFKA_AUTO_OFFSET_RESET = ConfigProperty
+ .key("auto.offset.reset")
+ .defaultValue(KafkaResetOffsetStrategies.LATEST)
+ .withDocumentation("Kafka consumer strategy for reading data.");
+
+ public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
}
private final Map<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
private KafkaResetOffsetStrategies autoResetValue;
+ private final String kafkaCheckpointType;
public KafkaOffsetGen(TypedProperties props) {
this.props = props;
kafkaParams = excludeHoodieConfigs(props);
- DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(Config.KAFKA_TOPIC_NAME));
- topicName = props.getString(Config.KAFKA_TOPIC_NAME);
- String kafkaAutoResetOffsetsStr =
props.getString(Config.KAFKA_AUTO_OFFSET_RESET,
Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(Config.KAFKA_TOPIC_NAME.key()));
+ topicName = props.getString(Config.KAFKA_TOPIC_NAME.key());
+ kafkaCheckpointType = props.getString(Config.KAFKA_CHECKPOINT_TYPE.key(),
Config.KAFKA_CHECKPOINT_TYPE.defaultValue());
+ String kafkaAutoResetOffsetsStr =
props.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(),
Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
boolean found = false;
for (KafkaResetOffsetStrategies entry:
KafkaResetOffsetStrategies.values()) {
if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
@@ -194,7 +217,7 @@ public class KafkaOffsetGen {
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET +
" config set to unknown value " + kafkaAutoResetOffsetsStr);
}
if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
- this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET,
Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+ this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(),
Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
}
}
@@ -212,6 +235,9 @@ public class KafkaOffsetGen {
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
.map(x -> new TopicPartition(x.topic(),
x.partition())).collect(Collectors.toSet());
+ if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType)
&& isValidTimestampCheckpointType(lastCheckpointStr)) {
+ lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList,
topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+ }
// Determine the offset ranges to read from
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()
&& checkTopicCheckpoint(lastCheckpointStr)) {
fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr,
topicPartitions);
@@ -237,8 +263,8 @@ public class KafkaOffsetGen {
}
// Come up with final set of OffsetRanges to read (account for new
partitions, limit number of events)
- long maxEventsToReadFromKafka =
props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
- Config.maxEventsFromKafkaSource);
+ long maxEventsToReadFromKafka =
props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(),
+ Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue());
long numEvents;
if (sourceLimit == Long.MAX_VALUE) {
@@ -271,6 +297,20 @@ public class KafkaOffsetGen {
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
+ /**
+ * Check if the checkpoint is a timestamp.
+   * @param lastCheckpointStr the last checkpoint value, if present
+   * @return true if the checkpoint is a 10- or 13-digit numeric timestamp
+   */
+ private Boolean isValidTimestampCheckpointType(Option<String>
lastCheckpointStr) {
+ if (!lastCheckpointStr.isPresent()) {
+ return false;
+ }
+ Pattern pattern = Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?");
+ Matcher isNum = pattern.matcher(lastCheckpointStr.get());
+ return isNum.matches() && (lastCheckpointStr.get().length() == 13 ||
lastCheckpointStr.get().length() == 10);
+ }
+
private Long delayOffsetCalculation(Option<String> lastCheckpointStr,
Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
Long delayCount = 0L;
Map<TopicPartition, Long> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
@@ -284,6 +324,41 @@ public class KafkaOffsetGen {
}
/**
+ * Get the checkpoint by timestamp.
+ * This method returns the checkpoint format based on the timestamp.
+ * example:
+ * 1. input: timestamp, etc.
+ * 2. output:
topicName,partition_num_0:100,partition_num_1:101,partition_num_2:102.
+ *
+   * @param consumer kafka consumer used to look up offsets
+   * @param topicName name of the kafka topic
+   * @param timestamp epoch timestamp to resolve per-partition offsets for;
+   *                  partitions with no offset at that time fall back to their earliest offset
+   * @return checkpoint string in the form topicName,partition:offset,...
+   */
+ private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer,
List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions,
+ String topicName, Long
timestamp) {
+
+ Map<TopicPartition, Long> topicPartitionsTimestamp =
partitionInfoList.stream()
+ .map(x -> new
TopicPartition(x.topic(), x.partition()))
+
.collect(Collectors.toMap(Function.identity(), x -> timestamp));
+
+ Map<TopicPartition, Long> earliestOffsets =
consumer.beginningOffsets(topicPartitions);
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp =
consumer.offsetsForTimes(topicPartitionsTimestamp);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(topicName + ",");
+ for (Map.Entry<TopicPartition, OffsetAndTimestamp> map :
offsetAndTimestamp.entrySet()) {
+ if (map.getValue() != null) {
+
sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(",");
+ } else {
+
sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
+ }
+ }
+ return Option.of(sb.deleteCharAt(sb.length() - 1).toString());
+ }
+
+
+ /**
* Check if topic exists.
* @param consumer kafka consumer
* @return
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 642c666..db0ab19 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -58,7 +58,6 @@ import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -139,6 +138,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final int JSON_KAFKA_NUM_RECORDS = 5;
+ private String kafkaCheckpointType = "string";
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -274,7 +274,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
protected static void populateCommonKafkaProps(TypedProperties props) {
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
- props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest");
+ props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
String.valueOf(5000));
@@ -360,12 +360,13 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
String propsFilename, boolean
enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName,
String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(),
transformerClassNames, propsFilename, enableHiveSync,
- useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName,
tableType, "timestamp");
+ useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName,
tableType, "timestamp", null);
}
static HoodieDeltaStreamer.Config makeConfig(String basePath,
WriteOperationType op, String sourceClassName,
List<String>
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean
useSchemaProviderClass,
- int sourceLimit, boolean
updatePayloadClass, String payloadClassName, String tableType, String
sourceOrderingField) {
+ int sourceLimit, boolean
updatePayloadClass, String payloadClassName, String tableType, String
sourceOrderingField,
+ String checkpoint) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -377,6 +378,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
cfg.sourceOrderingField = sourceOrderingField;
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = sourceLimit;
+ cfg.checkpoint = checkpoint;
if (updatePayloadClass) {
cfg.payloadClassName = payloadClassName;
}
@@ -601,7 +603,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
Arguments.of(allConfig, conf)
);
}
-
+
@ParameterizedTest
@MethodSource("provideValidCliArgs")
public void testValidCommandLineArgs(String[] args,
HoodieDeltaStreamer.Config expected) {
@@ -1399,7 +1401,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
+ useSchemaProvider, 100000, false, null, null, "timestamp", null),
jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
testNum++;
@@ -1414,10 +1416,11 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root",
JSON_KAFKA_SOURCE_ROOT);
- props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
+ props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
+ props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source_uber.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target_uber.avsc");
- props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue);
+ props.setProperty("auto.offset.reset", autoResetValue);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
propsFileName);
}
@@ -1440,7 +1443,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
- false, 100000, false, null, null, "timestamp"), jsc);
+ false, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecords, tableBasePath +
"/*/*.parquet", sqlContext);
deltaStreamer.shutdownGracefully();
@@ -1453,7 +1456,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
- true, 100000, false, null, null, "timestamp"), jsc);
+ true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, this all kafka records so far may
not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 :
JSON_KAFKA_NUM_RECORDS);
@@ -1471,12 +1474,12 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
public void testJsonKafkaDFSSource() throws Exception {
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
- prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA,
"earliest",topicName);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",
topicName);
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
- true, 100000, false, null, null, "timestamp"), jsc);
+ true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
@@ -1489,6 +1492,31 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
}
@Test
+ public void testKafkaTimestampType() throws Exception {
+ topicName = "topic" + testNum;
+ kafkaCheckpointType = "timestamp";
+ prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+ prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",
topicName);
+ String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
+ Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA,
false,
+ true, 100000, false, null,
+ null, "timestamp",
String.valueOf(System.currentTimeMillis())), jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
+
+ prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
+ deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
JsonKafkaSource.class.getName(),
+ Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA,
false,
+ true, 100000, false, null, null,
+ "timestamp", String.valueOf(System.currentTimeMillis())),
jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath +
"/*/*.parquet", sqlContext);
+ }
+
+ @Test
public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws
Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(false);
}
@@ -1566,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
new HoodieDeltaStreamer(TestHelpers.makeConfig(
tableBasePath, WriteOperationType.INSERT,
CsvDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
- useSchemaProvider, 1000, false, null, null, sourceOrderingField),
jsc);
+ useSchemaProvider, 1000, false, null, null, sourceOrderingField,
null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
testNum++;
@@ -1679,7 +1707,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
String tableBasePath = dfsBasePath + "/triprec";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT, JdbcSource.class.getName(),
null, "test-jdbc-source.properties", false,
- false, sourceLimit, false, null, null, "timestamp");
+ false, sourceLimit, false, null, null, "timestamp", null);
cfg.continuousMode = true;
// Add 1000 records
JdbcTestUtils.clearAndInsert("000", numRecords, connection, new
HoodieTestDataGenerator(), props);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index a1a00fa..aa25446 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -94,11 +94,11 @@ public class TestKafkaSource extends UtilitiesTestBase {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic",
TEST_TOPIC_NAME);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
- props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
+ props.setProperty("auto.offset.reset", resetStrategy);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ?
String.valueOf(maxEventsToReadFromKafkaSource) :
- String.valueOf(Config.maxEventsFromKafkaSource));
+
String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
return props;
}
@@ -193,7 +193,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession,
schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- Config.maxEventsFromKafkaSource = 500;
/*
1. Extract without any checkpoint => get all the data, respecting default
upper cap since both sourceLimit and
@@ -208,9 +207,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()),
1500);
assertEquals(1000, fetch2.getBatch().get().count());
-
- //reset the value back since it is a static variable
- Config.maxEventsFromKafkaSource =
Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
@Test
@@ -222,7 +218,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession,
schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- Config.maxEventsFromKafkaSource = 500;
+ props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
/*
1. maxEventsFromKafkaSourceProp set to more than generated insert records
@@ -240,9 +236,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()),
300);
assertEquals(300, fetch2.getBatch().get().count());
-
- //reset the value back since it is a static variable
- Config.maxEventsFromKafkaSource =
Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
@Test
@@ -300,7 +293,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
- props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true");
+ props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession,
schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index ccc141b..eff9b24 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -23,8 +23,8 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -62,9 +62,10 @@ public class TestKafkaOffsetGen {
testUtils.teardown();
}
- private TypedProperties getConsumerConfigs(String autoOffsetReset) {
+ private TypedProperties getConsumerConfigs(String autoOffsetReset, String
kafkaCheckpointType) {
TypedProperties props = new TypedProperties();
- props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+ props.put("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
+ props.put("auto.offset.reset", autoOffsetReset);
props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("key.deserializer", StringDeserializer.class.getName());
@@ -79,7 +80,7 @@ public class TestKafkaOffsetGen {
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -96,7 +97,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
assertEquals(1000, nextOffsetRanges[0].fromOffset());
@@ -109,7 +110,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 1);
testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics);
assertEquals(1, nextOffsetRanges.length);
@@ -118,11 +119,25 @@ public class TestKafkaOffsetGen {
}
@Test
+ public void testGetNextOffsetRangesFromTimestampCheckpointType() {
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ testUtils.createTopic(TEST_TOPIC_NAME, 1);
+ testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
+
+ OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis()
- 100000)), 500, metrics);
+ assertEquals(1, nextOffsetRanges.length);
+ assertEquals(0, nextOffsetRanges[0].fromOffset());
+ assertEquals(500, nextOffsetRanges[0].untilOffset());
+ }
+
+ @Test
public void testGetNextOffsetRangesFromMultiplePartitions() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 2);
testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
assertEquals(2, nextOffsetRanges.length);
assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -136,7 +151,7 @@ public class TestKafkaOffsetGen {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.createTopic(TEST_TOPIC_NAME, 2);
testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
- KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("group"));
+ KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("group", "string"));
String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
// don't pass lastCheckpointString as we want to read from group committed
offset
@@ -147,7 +162,7 @@ public class TestKafkaOffsetGen {
assertEquals(399, nextOffsetRanges[1].untilOffset());
// committed offsets are not present for the consumer group
- kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+ kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300,
metrics);
assertEquals(500, nextOffsetRanges[0].fromOffset());
assertEquals(500, nextOffsetRanges[0].untilOffset());
@@ -157,7 +172,7 @@ public class TestKafkaOffsetGen {
@Test
public void testCheckTopicExists() {
- TypedProperties props = getConsumerConfigs("latest");
+ TypedProperties props = getConsumerConfigs("latest", "string");
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
testUtils.createTopic(TEST_TOPIC_NAME, 1);
boolean topicExists = kafkaOffsetGen.checkTopicExists(new
KafkaConsumer(props));