This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9d2174acd6b Revert "[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)" (#10734)
9d2174acd6b is described below
commit 9d2174acd6ba02ce580da74167ec6551bd3978be
Author: Vinish Reddy <[email protected]>
AuthorDate: Fri Feb 23 08:13:56 2024 +0530
Revert "[HUDI-7416] Add interface for StreamProfile to be used in
StreamSync for reading and writing data (#10687)" (#10734)
This reverts commit 93cd25fded8b0225ddfc54a49cc40fc5e4ad740c.
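For context on the change being undone: the reverted patch threaded a StreamContext (a SchemaProvider plus an optional SourceProfileSupplier) through the source constructors; this revert restores the direct SchemaProvider parameters. A minimal sketch of the restored call shape, using class names from this diff (the wrapper class and method below are illustrative only, not part of the patch):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

class RevertedConstructionSketch {
  static JsonKafkaSource build(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
                               SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
    // Before the revert this required wrapping the provider:
    //   new JsonKafkaSource(props, jsc, spark, metrics,
    //       new DefaultStreamContext(schemaProvider, Option.empty()));
    // After the revert the SchemaProvider is passed directly again.
    return new JsonKafkaSource(props, jsc, spark, schemaProvider, metrics);
  }
}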
---
.../org/apache/hudi/utilities/UtilHelpers.java | 19 --------
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 +-
.../hudi/utilities/sources/AvroKafkaSource.java | 10 ++--
.../hudi/utilities/sources/JsonKafkaSource.java | 13 ++----
.../apache/hudi/utilities/sources/KafkaSource.java | 39 +++++-----------
.../hudi/utilities/sources/ProtoKafkaSource.java | 13 ++----
.../org/apache/hudi/utilities/sources/Source.java | 11 +----
.../utilities/sources/helpers/KafkaOffsetGen.java | 39 ++++++++--------
.../utilities/streamer/DefaultStreamContext.java | 48 -------------------
.../hudi/utilities/streamer/HoodieStreamer.java | 21 +++------
.../hudi/utilities/streamer/SourceProfile.java | 54 ----------------------
.../utilities/streamer/SourceProfileSupplier.java | 34 --------------
.../hudi/utilities/streamer/StreamContext.java | 44 ------------------
.../apache/hudi/utilities/streamer/StreamSync.java | 10 ++--
.../utilities/sources/BaseTestKafkaSource.java | 51 --------------------
.../utilities/sources/TestJsonKafkaSource.java | 17 ++-----
.../utilities/sources/TestProtoKafkaSource.java | 3 +-
17 files changed, 60 insertions(+), 370 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 18af52d334e..18e92a8463c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,7 +66,6 @@ import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -157,24 +156,6 @@ public class UtilHelpers {
}
}
-  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
-      throws IOException {
-    try {
-      try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, HoodieIngestionMetrics.class, streamContext.getClass()},
-            cfg, jssc, sparkSession, metrics, streamContext);
-      } catch (HoodieException e) {
-        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
-      }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
-    }
-  }
-
   public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
return null;
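The hunk above deletes the StreamContext-aware overload of createSource; the SchemaProvider-based overload it fell back to remains. A hedged sketch of that surviving reflection pattern (simplified; the wrapper class is illustrative and the real overload tries more constructor shapes):

import java.io.IOException;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

class SourceLoaderSketch {
  static Source loadSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
                           SparkSession spark, SchemaProvider schemaProvider,
                           HoodieIngestionMetrics metrics) throws IOException {
    try {
      // Instantiate via the (props, jssc, spark, schemaProvider, metrics) constructor.
      return (Source) ReflectionUtils.loadClass(sourceClass,
          new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class,
              SchemaProvider.class, HoodieIngestionMetrics.class},
          cfg, jssc, spark, schemaProvider, metrics);
    } catch (Throwable t) {
      throw new IOException("Could not load source class " + sourceClass, t);
    }
  }
}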
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4002d1579bb..c794db32510 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -22,9 +22,7 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.StreamSync;
@@ -51,6 +49,6 @@ public class DeltaSync extends StreamSync {
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 46095590430..2bf92280faf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,8 +27,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +69,9 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext), Option.empty()));
-  }
-
-  public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, streamContext);
+    super(props, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
+        SourceType.AVRO, metrics);
     this.originalSchemaProvider = schemaProvider;
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 0a609dde720..eb67abfee3a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,7 +19,6 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
@@ -28,8 +27,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -47,10 +44,10 @@ import java.util.LinkedList;
import java.util.List;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
/**
* Read json kafka data.
@@ -59,11 +56,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(properties, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext), Option.empty()));
-  }
-
-  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, streamContext);
+    super(properties, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
+        SourceType.JSON, metrics);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
this.offsetGen = new KafkaOffsetGen(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 52a6a1217cc..bb26d579582 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -26,8 +26,6 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -52,9 +50,9 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
protected final boolean shouldAddOffsets;
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                        SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(props, sparkContext, sparkSession, sourceType, streamContext);
-    this.schemaProvider = streamContext.getSchemaProvider();
+                        SchemaProvider schemaProvider, SourceType sourceType, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
}
@@ -62,34 +60,21 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
@Override
  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      if (totalNewMsgs <= 0) {
+        metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
+        return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
       }
-      return toInputBatch(offsetRanges);
+      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
+      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+      return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
     }
   }

-  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
-    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
-    if (totalNewMsgs <= 0) {
-      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
-      return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-    }
-    metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-  }
-
abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
@Override
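The restored inline flow above gates an empty batch on KafkaOffsetGen.CheckpointUtils.totalNewMessages. A standalone illustration of what that count computes over Spark's OffsetRange (plain arithmetic sketch, not Hudi's implementation):

import org.apache.spark.streaming.kafka010.OffsetRange;

class OffsetCountSketch {
  // Sum of (untilOffset - fromOffset) across all ranges: the number of new
  // Kafka messages the next batch would read.
  static long totalNewMessages(OffsetRange[] ranges) {
    long total = 0;
    for (OffsetRange r : ranges) {
      total += r.untilOffset() - r.fromOffset();
    }
    return total;
  }

  public static void main(String[] args) {
    OffsetRange[] ranges = new OffsetRange[] {
        OffsetRange.create("topic", 0, 100, 350), // 250 new messages
        OffsetRange.create("topic", 1, 200, 450)  // 250 new messages
    };
    System.out.println(totalNewMessages(ranges)); // 500 -> non-empty batch
  }
}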
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 208e591c8f1..67927480454 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,15 +19,12 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
import com.google.protobuf.Message;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -54,13 +51,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
private final String className;
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, streamContext);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
+                          SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
checkRequiredConfigProperties(props, Collections.singletonList(
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index dfb07c718a0..cbc0722056b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -25,9 +25,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.callback.SourceCommitCallback;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
-import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -47,7 +44,6 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
protected transient TypedProperties props;
protected transient JavaSparkContext sparkContext;
protected transient SparkSession sparkSession;
- protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
private transient SchemaProvider overriddenSchemaProvider;
private final SourceType sourceType;
@@ -59,16 +55,11 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
   protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                    SchemaProvider schemaProvider, SourceType sourceType) {
-    this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
     this.props = props;
     this.sparkContext = sparkContext;
     this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
+    this.overriddenSchemaProvider = schemaProvider;
     this.sourceType = sourceType;
-    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
   }
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
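With this hunk, custom sources are again built on the single SchemaProvider-based protected constructor. A hedged sketch of a trivial subclass (the class and its no-op fetch are illustrative; the fetchNewData signature is taken from the KafkaSource hunk above):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: a do-nothing ROW-typed source wired through the
// restored five-argument constructor.
class NoOpRowSource extends Source<Dataset<Row>> {
  public NoOpRowSource(TypedProperties props, JavaSparkContext jsc,
                       SparkSession spark, SchemaProvider schemaProvider) {
    super(props, jsc, spark, schemaProvider, SourceType.ROW);
  }

  @Override
  protected InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    // Report no new data and carry the previous checkpoint forward.
    return new InputBatch<>(Option.empty(), lastCkptStr.orElse(null));
  }
}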
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 32df651d556..d5faec3595e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -241,24 +241,7 @@ public class KafkaOffsetGen {
}
  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
-    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
-    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
-  }
-
-  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
// Obtain current metadata for the topic
Map<TopicPartition, Long> fromOffsets;
Map<TopicPartition, Long> toOffsets;
@@ -296,9 +279,29 @@ public class KafkaOffsetGen {
// Obtain the latest offsets.
toOffsets = consumer.endOffsets(topicPartitions);
}
+
+    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
+
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    // TODO(HUDI-4625) remove
+    if (numEvents < toOffsets.size()) {
+      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
+    }
+
+    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
-
+
/**
* Fetch partition infos for given topic.
*
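The hunk above moves the event-count resolution back inside getNextOffsetRanges. Isolated as plain logic, the rule it restores is (standalone sketch; the constant in main is an arbitrary example, not the config default):

// If the caller passes no explicit limit (Long.MAX_VALUE), fall back to the
// configured MAX_EVENTS_FROM_KAFKA_SOURCE ceiling; otherwise honor the caller's limit.
final class NumEventsSketch {
  static long resolveNumEvents(long sourceLimit, long maxEventsFromConfig) {
    return sourceLimit == Long.MAX_VALUE ? maxEventsFromConfig : sourceLimit;
  }

  public static void main(String[] args) {
    System.out.println(resolveNumEvents(Long.MAX_VALUE, 5_000_000L)); // 5000000: config ceiling wins
    System.out.println(resolveNumEvents(900L, 5_000_000L));           // 900: explicit limit wins
  }
}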
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
deleted file mode 100644
index f8dabeb89c9..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The default implementation for the StreamContext interface,
- * composes SchemaProvider and SourceProfileSupplier currently,
- * can be extended for other arguments in the future.
- */
-public class DefaultStreamContext implements StreamContext {
-
- private final SchemaProvider schemaProvider;
- private final Option<SourceProfileSupplier> sourceProfileSupplier;
-
-  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
- this.schemaProvider = schemaProvider;
- this.sourceProfileSupplier = sourceProfileSupplier;
- }
-
- @Override
- public SchemaProvider getSchemaProvider() {
- return schemaProvider;
- }
-
- @Override
- public Option<SourceProfileSupplier> getSourceProfileSupplier() {
- return sourceProfileSupplier;
- }
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index ef31cc34ab5..8ecc937c5e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -143,12 +143,8 @@ public class HoodieStreamer implements Serializable {
this(cfg, jssc, fs, conf, Option.empty());
}
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
-    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
-  }
-
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                        Option<TypedProperties> propsOverride, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                        Option<TypedProperties> propsOverride) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
@@ -162,7 +158,7 @@ public class HoodieStreamer implements Serializable {
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
   }
  private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -660,7 +656,7 @@ public class HoodieStreamer implements Serializable {
    private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
     public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> properties, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
+                             Option<TypedProperties> properties) throws IOException {
super(HoodieIngestionConfig.newBuilder()
.isContinuous(cfg.continuousMode)
.withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
@@ -712,18 +708,13 @@ public class HoodieStreamer implements Serializable {
              UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
          props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
-      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
+      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
     }
     public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf) throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
-    }
-
-    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties)
-        throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
     }
private void initializeTableTypeAndBaseFileFormat() {
@@ -737,7 +728,7 @@ public class HoodieStreamer implements Serializable {
if (streamSync != null) {
streamSync.close();
}
-        streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+        streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
       }
@Override
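With the SourceProfileSupplier overloads gone, the public entry point is again the propsOverride constructor shown above. A hedged sketch of a caller (setup of cfg, jssc, fs, and conf is assumed to exist elsewhere):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.spark.api.java.JavaSparkContext;

class StreamerLaunchSketch {
  static void launch(HoodieStreamer.Config cfg, JavaSparkContext jssc,
                     FileSystem fs, Configuration conf) throws Exception {
    // Restored signature: no Option<SourceProfileSupplier> parameter.
    HoodieStreamer streamer = new HoodieStreamer(cfg, jssc, fs, conf, Option.empty());
    streamer.sync();
  }
}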
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
deleted file mode 100644
index d830cf5dee3..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * A profile containing details about how the next input batch in StreamSync should be consumed and written.
- * For eg: KafkaSourceProfile contains number of events to consume in this sync round.
- * S3SourceProfile contains the list of files to consume in this sync round.
- * HudiIncrementalSourceProfile contains the beginInstant and endInstant commit times to consume in this sync round etc.
- *
- * @param <T> The type for source context, varies based on sourceType as described above.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfile<T> {
-
- /**
-   * @return The maxBytes that will be consumed from the source in this sync round.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- long getMaxSourceBytes();
-
- /**
- * @return The number of output partitions required in source RDD.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- int getSourcePartitions();
-
- /**
-   * @return The source specific context based on sourceType as described above.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- T getSourceSpecificContext();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
deleted file mode 100644
index 34bfb8dff94..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-
-/**
- * Supplier for SourceProfile
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface SourceProfileSupplier {
- @SuppressWarnings("rawtypes")
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- SourceProfile getSourceProfile();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
deleted file mode 100644
index bfe337ee3f2..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.streamer;
-
-import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIClass;
-import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.schema.SchemaProvider;
-
-/**
- * The context required to sync one batch of data to hoodie table using StreamSync.
- */
-@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
-public interface StreamContext {
-
- /**
-   * The schema provider used for reading data from source and also writing to hoodie table.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- SchemaProvider getSchemaProvider();
-
- /**
-   * An optional stream profile supplying details regarding how the next input batch in StreamSync should be consumed and written.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- Option<SourceProfileSupplier> getSourceProfileSupplier();
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 4b83ff92b7b..f87bf083854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -259,19 +259,19 @@ public class StreamSync implements Serializable, Closeable {
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
   }
-  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, StreamContext streamContext) throws IOException {
+                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
+    this.userProvidedSchemaProvider = schemaProvider;
this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
@@ -285,7 +285,7 @@ public class StreamSync implements Serializable, Closeable {
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
refreshTimeline();
-    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
     this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
     Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..b5cbf2738f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -28,8 +28,6 @@ import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -54,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
 * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc.
@@ -63,7 +60,6 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
   protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
-  protected final Option<SourceProfileSupplier> sourceProfile = Option.of(mock(SourceProfileSupplier.class));
protected SchemaProvider schemaProvider;
protected KafkaTestUtils testUtils;
@@ -281,51 +277,4 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
             + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
         t.getMessage());
   }
-
-  @Test
-  public void testKafkaSourceWithOffsetsFromSourceProfile() {
-    // topic setup.
-    final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-
-    when(sourceProfile.get().getSourceProfile()).thenReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500));
-    SourceFormatAdapter kafkaSource = createSource(props);
-
-    // Test for empty data.
-    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-
-    // Publish messages and assert source has picked up all messages in offsetRanges supplied by input batch profile.
-    sendMessagesToKafka(topic, 1000, 2);
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
-    assertEquals(500, fetch1.getBatch().get().count());
-  }
-
- static class TestSourceProfile implements SourceProfile<Long> {
-
- private final long maxSourceBytes;
- private final int sourcePartitions;
- private final long numEvents;
-
-    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long numEvents) {
- this.maxSourceBytes = maxSourceBytes;
- this.sourcePartitions = sourcePartitions;
- this.numEvents = numEvents;
- }
-
- @Override
- public long getMaxSourceBytes() {
- return maxSourceBytes;
- }
-
- @Override
- public int getSourcePartitions() {
- return sourcePartitions;
- }
-
- @Override
- public Long getSourceSpecificContext() {
- return numEvents;
- }
- }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 59a85a06e9c..14ffd31582a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,16 +23,14 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
@@ -62,10 +60,10 @@ import scala.Tuple2;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
@@ -106,7 +104,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
@Override
SourceFormatAdapter createSource(TypedProperties props) {
-    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
+    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
   }
// test whether empty messages can be filtered
@@ -358,13 +356,4 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
dfWithOffsetInfo.unpersist();
dfWithOffsetInfoAndNullKafkaKey.unpersist();
}
-
-  @Test
-  public void testCreateSource() throws IOException {
-    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
-    testUtils.createTopic(topic, 2);
-    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-    Source jsonKafkaSource = UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
-    assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
-  }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index b56d87c9263..52376f89741 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.test.proto.Nested;
import org.apache.hudi.utilities.test.proto.Sample;
@@ -90,7 +89,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
@Override
SourceFormatAdapter createSource(TypedProperties props) {
this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
-    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
+    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
return new SourceFormatAdapter(protoKafkaSource);
}