This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 93cd25fded8 [HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)
93cd25fded8 is described below
commit 93cd25fded8b0225ddfc54a49cc40fc5e4ad740c
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Feb 22 21:25:39 2024 +0530
[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)
* Add interface for StreamProfile to be used in StreamSync for reading and writing data
* Tidying up
* Add StreamContext class
* Add java doc
* Address comments
* Fix build errors
* Add annotations for SourceProfileSupplier
---
.../org/apache/hudi/utilities/UtilHelpers.java | 19 ++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 +-
.../hudi/utilities/sources/AvroKafkaSource.java | 10 ++--
.../hudi/utilities/sources/JsonKafkaSource.java | 13 ++++--
.../apache/hudi/utilities/sources/KafkaSource.java | 39 +++++++++++-----
.../hudi/utilities/sources/ProtoKafkaSource.java | 13 ++++--
.../org/apache/hudi/utilities/sources/Source.java | 11 ++++-
.../utilities/sources/helpers/KafkaOffsetGen.java | 39 ++++++++--------
.../utilities/streamer/DefaultStreamContext.java | 48 +++++++++++++++++++
.../hudi/utilities/streamer/HoodieStreamer.java | 21 ++++++---
.../hudi/utilities/streamer/SourceProfile.java | 54 ++++++++++++++++++++++
.../utilities/streamer/SourceProfileSupplier.java | 34 ++++++++++++++
.../hudi/utilities/streamer/StreamContext.java | 44 ++++++++++++++++++
.../apache/hudi/utilities/streamer/StreamSync.java | 10 ++--
.../utilities/sources/BaseTestKafkaSource.java | 51 ++++++++++++++++++++
.../utilities/sources/TestJsonKafkaSource.java | 17 +++++--
.../utilities/sources/TestProtoKafkaSource.java | 3 +-
17 files changed, 370 insertions(+), 60 deletions(-)
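The change threads a new StreamContext (a SchemaProvider plus an optional SourceProfileSupplier) from HoodieStreamer through StreamSync and down into each Source. As a rough sketch of how a caller might wire in a custom profile via the new HoodieStreamer constructor added below (the config values and the jssc/fs/conf variables are illustrative assumptions, not part of this commit; imports and Spark/Hadoop setup omitted):

    HoodieStreamer.Config cfg = new HoodieStreamer.Config();
    cfg.targetBasePath = "/tmp/hudi/trips";   // hypothetical table location
    cfg.targetTableName = "trips";
    cfg.sourceClassName = "org.apache.hudi.utilities.sources.JsonKafkaSource";

    // SourceProfileSupplier has a single abstract method, so a lambda suffices.
    Option<SourceProfileSupplier> profileSupplier = Option.of(() -> new SourceProfile<Long>() {
      @Override
      public long getMaxSourceBytes() {
        return 1024L * 1024 * 1024;   // byte budget for one sync round
      }

      @Override
      public int getSourcePartitions() {
        return 8;                     // output partitions for the source RDD
      }

      @Override
      public Long getSourceSpecificContext() {
        return 500_000L;              // for Kafka: events to consume this round
      }
    });

    // sync() may throw; error handling omitted in this sketch.
    new HoodieStreamer(cfg, jssc, fs, conf, Option.empty(), profileSupplier).sync();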
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 18e92a8463c..18af52d334e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,6 +66,7 @@ import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -156,6 +157,24 @@ public class UtilHelpers {
}
}
+  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
+                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
+      throws IOException {
+    try {
+      try {
+        return (Source) ReflectionUtils.loadClass(sourceClass,
+            new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class,
+                HoodieIngestionMetrics.class, streamContext.getClass()},
+            cfg, jssc, sparkSession, metrics, streamContext);
+      } catch (HoodieException e) {
+        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
+      }
+    } catch (Throwable e) {
+      throw new IOException("Could not load source class " + sourceClass, e);
+    }
+  }
+
  public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
return null;
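The nested try above first attempts to load the source through the new StreamContext-based constructor and, if reflection fails with a HoodieException, falls back to the existing SchemaProvider-based overload, so legacy sources keep working unchanged. A hypothetical custom source opting into the new path would declare the five-argument constructor; a minimal sketch (class name and batch logic are illustrative, not part of this commit):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
    import org.apache.hudi.utilities.sources.InputBatch;
    import org.apache.hudi.utilities.sources.Source;
    import org.apache.hudi.utilities.streamer.StreamContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class MyCustomSource extends Source<JavaRDD<String>> {

      // Intended to match the constructor shape probed by UtilHelpers.createSource above.
      public MyCustomSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                            HoodieIngestionMetrics metrics, StreamContext streamContext) {
        // The new protected Source constructor picks up the schema provider
        // and the optional profile supplier from the StreamContext.
        super(props, sparkContext, sparkSession, SourceType.JSON, streamContext);
      }

      @Override
      protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpoint, long sourceLimit) {
        // Illustrative empty batch; a real source reads from its system here.
        return new InputBatch<>(Option.empty(), lastCheckpoint.orElse(null));
      }
    }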
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c794db32510..4002d1579bb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -22,7 +22,9 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.StreamSync;
@@ -49,6 +51,6 @@ public class DeltaSync extends StreamSync {
  public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                   TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                   Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient);
+    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2bf92280faf..46095590430 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,6 +27,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -69,9 +71,11 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
  public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                         SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession,
-        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
-        SourceType.AVRO, metrics);
+    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext), Option.empty()));
+  }
+
+  public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, streamContext);
    this.originalSchemaProvider = schemaProvider;
    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index eb67abfee3a..0a609dde720 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
@@ -27,6 +28,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -44,10 +47,10 @@ import java.util.LinkedList;
import java.util.List;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
/**
* Read json kafka data.
@@ -56,9 +59,11 @@ public class JsonKafkaSource extends KafkaSource<String> {
  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                         SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(properties, sparkContext, sparkSession,
-        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
-        SourceType.JSON, metrics);
+    this(properties, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext), Option.empty()));
+  }
+
+  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, streamContext);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
this.offsetGen = new KafkaOffsetGen(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index bb26d579582..52a6a1217cc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -26,6 +26,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -50,9 +52,9 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
protected final boolean shouldAddOffsets;
  protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                        SchemaProvider schemaProvider, SourceType sourceType, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
-    this.schemaProvider = schemaProvider;
+                        SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, sourceType, streamContext);
+    this.schemaProvider = streamContext.getSchemaProvider();
    this.metrics = metrics;
    this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
}
@@ -60,21 +62,34 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
  @Override
  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
    try {
-      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
-      long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
-      if (totalNewMsgs <= 0) {
-        metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
-        return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+      OffsetRange[] offsetRanges;
+      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
+        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
+        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
+        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
+            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
+            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
+      } else {
+        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
      }
-      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-      return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+      return toInputBatch(offsetRanges);
    } catch (org.apache.kafka.common.errors.TimeoutException e) {
      throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
    }
  }
+  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
+    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+    if (totalNewMsgs <= 0) {
+      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
+      return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    }
+    metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
+    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+  }
+
abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
@Override
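The net effect in KafkaSource: when a profile is supplied, the number of events comes from the profile's source-specific context and the minimum partition count from getSourcePartitions(), bypassing the sourceLimit-driven path. Roughly (names as in the hunk above, values illustrative):

    // Profile-driven: numEvents and minPartitions come from the SourceProfile.
    OffsetRange[] ranges = offsetGen.getNextOffsetRanges(
        lastCheckpointStr,
        kafkaSourceProfile.getSourceSpecificContext(),   // e.g. 500_000 events
        kafkaSourceProfile.getSourcePartitions(),        // e.g. at least 8 ranges
        metrics);

    // Config-driven fallback: bounded by sourceLimit plus the
    // MAX_EVENTS_FROM_KAFKA_SOURCE and KAFKA_SOURCE_MIN_PARTITIONS settings.
    OffsetRange[] fallback = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);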
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 67927480454..208e591c8f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,12 +19,15 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
import com.google.protobuf.Message;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,9 +54,13 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
private final String className;
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
-                          SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
+    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
+  }
+
+  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, streamContext);
checkRequiredConfigProperties(props, Collections.singletonList(
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index cbc0722056b..dfb07c718a0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -25,6 +25,9 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.callback.SourceCommitCallback;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -44,6 +47,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
protected transient TypedProperties props;
protected transient JavaSparkContext sparkContext;
protected transient SparkSession sparkSession;
+ protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
private transient SchemaProvider overriddenSchemaProvider;
private final SourceType sourceType;
@@ -55,11 +59,16 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                   SchemaProvider schemaProvider, SourceType sourceType) {
+    this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
+  }
+
+  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
    this.props = props;
    this.sparkContext = sparkContext;
    this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = schemaProvider;
+    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
    this.sourceType = sourceType;
+    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
}
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index d5faec3595e..32df651d556 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -241,7 +241,24 @@ public class KafkaOffsetGen {
}
  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
+    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
+    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
+  }
+
+  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
// Obtain current metadata for the topic
Map<TopicPartition, Long> fromOffsets;
Map<TopicPartition, Long> toOffsets;
@@ -279,29 +296,9 @@ public class KafkaOffsetGen {
// Obtain the latest offsets.
toOffsets = consumer.endOffsets(topicPartitions);
}
-
-    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
-
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    // TODO(HUDI-4625) remove
-    if (numEvents < toOffsets.size()) {
-      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
-    }
-
-    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
  }
-
+
/**
* Fetch partition infos for given topic.
*
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
new file mode 100644
index 00000000000..f8dabeb89c9
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+/**
+ * The default implementation of the StreamContext interface.
+ * It currently composes a SchemaProvider and an optional SourceProfileSupplier,
+ * and can be extended with additional arguments in the future.
+ */
+public class DefaultStreamContext implements StreamContext {
+
+ private final SchemaProvider schemaProvider;
+ private final Option<SourceProfileSupplier> sourceProfileSupplier;
+
+  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
+ this.schemaProvider = schemaProvider;
+ this.sourceProfileSupplier = sourceProfileSupplier;
+ }
+
+ @Override
+ public SchemaProvider getSchemaProvider() {
+ return schemaProvider;
+ }
+
+ @Override
+ public Option<SourceProfileSupplier> getSourceProfileSupplier() {
+ return sourceProfileSupplier;
+ }
+}
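Call sites translate mechanically: wherever a SchemaProvider used to be passed, it is now wrapped in a DefaultStreamContext, with or without a profile supplier. A sketch (props, jssc, spark, metrics, and mySupplier are assumed to be in scope; createSource throws IOException):

    // Legacy behavior, no profile attached:
    StreamContext ctx = new DefaultStreamContext(schemaProvider, Option.empty());

    // Profile-aware wiring through the new UtilHelpers factory:
    Source source = UtilHelpers.createSource(
        "org.apache.hudi.utilities.sources.JsonKafkaSource",
        props, jssc, spark, metrics,
        new DefaultStreamContext(schemaProvider, Option.of(mySupplier)));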
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 8ecc937c5e7..ef31cc34ab5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -143,8 +143,12 @@ public class HoodieStreamer implements Serializable {
this(cfg, jssc, fs, conf, Option.empty());
}
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
+    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
+  }
+
  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                        Option<TypedProperties> propsOverride) throws IOException {
+                        Option<TypedProperties> propsOverride, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
    this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
    if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
      InitialCheckPointProvider checkPointProvider =
@@ -158,7 +162,7 @@ public class HoodieStreamer implements Serializable {
        cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
    HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
    this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
}
  private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -656,7 +660,7 @@ public class HoodieStreamer implements Serializable {
    private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
    public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> properties) throws IOException {
+                             Option<TypedProperties> properties, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
      super(HoodieIngestionConfig.newBuilder()
          .isContinuous(cfg.continuousMode)
          .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
@@ -708,13 +712,18 @@ public class HoodieStreamer implements Serializable {
              UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
              props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
-      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
+      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
    }
    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf) throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
+    }
+
+    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties)
+        throws IOException {
+      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
    }
private void initializeTableTypeAndBaseFileFormat() {
@@ -728,7 +737,7 @@ public class HoodieStreamer implements Serializable {
if (streamSync != null) {
streamSync.close();
}
-      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
+      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
new file mode 100644
index 00000000000..d830cf5dee3
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * A profile containing details about how the next input batch in StreamSync should be consumed and written.
+ * For eg: KafkaSourceProfile contains number of events to consume in this sync round.
+ * S3SourceProfile contains the list of files to consume in this sync round.
+ * HudiIncrementalSourceProfile contains the beginInstant and endInstant commit times to consume in this sync round etc.
+ *
+ * @param <T> The type for source context, varies based on sourceType as described above.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface SourceProfile<T> {
+
+ /**
+   * @return The maxBytes that will be consumed from the source in this sync round.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ long getMaxSourceBytes();
+
+ /**
+ * @return The number of output partitions required in source RDD.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ int getSourcePartitions();
+
+ /**
+   * @return The source specific context based on sourceType as described above.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ T getSourceSpecificContext();
+}
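The javadoc names KafkaSourceProfile as an example, but no concrete implementation ships in this commit. A sketch consistent with how KafkaSource consumes SourceProfile<Long> (class and field names are hypothetical):

    public class KafkaSourceProfile implements SourceProfile<Long> {
      private final long numEvents;        // events to consume this sync round
      private final int sourcePartitions;  // output partitions for the source RDD
      private final long maxSourceBytes;   // byte budget for the round

      public KafkaSourceProfile(long numEvents, int sourcePartitions, long maxSourceBytes) {
        this.numEvents = numEvents;
        this.sourcePartitions = sourcePartitions;
        this.maxSourceBytes = maxSourceBytes;
      }

      @Override
      public long getMaxSourceBytes() {
        return maxSourceBytes;
      }

      @Override
      public int getSourcePartitions() {
        return sourcePartitions;
      }

      @Override
      public Long getSourceSpecificContext() {
        return numEvents;  // for Kafka, the context is the event count
      }
    }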
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
new file mode 100644
index 00000000000..34bfb8dff94
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * Supplier for SourceProfile
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface SourceProfileSupplier {
+ @SuppressWarnings("rawtypes")
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ SourceProfile getSourceProfile();
+}
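Because the interface has a single abstract method and KafkaSource re-invokes it on every fetch, a lambda that recomputes the profile per sync round works naturally (the helper methods are hypothetical, e.g. backed by consumer-lag metrics; KafkaSourceProfile is the sketch above):

    SourceProfileSupplier supplier = () -> new KafkaSourceProfile(
        estimateEventBudget(),
        estimatePartitionCount(),
        estimateByteBudget());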
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
new file mode 100644
index 00000000000..bfe337ee3f2
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+/**
+ * The context required to sync one batch of data to hoodie table using StreamSync.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface StreamContext {
+
+ /**
+   * The schema provider used for reading data from source and also writing to hoodie table.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ SchemaProvider getSchemaProvider();
+
+ /**
+   * An optional stream profile supplying details regarding how the next input batch in StreamSync should be consumed and written.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Option<SourceProfileSupplier> getSourceProfileSupplier();
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index f87bf083854..4b83ff92b7b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -259,19 +259,19 @@ public class StreamSync implements Serializable, Closeable {
  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
+    this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
  }
-  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
+  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
                    TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, StreamContext streamContext) throws IOException {
this.cfg = cfg;
this.hoodieSparkContext = hoodieSparkContext;
this.sparkSession = sparkSession;
this.fs = fs;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
- this.userProvidedSchemaProvider = schemaProvider;
+ this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
this.processedSchema = new SchemaSet();
    this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
    this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
@@ -285,7 +285,7 @@ public class StreamSync implements Serializable, Closeable {
      this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
refreshTimeline();
-    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
    this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
    Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index b5cbf2738f6..011a1f626b2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -28,6 +28,8 @@ import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -52,6 +54,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
 * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc.
@@ -60,6 +63,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
  protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+  protected final Option<SourceProfileSupplier> sourceProfile = Option.of(mock(SourceProfileSupplier.class));
protected SchemaProvider schemaProvider;
protected KafkaTestUtils testUtils;
@@ -277,4 +281,51 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
        t.getMessage());
  }
+
+  @Test
+  public void testKafkaSourceWithOffsetsFromSourceProfile() {
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    when(sourceProfile.get().getSourceProfile()).thenReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500));
+    SourceFormatAdapter kafkaSource = createSource(props);
+
+    // Test for empty data.
+    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+
+    // Publish messages and assert source has picked up all messages in offsetRanges supplied by input batch profile.
+    sendMessagesToKafka(topic, 1000, 2);
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+    assertEquals(500, fetch1.getBatch().get().count());
+  }
+
+  static class TestSourceProfile implements SourceProfile<Long> {
+
+    private final long maxSourceBytes;
+    private final int sourcePartitions;
+    private final long numEvents;
+
+    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long numEvents) {
+      this.maxSourceBytes = maxSourceBytes;
+      this.sourcePartitions = sourcePartitions;
+      this.numEvents = numEvents;
+    }
+
+    @Override
+    public long getMaxSourceBytes() {
+      return maxSourceBytes;
+    }
+
+    @Override
+    public int getSourcePartitions() {
+      return sourcePartitions;
+    }
+
+    @Override
+    public Long getSourceSpecificContext() {
+      return numEvents;
+    }
+  }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 14ffd31582a..59a85a06e9c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,14 +23,16 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
@@ -60,10 +62,10 @@ import scala.Tuple2;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
@@ -104,7 +106,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
  @Override
  SourceFormatAdapter createSource(TypedProperties props) {
-    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
  }
// test whether empty messages can be filtered
@@ -356,4 +358,13 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
    dfWithOffsetInfo.unpersist();
    dfWithOffsetInfoAndNullKafkaKey.unpersist();
  }
+
+  @Test
+  public void testCreateSource() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    Source jsonKafkaSource = UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
+    assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
+  }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 52376f89741..b56d87c9263 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.test.proto.Nested;
import org.apache.hudi.utilities.test.proto.Sample;
@@ -89,7 +90,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
  @Override
  SourceFormatAdapter createSource(TypedProperties props) {
    this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
-    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
return new SourceFormatAdapter(protoKafkaSource);
}