This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new be0a6604b12 [HUDI-7501] Use source profile for S3 and GCS sources (#10861)
be0a6604b12 is described below
commit be0a6604b12abe6ef74a7b2c83f24de6af19e3d7
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon May 13 07:23:31 2024 +0530
[HUDI-7501] Use source profile for S3 and GCS sources (#10861)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/utilities/UtilHelpers.java | 53 ++++-----
.../sources/GcsEventsHoodieIncrSource.java | 61 ++++------
.../hudi/utilities/sources/HoodieIncrSource.java | 6 +-
.../apache/hudi/utilities/sources/RowSource.java | 8 +-
.../sources/S3EventsHoodieIncrSource.java | 87 +++-----------
.../sources/helpers/CloudDataFetcher.java | 79 ++++++++++++-
.../helpers/CloudObjectsSelectorCommon.java | 70 ++++++++----
.../helpers/gcs/GcsObjectMetadataFetcher.java | 86 --------------
.../sources/TestGcsEventsHoodieIncrSource.java | 83 ++++++++++----
.../utilities/sources/TestHoodieIncrSource.java | 3 +-
.../sources/TestS3EventsHoodieIncrSource.java | 125 ++++++++++++++++-----
.../debezium/TestAbstractDebeziumSource.java | 3 +-
.../helpers/TestCloudObjectsSelectorCommon.java | 42 ++++---
13 files changed, 383 insertions(+), 323 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 124abeb059f..d0acffe5d17 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
@@ -140,42 +141,30 @@ public class UtilHelpers {
}
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
- SparkSession sparkSession, SchemaProvider schemaProvider,
- HoodieIngestionMetrics metrics) throws IOException {
- try {
+ SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException {
+ // All possible constructors.
+ Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, HoodieIngestionMetrics.class, StreamContext.class};
+ Class<?>[] constructorArgsStreamContext = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, StreamContext.class};
+ Class<?>[] constructorArgsMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieIngestionMetrics.class};
+ Class<?>[] constructorArgs = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class};
+ // List of constructor and their respective arguments.
+ List<Pair<Class<?>[], Object[]>> sourceConstructorAndArgs = new ArrayList<>();
+ sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContextMetrics, new Object[] {cfg, jssc, sparkSession, metrics, streamContext}));
+ sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContext, new Object[] {cfg, jssc, sparkSession, streamContext}));
+ sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics}));
+ sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider()}));
+
+ HoodieException sourceClassLoadException = null;
+ for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) {
try {
- return (Source) ReflectionUtils.loadClass(sourceClass,
- new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
- SparkSession.class, SchemaProvider.class,
- HoodieIngestionMetrics.class},
- cfg, jssc, sparkSession, schemaProvider, metrics);
+ return (Source) ReflectionUtils.loadClass(sourceClass, constructor.getLeft(), constructor.getRight());
} catch (HoodieException e) {
- return (Source) ReflectionUtils.loadClass(sourceClass,
- new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
- SparkSession.class, SchemaProvider.class},
- cfg, jssc, sparkSession, schemaProvider);
+ sourceClassLoadException = e;
+ } catch (Throwable t) {
+ throw new IOException("Could not load source class " + sourceClass, t);
}
- } catch (Throwable e) {
- throw new IOException("Could not load source class " + sourceClass, e);
- }
- }
-
- public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
- SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
- throws IOException {
- try {
- try {
- return (Source) ReflectionUtils.loadClass(sourceClass,
- new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
- SparkSession.class,
- HoodieIngestionMetrics.class, StreamContext.class},
- cfg, jssc, sparkSession, metrics, streamContext);
- } catch (HoodieException e) {
- return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
- }
- } catch (Throwable e) {
- throw new IOException("Could not load source class " + sourceClass, e);
}
+ throw new IOException("Could not load source class " + sourceClass, sourceClassLoadException);
}
public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
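A minimal usage sketch (not part of this patch) of the consolidated createSource(...) entry point above; it assumes props, jssc, spark, metrics, and schemaProvider are already initialized by the caller:

    StreamContext streamContext = new DefaultStreamContext(schemaProvider, Option.empty());
    Source source = UtilHelpers.createSource(
        GcsEventsHoodieIncrSource.class.getName(), props, jssc, spark, metrics, streamContext);
    // The helper tries each candidate constructor in order and only throws IOException,
    // chaining the last HoodieException, if none of the four signatures matches.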
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index d1d320f99b8..5900ddade24 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
-import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.List;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -52,6 +50,7 @@ import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.GCS;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -109,8 +108,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
private final int numInstantsPerFetch;
private final MissingCheckpointStrategy missingCheckpointStrategy;
- private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
- private final CloudDataFetcher gcsObjectDataFetcher;
+ private final CloudDataFetcher cloudDataFetcher;
private final QueryRunner queryRunner;
private final Option<SchemaProvider> schemaProvider;
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
@@ -120,16 +118,26 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext
jsc, SparkSession spark,
SchemaProvider schemaProvider) {
- this(props, jsc, spark, schemaProvider,
- new GcsObjectMetadataFetcher(props),
- new CloudDataFetcher(props),
- new QueryRunner(spark, props)
+ this(props, jsc, spark,
+ new CloudDataFetcher(props, jsc, spark),
+ new QueryRunner(spark, props),
+ new DefaultStreamContext(schemaProvider, Option.empty())
+ );
+ }
+
+ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext
jsc, SparkSession spark,
+ StreamContext streamContext) {
+
+ this(props, jsc, spark,
+ new CloudDataFetcher(props, jsc, spark),
+ new QueryRunner(spark, props),
+ streamContext
);
}
GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark,
- SchemaProvider schemaProvider,
GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher
gcsObjectDataFetcher, QueryRunner queryRunner) {
- super(props, jsc, spark, schemaProvider);
+ CloudDataFetcher cloudDataFetcher, QueryRunner
queryRunner, StreamContext streamContext) {
+ super(props, jsc, spark, streamContext);
checkRequiredConfigProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
@@ -137,10 +145,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
- this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
- this.gcsObjectDataFetcher = gcsObjectDataFetcher;
+ this.cloudDataFetcher = cloudDataFetcher;
this.queryRunner = queryRunner;
- this.schemaProvider = Option.ofNullable(schemaProvider);
+ this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
this.snapshotLoadQuerySplitter =
SnapshotLoadQuerySplitter.getInstance(props);
LOG.info("srcPath: " + srcPath);
@@ -168,28 +175,6 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+ queryInfo.getStartInstant());
return Pair.of(Option.empty(), queryInfo.getStartInstant());
}
-
- Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair =
queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
- Dataset<Row> filteredSourceData =
gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
- queryInfo = queryInfoDatasetPair.getLeft();
- LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
- Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset
=
- IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
- if (!checkPointAndDataset.getRight().isPresent()) {
- LOG.info("Empty source, returning endpoint:" +
checkPointAndDataset.getLeft());
- return Pair.of(Option.empty(),
checkPointAndDataset.getLeft().toString());
- }
- LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
-
- Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset =
extractData(queryInfo, checkPointAndDataset.getRight().get());
- return Pair.of(extractedCheckPointAndDataset.getLeft(),
checkPointAndDataset.getLeft().toString());
- }
-
- private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo,
Dataset<Row> cloudObjectMetadataDF) {
- List<CloudObjectMetadata> cloudObjectMetadata =
gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext,
cloudObjectMetadataDF, checkIfFileExists);
- LOG.info("Total number of files to process :" +
cloudObjectMetadata.size());
- Option<Dataset<Row>> fileDataRows =
gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata,
props, schemaProvider);
- return Pair.of(fileDataRows, queryInfo.getEndInstant());
+ return cloudDataFetcher.fetchPartitionedSource(GCS,
cloudObjectIncrCheckpoint, this.sourceProfileSupplier,
queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider,
sourceLimit);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 2b52bda04f2..7e019a2aaf0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -27,9 +27,9 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
-import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -136,8 +136,8 @@ public class HoodieIncrSource extends RowSource {
private final Map<String, String> readOpts = new HashMap<>();
public HoodieIncrSource(TypedProperties props, JavaSparkContext
sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider) {
- super(props, sparkContext, sparkSession, schemaProvider);
+ StreamContext streamContext) {
+ super(props, sparkContext, sparkSession, streamContext);
for (Object key : props.keySet()) {
String keyString = key.toString();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index 1c7e9d99098..f76c285f2bb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -26,8 +26,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
-
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -41,6 +42,11 @@ public abstract class RowSource extends Source<Dataset<Row>> {
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
}
+
+ public RowSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
+ StreamContext streamContext) {
+ super(props, sparkContext, sparkSession, SourceType.ROW, streamContext);
+ }
protected abstract Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkptStr, long sourceLimit);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index be9914190e7..579bc5c2021 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,41 +23,32 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
-import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.List;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.S3;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -69,7 +60,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
private static final Logger LOG =
LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
private final String srcPath;
private final int numInstantsPerFetch;
- private final boolean checkIfFileExists;
private final IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy;
private final QueryRunner queryRunner;
private final CloudDataFetcher cloudDataFetcher;
@@ -78,50 +68,39 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
- public static class Config {
- // control whether we do existence check for files before consuming them
- @Deprecated
- static final String ENABLE_EXISTS_CHECK =
S3_INCR_ENABLE_EXISTS_CHECK.key();
- @Deprecated
- static final Boolean DEFAULT_ENABLE_EXISTS_CHECK =
S3_INCR_ENABLE_EXISTS_CHECK.defaultValue();
-
- @Deprecated
- static final String S3_FS_PREFIX =
S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key();
-
- /**
- * {@link #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader
while loading dataset.
- * Example Hudi Streamer conf
- * - --hoodie-conf
hoodie.streamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
- */
- @Deprecated
- public static final String SPARK_DATASOURCE_OPTIONS =
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
+ public S3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ this(props, sparkContext, sparkSession, new QueryRunner(sparkSession,
props),
+ new CloudDataFetcher(props, sparkContext, sparkSession), new
DefaultStreamContext(schemaProvider, Option.empty()));
}
public S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
- SchemaProvider schemaProvider) {
- this(props, sparkContext, sparkSession, schemaProvider, new
QueryRunner(sparkSession, props),
- new CloudDataFetcher(props));
+ StreamContext streamContext) {
+ this(props, sparkContext, sparkSession, new QueryRunner(sparkSession,
props),
+ new CloudDataFetcher(props, sparkContext, sparkSession),
streamContext);
}
public S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
- SchemaProvider schemaProvider,
QueryRunner queryRunner,
- CloudDataFetcher cloudDataFetcher) {
- super(props, sparkContext, sparkSession, schemaProvider);
+ CloudDataFetcher cloudDataFetcher,
+ StreamContext streamContext) {
+ super(props, sparkContext, sparkSession, streamContext);
checkRequiredConfigProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
this.numInstantsPerFetch = getIntWithAltKeys(props,
NUM_INSTANTS_PER_FETCH);
- this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
this.queryRunner = queryRunner;
this.cloudDataFetcher = cloudDataFetcher;
- this.schemaProvider = Option.ofNullable(schemaProvider);
+ this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
this.snapshotLoadQuerySplitter =
SnapshotLoadQuerySplitter.getInstance(props);
}
@@ -144,36 +123,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
LOG.warn("Already caught up. No new data to process");
return Pair.of(Option.empty(), queryInfo.getEndInstant());
}
- Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair =
queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
- queryInfo = queryInfoDatasetPair.getLeft();
- Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(
-
CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3,
props));
-
- LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
- Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset
=
- IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
- if (!checkPointAndDataset.getRight().isPresent()) {
- LOG.info("Empty source, returning endpoint:" +
checkPointAndDataset.getLeft());
- return Pair.of(Option.empty(),
checkPointAndDataset.getLeft().toString());
- }
- LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
-
- String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX,
true).toLowerCase();
- String s3Prefix = s3FS + "://";
-
- // Create S3 paths
- StorageConfiguration<Configuration> storageConf =
HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration());
- List<CloudObjectMetadata> cloudObjectMetadata =
checkPointAndDataset.getRight().get()
- .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
- CloudObjectsSelectorCommon.S3_OBJECT_KEY,
- CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
- .distinct()
- .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix,
storageConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
- .collectAsList();
- LOG.info("Total number of files to process :" +
cloudObjectMetadata.size());
-
- Option<Dataset<Row>> datasetOption =
cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props,
schemaProvider);
- return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+ return cloudDataFetcher.fetchPartitionedSource(S3,
cloudObjectIncrCheckpoint, this.sourceProfileSupplier,
queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider,
sourceLimit);
}
}
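Both the S3 and GCS sources now delegate checkpoint handling, filtering, and data loading to CloudDataFetcher.fetchPartitionedSource. A minimal sketch (not part of this patch) of wiring a source profile into the S3 source through the new StreamContext-based constructor; mySourceProfileSupplier is a hypothetical, pre-built SourceProfileSupplier, and props, jssc, spark, and schemaProvider are assumed to exist:

    StreamContext streamContext = new DefaultStreamContext(schemaProvider, Option.of(mySourceProfileSupplier));
    S3EventsHoodieIncrSource source = new S3EventsHoodieIncrSource(props, jssc, spark, streamContext);
    // When the supplier returns a non-null profile, getMaxSourceBytes() overrides the
    // configured source limit and getSourceSpecificContext() supplies the per-partition
    // byte budget used by CloudDataFetcher below.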
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index ed1a49e33e7..06fb89da9a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -21,8 +21,11 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -32,10 +35,13 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.List;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
/**
* Connects to S3/GCS from Spark and downloads data from a given list of files.
@@ -45,14 +51,24 @@ public class CloudDataFetcher implements Serializable {
private static final String EMPTY_STRING = "";
- private final TypedProperties props;
+ private transient TypedProperties props;
+ private transient JavaSparkContext sparkContext;
+ private transient SparkSession sparkSession;
+ private transient CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
private static final Logger LOG =
LoggerFactory.getLogger(CloudDataFetcher.class);
private static final long serialVersionUID = 1L;
- public CloudDataFetcher(TypedProperties props) {
+ public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc,
SparkSession sparkSession) {
+ this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props));
+ }
+
+ public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc,
SparkSession sparkSession, CloudObjectsSelectorCommon
cloudObjectsSelectorCommon) {
this.props = props;
+ this.sparkContext = jsc;
+ this.sparkSession = sparkSession;
+ this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
}
public static String getFileFormat(TypedProperties props) {
@@ -63,8 +79,59 @@ public class CloudDataFetcher implements Serializable {
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
}
- public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark,
List<CloudObjectMetadata> cloudObjectMetadata,
- TypedProperties props,
Option<SchemaProvider> schemaProviderOption) {
- return loadAsDataset(spark, cloudObjectMetadata, props,
getFileFormat(props), schemaProviderOption);
+ public Pair<Option<Dataset<Row>>, String> fetchPartitionedSource(
+ CloudObjectsSelectorCommon.Type cloudType,
+ CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint,
+ Option<SourceProfileSupplier> sourceProfileSupplier,
+ Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair,
+ Option<SchemaProvider> schemaProvider,
+ long sourceLimit) {
+ boolean isSourceProfileSupplierAvailable =
sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null;
+ if (isSourceProfileSupplierAvailable) {
+ LOG.debug("Using source limit from source profile sourceLimitFromConfig
{} sourceLimitFromProfile {}", sourceLimit,
sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes());
+ sourceLimit =
sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes();
+ }
+
+ QueryInfo queryInfo = queryInfoDatasetPair.getLeft();
+ String filter = CloudObjectsSelectorCommon.generateFilter(cloudType,
props);
+ LOG.info("Adding filter string to Dataset: " + filter);
+ Dataset<Row> filteredSourceData =
queryInfoDatasetPair.getRight().filter(filter);
+
+ LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset
=
+ IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
+ if (!checkPointAndDataset.getRight().isPresent()) {
+ LOG.info("Empty source, returning endpoint:" +
checkPointAndDataset.getLeft());
+ return Pair.of(Option.empty(),
checkPointAndDataset.getLeft().toString());
+ }
+ LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
+
+ boolean checkIfFileExists = getBooleanWithAltKeys(props,
ENABLE_EXISTS_CHECK);
+ List<CloudObjectMetadata> cloudObjectMetadata =
CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext,
checkPointAndDataset.getRight().get(), checkIfFileExists, props);
+ LOG.info("Total number of files to process :" +
cloudObjectMetadata.size());
+
+ long bytesPerPartition =
props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ?
props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
+ props.getLong(PARQUET_MAX_FILE_SIZE.key(),
Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+ if (isSourceProfileSupplierAvailable) {
+ long bytesPerPartitionFromProfile = (long)
sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext();
+ if (bytesPerPartitionFromProfile > 0) {
+ LOG.debug("Using bytesPerPartition from source profile
bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}",
bytesPerPartition, bytesPerPartitionFromProfile);
+ bytesPerPartition = bytesPerPartitionFromProfile;
+ }
+ }
+ Option<Dataset<Row>> datasetOption =
getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition);
+ return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+ }
+
+ private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata>
cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long
bytesPerPartition) {
+ long totalSize = 0;
+ for (CloudObjectMetadata o : cloudObjectMetadata) {
+ totalSize += o.getSize();
+ }
+ // inflate 10% for potential hoodie meta fields
+ double totalSizeWithHoodieMetaFields = totalSize * 1.1;
+ int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields
/ bytesPerPartition), 1);
+ return cloudObjectsSelectorCommon.loadAsDataset(sparkSession,
cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
}
}
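The partition count used by getCloudObjectDataDF above is plain arithmetic; a small worked sketch with illustrative numbers (not taken from the patch):

    long totalSize = 150L * 1024 * 1024;          // combined size of the cloud objects, e.g. 150 MB
    long bytesPerPartition = 10L * 1024 * 1024;   // SOURCE_MAX_BYTES_PER_PARTITION, else PARQUET_MAX_FILE_SIZE,
                                                  // overridden by the source profile when present
    double totalSizeWithHoodieMetaFields = totalSize * 1.1;   // inflate 10% for potential hoodie meta fields
    int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
    // numPartitions == 17 for these inputs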
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8a442455291..8aee9d92754 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -37,9 +37,11 @@ import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -62,8 +63,8 @@ import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE
import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
import static org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
@@ -85,6 +86,13 @@ public class CloudObjectsSelectorCommon {
public static final String GCS_OBJECT_KEY = "name";
public static final String GCS_OBJECT_SIZE = "size";
private static final String SPACE_DELIMTER = " ";
+ private static final String GCS_PREFIX = "gs://";
+
+ private final TypedProperties properties;
+
+ public CloudObjectsSelectorCommon(TypedProperties properties) {
+ this.properties = properties;
+ }
/**
* Return a function that extracts filepaths from a list of Rows.
@@ -205,8 +213,40 @@ public class CloudObjectsSelectorCommon {
return filter.toString();
}
- public static Option<Dataset<Row>> loadAsDataset(SparkSession spark,
List<CloudObjectMetadata> cloudObjectMetadata,
- TypedProperties props,
String fileFormat, Option<SchemaProvider> schemaProviderOption) {
+ /**
+ * @param cloudObjectMetadataDF a Dataset that contains metadata of S3/GCS
objects. Assumed to be a persisted form
+ * of a Cloud Storage SQS/PubSub Notification
event.
+ * @param checkIfExists Check if each file exists, before returning
its full path
+ * @return A {@link List} of {@link CloudObjectMetadata} containing file
info.
+ */
+ public static List<CloudObjectMetadata> getObjectMetadata(
+ Type type,
+ JavaSparkContext jsc,
+ Dataset<Row> cloudObjectMetadataDF,
+ boolean checkIfExists,
+ TypedProperties props
+ ) {
+ StorageConfiguration<Configuration> storageConf =
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
+ if (type == Type.GCS) {
+ return cloudObjectMetadataDF
+ .select("bucket", "name", "size")
+ .distinct()
+ .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX,
storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
+ .collectAsList();
+ } else if (type == Type.S3) {
+ String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX,
true).toLowerCase();
+ String s3Prefix = s3FS + "://";
+ return cloudObjectMetadataDF
+ .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
CloudObjectsSelectorCommon.S3_OBJECT_KEY,
CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
+ .distinct()
+ .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix,
storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
+ .collectAsList();
+ }
+ throw new UnsupportedOperationException("Invalid cloud type " + type);
+ }
+
+ public Option<Dataset<Row>> loadAsDataset(SparkSession spark,
List<CloudObjectMetadata> cloudObjectMetadata,
+ String fileFormat,
Option<SchemaProvider> schemaProviderOption, int numPartitions) {
if (LOG.isDebugEnabled()) {
LOG.debug("Extracted distinct files " + cloudObjectMetadata.size()
+ " and some samples " +
cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10).collect(Collectors.toList()));
@@ -216,7 +256,7 @@ public class CloudObjectsSelectorCommon {
return Option.empty();
}
DataFrameReader reader = spark.read().format(fileFormat);
- String datasourceOpts = getStringWithAltKeys(props,
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+ String datasourceOpts = getStringWithAltKeys(properties,
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
if (schemaProviderOption.isPresent()) {
Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
if (sourceSchema != null &&
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
@@ -225,7 +265,7 @@ public class CloudObjectsSelectorCommon {
}
if (StringUtils.isNullOrEmpty(datasourceOpts)) {
// fall back to legacy config for BWC. TODO consolidate in HUDI-6020
- datasourceOpts = getStringWithAltKeys(props,
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+ datasourceOpts = getStringWithAltKeys(properties,
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
}
if (StringUtils.nonEmpty(datasourceOpts)) {
final ObjectMapper mapper = new ObjectMapper();
@@ -239,18 +279,10 @@ public class CloudObjectsSelectorCommon {
reader = reader.options(sparkOptionsMap);
}
List<String> paths = new ArrayList<>();
- long totalSize = 0;
for (CloudObjectMetadata o : cloudObjectMetadata) {
paths.add(o.getPath());
- totalSize += o.getSize();
}
- // inflate 10% for potential hoodie meta fields
- totalSize *= 1.1;
- // if source bytes are provided, then give preference to that.
- long bytesPerPartition =
props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ?
props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
- props.getLong(PARQUET_MAX_FILE_SIZE.key(),
Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
- int numPartitions = (int) Math.max(Math.ceil(totalSize /
bytesPerPartition), 1);
- boolean isCommaSeparatedPathFormat =
props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(),
false);
+ boolean isCommaSeparatedPathFormat =
properties.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(),
false);
Dataset<Row> dataset;
if (isCommaSeparatedPathFormat) {
@@ -260,8 +292,8 @@ public class CloudObjectsSelectorCommon {
}
// add partition column from source path if configured
- if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
- String[] partitionKeysToAdd = getStringWithAltKeys(props,
PATH_BASED_PARTITION_FIELDS).split(",");
+ if (containsConfigProperty(properties, PATH_BASED_PARTITION_FIELDS)) {
+ String[] partitionKeysToAdd = getStringWithAltKeys(properties,
PATH_BASED_PARTITION_FIELDS).split(",");
// Add partition column for all path-based partition keys. If key is not
present in path, the value will be null.
for (String partitionKey : partitionKeysToAdd) {
String partitionPathPattern = String.format("%s=", partitionKey);
@@ -284,10 +316,6 @@ public class CloudObjectsSelectorCommon {
return dataset;
}
- public static Option<Dataset<Row>> loadAsDataset(SparkSession spark,
List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String
fileFormat) {
- return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat,
Option.empty());
- }
-
private static Option<String> getPropVal(TypedProperties props,
ConfigProperty<String> configProperty) {
String value = getStringWithAltKeys(props, configProperty, true);
if (!StringUtils.isNullOrEmpty(value)) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
deleted file mode 100644
index 21ca334d05f..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.sources.helpers.gcs;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
-
-/**
- * Extracts a list of GCS {@link CloudObjectMetadata} containing metadata of
GCS objects from a given Spark Dataset as input.
- * Optionally:
- * i) Match the filename and path against provided input filter strings
- * ii) Check if each file exists on GCS, in which case it assumes SparkContext
is already
- * configured with GCS options through
GcsEventsHoodieIncrSource.addGcsAccessConfs().
- */
-public class GcsObjectMetadataFetcher implements Serializable {
-
- private final TypedProperties props;
-
- private static final String GCS_PREFIX = "gs://";
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG =
LoggerFactory.getLogger(GcsObjectMetadataFetcher.class);
-
- public GcsObjectMetadataFetcher(TypedProperties props) {
- this.props = props;
- }
-
- /**
- * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS
objects. Assumed to be a persisted form
- * of a Cloud Storage Pubsub Notification event.
- * @param checkIfExists Check if each file exists, before returning
its full path
- * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
- */
- public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc,
Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
- StorageConfiguration<Configuration> storageConf =
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
- return cloudObjectMetadataDF
- .select("bucket", "name", "size")
- .distinct()
- .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX,
storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
- .collectAsList();
- }
-
- /**
- * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS
objects. Assumed to be a persisted form
- * of a Cloud Storage Pubsub Notification event.
- * @return Dataset<Row> after apply the filtering.
- */
- public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
- String filter =
CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS,
props);
- LOG.info("Adding filter string to Dataset: " + filter);
-
- return cloudObjectMetadataDF.filter(filter);
- }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 8d529fda073..dda205db8f8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -36,14 +36,19 @@ import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource.TestSourceProfile;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,8 +66,8 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,8 +83,12 @@ import java.util.stream.Collectors;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -94,13 +103,14 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
@TempDir
protected java.nio.file.Path tempDir;
- @Mock
- CloudDataFetcher gcsObjectDataFetcher;
-
@Mock
QueryRunner queryRunner;
@Mock
QueryInfo queryInfo;
+ @Mock
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
+ @Mock
+ SourceProfileSupplier sourceProfileSupplier;
protected Option<SchemaProvider> schemaProvider;
private HoodieTableMetaClient metaClient;
@@ -133,9 +143,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, inserts.getKey());
-
- verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
- Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
}
@Test
@@ -151,7 +158,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
-
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
}
@@ -170,7 +177,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
-
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file2.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, "1#path/to/file3.json");
}
@@ -193,7 +200,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
-
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file10006.json");
readAndAssert(READ_UPTO_LATEST_COMMIT,
Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
}
@@ -227,15 +234,20 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s",
extension), 150L, "2"));
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-
+ List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L);
setMockQueryRunner(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
- "1#path/to/file1" + extension, typedProperties);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" +
extension), 100L,
- "1#path/to/file2" + extension, typedProperties);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" +
extension), 1000L,
- "2#path/to/file5" + extension, typedProperties);
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(100L, bytesPerPartition.get(0)));
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
"1#path/to/file1" + extension, typedProperties);
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(100L, bytesPerPartition.get(1)));
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" +
extension), 100L, "1#path/to/file2" + extension, typedProperties);
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(1000L, bytesPerPartition.get(2)));
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" +
extension), 1000L, "2#path/to/file5" + extension, typedProperties);
+ // Verify the partitions being passed in getCloudObjectDataDF are correct.
+ List<Integer> numPartitions = Arrays.asList(12, 2, 1);
+ ArgumentCaptor<Integer> argumentCaptor =
ArgumentCaptor.forClass(Integer.class);
+ verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(),
any(), any(), eq(schemaProvider), argumentCaptor.capture());
+ Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
}
@ParameterizedTest
@@ -264,15 +276,41 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix",
"path/to/skip");
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+ List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L
* 1000L);
+
//1. snapshot query, read all records
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50000L, bytesPerPartition.get(0)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1,
typedProperties);
//2. incremental query, as commit is present in timeline
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(10L, bytesPerPartition.get(1)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L,
exptected2, typedProperties);
//3. snapshot query with source limit less than first commit size
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50L, bytesPerPartition.get(2)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3,
typedProperties);
typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix",
"path/to");
//4. As snapshotQuery will return 1 -> same would be return as
nextCheckpoint (dataset is empty due to ignore prefix).
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50L, bytesPerPartition.get(3)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4,
typedProperties);
+ // Verify the partitions being passed in getCloudObjectDataDF are correct.
+ ArgumentCaptor<Integer> argumentCaptor =
ArgumentCaptor.forClass(Integer.class);
+ verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(),
any(), any(), eq(schemaProvider), argumentCaptor.capture());
+ if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
+ Assertions.assertEquals(Arrays.asList(12, 3, 1),
argumentCaptor.getAllValues());
+ } else {
+ Assertions.assertEquals(Arrays.asList(23, 1),
argumentCaptor.getAllValues());
+ }
+ }
+
+ @Test
+ public void testCreateSource() throws IOException {
+ TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+ HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+ Source gcsSource =
UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(),
typedProperties, jsc(), spark(), metrics,
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
+ assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());
+ assertThrows(IOException.class, () ->
UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), new
TypedProperties(), jsc(), spark(), metrics,
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier))));
}
private void setMockQueryRunner(Dataset<Row> inputDs) {
@@ -281,7 +319,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
private void setMockQueryRunner(Dataset<Row> inputDs, Option<String>
nextCheckPointOpt) {
- when(queryRunner.run(Mockito.any(QueryInfo.class),
Mockito.any())).thenAnswer(invocation -> {
+ when(queryRunner.run(any(QueryInfo.class), any())).thenAnswer(invocation
-> {
QueryInfo queryInfo = invocation.getArgument(0);
QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
queryInfo.withUpdatedEndInstant(nextCheckPoint))
@@ -302,7 +340,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
TypedProperties typedProperties) {
GcsEventsHoodieIncrSource incrSource = new
GcsEventsHoodieIncrSource(typedProperties, jsc(),
- spark(), schemaProvider.orElse(null), new
GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner);
+ spark(), new CloudDataFetcher(typedProperties, jsc(), spark(),
cloudObjectsSelectorCommon), queryRunner,
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 38b6f69c361..ca4d6fec892 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -44,6 +44,7 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
@@ -344,7 +345,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
snapshotCheckPointImplClassOpt.map(className ->
properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
className));
TypedProperties typedProperties = new TypedProperties(properties);
- HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(),
spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+ HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(),
spark(), new DefaultStreamContext(new
DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), Option.empty()));
// read everything until latest
Pair<Option<Dataset<Row>>, String> batchCheckPoint =
incrSource.fetchNextBatch(checkpointToPull, 500);
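
HoodieIncrSource follows the same pattern: the schema provider now travels inside a
DefaultStreamContext, and the source profile supplier is optional (Option.empty() here). A minimal
sketch with placeholder handles props, jsc, and spark:

    HoodieIncrSource incrSource = new HoodieIncrSource(
        props, jsc, spark,
        new DefaultStreamContext(new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), Option.empty()));
    // Read everything from the beginning, up to a source limit of 500.
    Pair<Option<Dataset<Row>>, String> batch = incrSource.fetchNextBatch(Option.empty(), 500);
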
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 553078ff3fc..be26dfb1f3b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -36,14 +36,20 @@ import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,6 +67,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -68,6 +75,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -76,7 +84,10 @@ import java.util.stream.Collectors;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -93,7 +104,9 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
@Mock
QueryRunner mockQueryRunner;
@Mock
- CloudDataFetcher mockCloudDataFetcher;
+ CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;
+ @Mock
+ SourceProfileSupplier sourceProfileSupplier;
@Mock
QueryInfo queryInfo;
private JavaSparkContext jsc;
@@ -257,8 +270,8 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
200L, "1#path/to/file2.json");
@@ -282,8 +295,8 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file2.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, "1#path/to/file3.json");
@@ -322,15 +335,15 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
- "1#path/to/file1" + extension, typedProperties);
+ "1#path/to/file1" + extension, typedProperties);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" +
extension), 100L,
- "1#path/to/file2" + extension, typedProperties);
+ "1#path/to/file2" + extension, typedProperties);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" +
extension), 1000L,
- "2#path/to/file5" + extension, typedProperties);
+ "2#path/to/file5" + extension, typedProperties);
}
@Test
@@ -363,8 +376,9 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2",
typedProperties);
}
- @Test
- public void testFilterAnEntireCommit() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFilterAnEntireCommit(boolean useSourceProfile) throws
IOException {
String commitTimeForWrites1 = "2";
String commitTimeForReads = "1";
@@ -385,16 +399,22 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
+ if (useSourceProfile) {
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
+ } else {
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+ }
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix",
"path/to/skip");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L,
"2#path/to/file4.json", typedProperties);
}
- @Test
- public void testFilterAnEntireMiddleCommit() throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws
IOException {
String commitTimeForWrites1 = "2";
String commitTimeForWrites2 = "3";
String commitTimeForReads = "1";
@@ -417,16 +437,21 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs);
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
+ SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+ if (useSourceProfile) {
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
+ } else {
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+ }
+
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix",
"path/to/skip");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"),
50L, "3#path/to/file4.json", typedProperties);
schemaProvider = Option.empty();
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"),
50L, "3#path/to/file4.json", typedProperties);
}
@@ -454,26 +479,50 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
- when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any(), eq(schemaProvider)))
- .thenReturn(Option.empty());
+ when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider),
Mockito.anyInt())).thenReturn(Option.empty());
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix",
"path/to/skip");
+ List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L
* 1000L);
+
//1. snapshot query, read all records
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50000L, bytesPerPartition.get(0)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1,
typedProperties);
//2. incremental query, as commit is present in timeline
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(10L, bytesPerPartition.get(1)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L,
exptected2, typedProperties);
//3. snapshot query with source limit less than first commit size
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50L, bytesPerPartition.get(2)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3,
typedProperties);
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix",
"path/to");
//4. As snapshotQuery will return 1, the same would be returned as
nextCheckpoint (dataset is empty due to ignore prefix).
+ when(sourceProfileSupplier.getSourceProfile()).thenReturn(new
TestSourceProfile(50L, bytesPerPartition.get(3)));
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4,
typedProperties);
+ // Verify the number of partitions passed to loadAsDataset is correct.
+ ArgumentCaptor<Integer> argumentCaptor =
ArgumentCaptor.forClass(Integer.class);
+ verify(mockCloudObjectsSelectorCommon,
atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.eq(schemaProvider), argumentCaptor.capture());
+ List<Integer> numPartitions = Collections.emptyList();
+ if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
+ Assertions.assertEquals(Arrays.asList(12, 3, 1),
argumentCaptor.getAllValues());
+ } else {
+ Assertions.assertEquals(Arrays.asList(23, 1),
argumentCaptor.getAllValues());
+ }
+ }
+
+ @Test
+ public void testCreateSource() throws IOException {
+ TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+ HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+ Source s3Source =
UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(),
typedProperties, jsc(), spark(), metrics,
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
+ assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
}
private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint,
TypedProperties typedProperties) {
S3EventsHoodieIncrSource incrSource = new
S3EventsHoodieIncrSource(typedProperties, jsc(),
- spark(), schemaProvider.orElse(null), mockQueryRunner,
mockCloudDataFetcher);
+ spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(),
spark(), mockCloudObjectsSelectorCommon),
+ new DefaultStreamContext(schemaProvider.orElse(null),
Option.of(sourceProfileSupplier)));
Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
@@ -512,4 +561,30 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit,
expectedCheckpoint, typedProperties);
}
-}
+
+ static class TestSourceProfile implements SourceProfile<Long> {
+
+ private final long maxSourceBytes;
+ private final long bytesPerPartition;
+
+ public TestSourceProfile(long maxSourceBytes, long bytesPerPartition) {
+ this.maxSourceBytes = maxSourceBytes;
+ this.bytesPerPartition = bytesPerPartition;
+ }
+
+ @Override
+ public long getMaxSourceBytes() {
+ return maxSourceBytes;
+ }
+
+ @Override
+ public int getSourcePartitions() {
+ throw new UnsupportedOperationException("getSourcePartitions is not
required for S3 source profile");
+ }
+
+ @Override
+ public Long getSourceSpecificContext() {
+ return bytesPerPartition;
+ }
+ }
+}
\ No newline at end of file
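
The TestSourceProfile above illustrates the SourceProfile<Long> contract the S3 source consumes:
getMaxSourceBytes() bounds the batch, and getSourceSpecificContext() carries a bytes-per-partition
hint that the test verifies via the partition counts captured on loadAsDataset. A hedged wiring
sketch, mirroring the tests and using placeholder handles props, jsc, spark, provider, and
queryRunner (the supplier is a Mockito stub, as in the tests):

    SourceProfile<Long> profile = new TestSourceProfile(50_000L, 10L); // maxSourceBytes, bytesPerPartition
    SourceProfileSupplier supplier = mock(SourceProfileSupplier.class);
    when(supplier.getSourceProfile()).thenReturn(profile);
    S3EventsHoodieIncrSource source = new S3EventsHoodieIncrSource(
        props, jsc, spark, queryRunner,
        new CloudDataFetcher(props, jsc, spark, new CloudObjectsSelectorCommon(props)),
        new DefaultStreamContext(provider, Option.of(supplier)));
    // The tests above exercise both paths: a supplied profile, and getSourceProfile()
    // returning null, in which case the sourceLimit argument below governs the batch.
    Pair<Option<Dataset<Row>>, String> batch = source.fetchNextBatch(Option.empty(), 50L);
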
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index a57383c43b2..9e5d3d1f132 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -26,6 +26,7 @@ import
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -118,7 +119,7 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
TypedProperties props = createPropsForJsonSource();
SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc,
this);
- SourceFormatAdapter debeziumSource = new
SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc,
sparkSession, schemaProvider, metrics));
+ SourceFormatAdapter debeziumSource = new
SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc,
sparkSession, metrics, new DefaultStreamContext(schemaProvider,
Option.empty())));
testUtils.sendMessages(testTopicName, new String[]
{generateDebeziumEvent(operation).toString()});
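
As the call site above shows, UtilHelpers.createSource now accepts the ingestion metrics and a
StreamContext rather than a bare SchemaProvider. A minimal sketch mirroring the tests, with a
mocked metrics object and placeholder handles props, jsc, spark, provider, and profileSupplier:

    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
    Source source = UtilHelpers.createSource(
        S3EventsHoodieIncrSource.class.getName(), props, jsc, spark, metrics,
        new DefaultStreamContext(provider, Option.of(profileSupplier)));
    assertEquals(Source.SourceType.ROW, source.getSourceType());
    // A source that cannot be instantiated with the given properties surfaces as an IOException,
    // as the GCS testCreateSource above asserts with assertThrows(IOException.class, ...).
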
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 79f15975cb5..4b30bb14b57 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -50,14 +50,16 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
@Test
public void emptyMetadataReturnsEmptyOption() {
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(),
new TypedProperties(), "json");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(new TypedProperties());
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(),
"json", Option.empty(), 1);
Assertions.assertFalse(result.isPresent());
}
@Test
public void filesFromMetadataRead() {
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(new TypedProperties());
List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, new
TypedProperties(), "json");
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.empty(), 1);
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data");
@@ -70,7 +72,8 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
TypedProperties properties = new TypedProperties();
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path",
"country,state");
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties,
"json");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(properties);
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.empty(), 1);
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data", "US", "CA");
@@ -85,27 +88,15 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
props.put("hoodie.streamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path",
"country,state");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)), 1);
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
Row expected = RowFactory.create("some data", "US", "CA");
Assertions.assertEquals(Collections.singletonList(expected),
result.get().collectAsList());
}
- @Test
- public void partitionKeyNotPresentInPath() {
- List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
- TypedProperties properties = new TypedProperties();
-
properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format",
"false");
-
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path",
"unknown");
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties,
"json");
- Assertions.assertTrue(result.isPresent());
- Assertions.assertEquals(1, result.get().count());
- Row expected = RowFactory.create("some data", null);
- Assertions.assertEquals(Collections.singletonList(expected),
result.get().collectAsList());
- }
-
@Test
public void loadDatasetWithSchemaAndRepartition() {
TypedProperties props = new TypedProperties();
@@ -121,10 +112,25 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json",
1000),
new
CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json",
1000)
);
- Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)));
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
Assertions.assertTrue(result.isPresent());
List<Row> expected = Arrays.asList(RowFactory.create("some data", "US",
"CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some
data", "IND", "TS"));
List<Row> actual = result.get().collectAsList();
Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
}
+
+ @Test
+ public void partitionKeyNotPresentInPath() {
+ List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
+ TypedProperties properties = new TypedProperties();
+
properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format",
"false");
+
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
"unknown");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(properties);
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.empty(), 1);
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(1, result.get().count());
+ Row expected = RowFactory.create("some data", null);
+ Assertions.assertEquals(Collections.singletonList(expected),
result.get().collectAsList());
+ }
}
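
CloudObjectsSelectorCommon is now instance-scoped, as these tests reflect: it is constructed with
the TypedProperties it should honor, and loadAsDataset takes the file format, an optional schema
provider, and the number of partitions explicitly. A minimal usage sketch, with the path and
partition count taken from the tests above purely for illustration:

    TypedProperties props = new TypedProperties();
    props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state");
    CloudObjectsSelectorCommon selector = new CloudObjectsSelectorCommon(props);
    List<CloudObjectMetadata> objects = Collections.singletonList(
        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
    // Read the JSON object into a Dataset<Row>, inferring the schema (Option.empty()) and using one partition.
    Option<Dataset<Row>> rows = selector.loadAsDataset(sparkSession, objects, "json", Option.empty(), 1);
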