This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
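For readers skimming the patch below: the central API change is that UtilHelpers.createSource now takes a StreamContext (carrying the SchemaProvider and an optional SourceProfileSupplier) and probes a list of known Source constructor signatures via reflection. The following is a minimal caller sketch, assuming only the signatures visible in this diff (createSource and the DefaultStreamContext constructor used in the tests); class and variable names such as CreateSourceSketch, profileSupplier, and metrics are illustrative, not part of the commit.

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;

// Sketch only: how a caller might obtain an S3 incremental source through the
// consolidated factory method introduced by this commit.
public class CreateSourceSketch {
  static Source newS3Source(TypedProperties props, JavaSparkContext jssc, SparkSession spark,
                            SchemaProvider schemaProvider, SourceProfileSupplier profileSupplier,
                            HoodieIngestionMetrics metrics) throws IOException {
    // The StreamContext carries both the schema provider and an optional source profile supplier.
    StreamContext streamContext = new DefaultStreamContext(schemaProvider, Option.of(profileSupplier));
    // createSource tries the (props, jssc, spark, metrics, streamContext) constructor first,
    // then falls back to the older SchemaProvider-based constructors.
    return UtilHelpers.createSource(
        S3EventsHoodieIncrSource.class.getName(), props, jssc, spark, metrics, streamContext);
  }
}
```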
commit 0b75508a22ea20c00b25a6558e5044a86e468d38
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon May 13 07:23:31 2024 +0530

    [HUDI-7501] Use source profile for S3 and GCS sources (#10861)

    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/utilities/UtilHelpers.java      |  53 ++++-----
 .../sources/GcsEventsHoodieIncrSource.java          |  61 ++++------
 .../hudi/utilities/sources/HoodieIncrSource.java    |   6 +-
 .../apache/hudi/utilities/sources/RowSource.java    |   8 +-
 .../sources/S3EventsHoodieIncrSource.java           |  87 +++-----------
 .../sources/helpers/CloudDataFetcher.java           |  79 ++++++++++++-
 .../helpers/CloudObjectsSelectorCommon.java         |  70 ++++++++----
 .../helpers/gcs/GcsObjectMetadataFetcher.java       |  86 --------------
 .../sources/TestGcsEventsHoodieIncrSource.java      |  83 ++++++++++----
 .../utilities/sources/TestHoodieIncrSource.java     |   3 +-
 .../sources/TestS3EventsHoodieIncrSource.java       | 125 ++++++++++++++++-----
 .../debezium/TestAbstractDebeziumSource.java        |   3 +-
 .../helpers/TestCloudObjectsSelectorCommon.java     |  42 ++++---
 13 files changed, 383 insertions(+), 323 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 026bb621677..abf0558e5ff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLockConfig;
@@ -140,42 +141,30 @@ public class UtilHelpers {
   }
 
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, SchemaProvider schemaProvider,
-                                    HoodieIngestionMetrics metrics) throws IOException {
-    try {
+                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException {
+    // All possible constructors.
+    Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, HoodieIngestionMetrics.class, StreamContext.class};
+    Class<?>[] constructorArgsStreamContext = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, StreamContext.class};
+    Class<?>[] constructorArgsMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieIngestionMetrics.class};
+    Class<?>[] constructorArgs = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class};
+    // List of constructor and their respective arguments.
+ List<Pair<Class<?>[], Object[]>> sourceConstructorAndArgs = new ArrayList<>(); + sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContextMetrics, new Object[] {cfg, jssc, sparkSession, metrics, streamContext})); + sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContext, new Object[] {cfg, jssc, sparkSession, streamContext})); + sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics})); + sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider()})); + + HoodieException sourceClassLoadException = null; + for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) { try { - return (Source) ReflectionUtils.loadClass(sourceClass, - new Class<?>[] {TypedProperties.class, JavaSparkContext.class, - SparkSession.class, SchemaProvider.class, - HoodieIngestionMetrics.class}, - cfg, jssc, sparkSession, schemaProvider, metrics); + return (Source) ReflectionUtils.loadClass(sourceClass, constructor.getLeft(), constructor.getRight()); } catch (HoodieException e) { - return (Source) ReflectionUtils.loadClass(sourceClass, - new Class<?>[] {TypedProperties.class, JavaSparkContext.class, - SparkSession.class, SchemaProvider.class}, - cfg, jssc, sparkSession, schemaProvider); + sourceClassLoadException = e; + } catch (Throwable t) { + throw new IOException("Could not load source class " + sourceClass, t); } - } catch (Throwable e) { - throw new IOException("Could not load source class " + sourceClass, e); - } - } - - public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) - throws IOException { - try { - try { - return (Source) ReflectionUtils.loadClass(sourceClass, - new Class<?>[] {TypedProperties.class, JavaSparkContext.class, - SparkSession.class, - HoodieIngestionMetrics.class, StreamContext.class}, - cfg, jssc, sparkSession, metrics, streamContext); - } catch (HoodieException e) { - return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics); - } - } catch (Throwable e) { - throw new IOException("Could not load source class " + sourceClass, e); } + throw new IOException("Could not load source class " + sourceClass, sourceClassLoadException); } public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java index d1d320f99b8..5900ddade24 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint; -import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; -import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; -import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -42,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.List; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; @@ -52,6 +50,7 @@ import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.GCS; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy; @@ -109,8 +108,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { private final int numInstantsPerFetch; private final MissingCheckpointStrategy missingCheckpointStrategy; - private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher; - private final CloudDataFetcher gcsObjectDataFetcher; + private final CloudDataFetcher cloudDataFetcher; private final QueryRunner queryRunner; private final Option<SchemaProvider> schemaProvider; private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter; @@ -120,16 +118,26 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider) { - this(props, jsc, spark, schemaProvider, - new GcsObjectMetadataFetcher(props), - new CloudDataFetcher(props), - new QueryRunner(spark, props) + this(props, jsc, spark, + new CloudDataFetcher(props, jsc, spark), + new QueryRunner(spark, props), + new DefaultStreamContext(schemaProvider, Option.empty()) + ); + } + + public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, + StreamContext streamContext) { + + this(props, jsc, spark, + new CloudDataFetcher(props, jsc, spark), + new QueryRunner(spark, props), + streamContext ); } GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, - SchemaProvider schemaProvider, GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher gcsObjectDataFetcher, QueryRunner queryRunner) { - super(props, jsc, spark, schemaProvider); + CloudDataFetcher cloudDataFetcher, QueryRunner queryRunner, StreamContext streamContext) { + super(props, jsc, spark, streamContext); checkRequiredConfigProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH); @@ -137,10 +145,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { numInstantsPerFetch = 
getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH); checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK); - this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher; - this.gcsObjectDataFetcher = gcsObjectDataFetcher; + this.cloudDataFetcher = cloudDataFetcher; this.queryRunner = queryRunner; - this.schemaProvider = Option.ofNullable(schemaProvider); + this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider()); this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); LOG.info("srcPath: " + srcPath); @@ -168,28 +175,6 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { + queryInfo.getStartInstant()); return Pair.of(Option.empty(), queryInfo.getStartInstant()); } - - Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter); - Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight()); - queryInfo = queryInfoDatasetPair.getLeft(); - LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); - Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = - IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); - if (!checkPointAndDataset.getRight().isPresent()) { - LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft()); - return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); - } - LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft()); - - Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight().get()); - return Pair.of(extractedCheckPointAndDataset.getLeft(), checkPointAndDataset.getLeft().toString()); - } - - private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> cloudObjectMetadataDF) { - List<CloudObjectMetadata> cloudObjectMetadata = gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext, cloudObjectMetadataDF, checkIfFileExists); - LOG.info("Total number of files to process :" + cloudObjectMetadata.size()); - Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider); - return Pair.of(fileDataRows, queryInfo.getEndInstant()); + return cloudDataFetcher.fetchPartitionedSource(GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 9ea394889c9..eecab298840 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -25,9 +25,9 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.config.HoodieIncrSourceConfig; -import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; +import org.apache.hudi.utilities.streamer.StreamContext; import 
org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -127,8 +127,8 @@ public class HoodieIncrSource extends RowSource { } public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { - super(props, sparkContext, sparkSession, schemaProvider); + StreamContext streamContext) { + super(props, sparkContext, sparkSession, streamContext); this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index 1c7e9d99098..f76c285f2bb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -26,8 +26,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; import org.apache.hudi.utilities.schema.SchemaProvider; - import org.apache.hudi.utilities.sources.helpers.SanitizationUtils; +import org.apache.hudi.utilities.streamer.StreamContext; + import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -41,6 +42,11 @@ public abstract class RowSource extends Source<Dataset<Row>> { SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW); } + + public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, + StreamContext streamContext) { + super(props, sparkContext, sparkSession, SourceType.ROW, streamContext); + } protected abstract Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index be9914190e7..579bc5c2021 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -23,41 +23,32 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint; -import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; -import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaSparkContext; import 
org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.List; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; -import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; -import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH; -import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX; -import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK; -import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.S3; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy; @@ -69,7 +60,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class); private final String srcPath; private final int numInstantsPerFetch; - private final boolean checkIfFileExists; private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy; private final QueryRunner queryRunner; private final CloudDataFetcher cloudDataFetcher; @@ -78,50 +68,39 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter; - public static class Config { - // control whether we do existence check for files before consuming them - @Deprecated - static final String ENABLE_EXISTS_CHECK = S3_INCR_ENABLE_EXISTS_CHECK.key(); - @Deprecated - static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = S3_INCR_ENABLE_EXISTS_CHECK.defaultValue(); - - @Deprecated - static final String S3_FS_PREFIX = S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key(); - - /** - * {@link #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader while loading dataset. 
- * Example Hudi Streamer conf - * - --hoodie-conf hoodie.streamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"} - */ - @Deprecated - public static final String SPARK_DATASOURCE_OPTIONS = S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key(); + public S3EventsHoodieIncrSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider) { + this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props), + new CloudDataFetcher(props, sparkContext, sparkSession), new DefaultStreamContext(schemaProvider, Option.empty())); } public S3EventsHoodieIncrSource( TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { - this(props, sparkContext, sparkSession, schemaProvider, new QueryRunner(sparkSession, props), - new CloudDataFetcher(props)); + StreamContext streamContext) { + this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props), + new CloudDataFetcher(props, sparkContext, sparkSession), streamContext); } public S3EventsHoodieIncrSource( TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider, QueryRunner queryRunner, - CloudDataFetcher cloudDataFetcher) { - super(props, sparkContext, sparkSession, schemaProvider); + CloudDataFetcher cloudDataFetcher, + StreamContext streamContext) { + super(props, sparkContext, sparkSession, streamContext); checkRequiredConfigProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH); this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH); - this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK); this.missingCheckpointStrategy = getMissingCheckpointStrategy(props); this.queryRunner = queryRunner; this.cloudDataFetcher = cloudDataFetcher; - this.schemaProvider = Option.ofNullable(schemaProvider); + this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider()); this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props); } @@ -144,36 +123,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { LOG.warn("Already caught up. 
No new data to process"); return Pair.of(Option.empty(), queryInfo.getEndInstant()); } - Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter); - queryInfo = queryInfoDatasetPair.getLeft(); - Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter( - CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3, props)); - - LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); - Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = - IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); - if (!checkPointAndDataset.getRight().isPresent()) { - LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft()); - return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); - } - LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft()); - - String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase(); - String s3Prefix = s3FS + "://"; - - // Create S3 paths - StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration()); - List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get() - .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME, - CloudObjectsSelectorCommon.S3_OBJECT_KEY, - CloudObjectsSelectorCommon.S3_OBJECT_SIZE) - .distinct() - .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, storageConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class)) - .collectAsList(); - LOG.info("Total number of files to process :" + cloudObjectMetadata.size()); - - Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider); - return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString()); + return cloudDataFetcher.fetchPartitionedSource(S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java index ed1a49e33e7..06fb89da9a4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java @@ -21,8 +21,11 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.streamer.SourceProfileSupplier; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -32,10 +35,13 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.List; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static 
org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT; +import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK; +import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT; -import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset; /** * Connects to S3/GCS from Spark and downloads data from a given list of files. @@ -45,14 +51,24 @@ public class CloudDataFetcher implements Serializable { private static final String EMPTY_STRING = ""; - private final TypedProperties props; + private transient TypedProperties props; + private transient JavaSparkContext sparkContext; + private transient SparkSession sparkSession; + private transient CloudObjectsSelectorCommon cloudObjectsSelectorCommon; private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class); private static final long serialVersionUID = 1L; - public CloudDataFetcher(TypedProperties props) { + public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession) { + this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props)); + } + + public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) { this.props = props; + this.sparkContext = jsc; + this.sparkSession = sparkSession; + this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon; } public static String getFileFormat(TypedProperties props) { @@ -63,8 +79,59 @@ public class CloudDataFetcher implements Serializable { : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING); } - public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, - TypedProperties props, Option<SchemaProvider> schemaProviderOption) { - return loadAsDataset(spark, cloudObjectMetadata, props, getFileFormat(props), schemaProviderOption); + public Pair<Option<Dataset<Row>>, String> fetchPartitionedSource( + CloudObjectsSelectorCommon.Type cloudType, + CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint, + Option<SourceProfileSupplier> sourceProfileSupplier, + Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair, + Option<SchemaProvider> schemaProvider, + long sourceLimit) { + boolean isSourceProfileSupplierAvailable = sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null; + if (isSourceProfileSupplierAvailable) { + LOG.debug("Using source limit from source profile sourceLimitFromConfig {} sourceLimitFromProfile {}", sourceLimit, sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes()); + sourceLimit = sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes(); + } + + QueryInfo queryInfo = queryInfoDatasetPair.getLeft(); + String filter = CloudObjectsSelectorCommon.generateFilter(cloudType, props); + LOG.info("Adding filter string to Dataset: " + filter); + Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter); + + LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); + Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); + if (!checkPointAndDataset.getRight().isPresent()) { + LOG.info("Empty source, 
returning endpoint:" + checkPointAndDataset.getLeft()); + return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); + } + LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft()); + + boolean checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK); + List<CloudObjectMetadata> cloudObjectMetadata = CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, checkPointAndDataset.getRight().get(), checkIfFileExists, props); + LOG.info("Total number of files to process :" + cloudObjectMetadata.size()); + + long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) : + props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue())); + if (isSourceProfileSupplierAvailable) { + long bytesPerPartitionFromProfile = (long) sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext(); + if (bytesPerPartitionFromProfile > 0) { + LOG.debug("Using bytesPerPartition from source profile bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}", bytesPerPartition, bytesPerPartitionFromProfile); + bytesPerPartition = bytesPerPartitionFromProfile; + } + } + Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition); + return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString()); + } + + private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long bytesPerPartition) { + long totalSize = 0; + for (CloudObjectMetadata o : cloudObjectMetadata) { + totalSize += o.getSize(); + } + // inflate 10% for potential hoodie meta fields + double totalSizeWithHoodieMetaFields = totalSize * 1.1; + int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1); + return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 8a442455291..8aee9d92754 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -37,9 +37,11 @@ import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapPartitionsFunction; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -53,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE; import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; @@ -62,8 +63,8 @@ import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR; import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS; import static org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX; -import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION; import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT; +import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX; import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX; import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING; import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX; @@ -85,6 +86,13 @@ public class CloudObjectsSelectorCommon { public static final String GCS_OBJECT_KEY = "name"; public static final String GCS_OBJECT_SIZE = "size"; private static final String SPACE_DELIMTER = " "; + private static final String GCS_PREFIX = "gs://"; + + private final TypedProperties properties; + + public CloudObjectsSelectorCommon(TypedProperties properties) { + this.properties = properties; + } /** * Return a function that extracts filepaths from a list of Rows. @@ -205,8 +213,40 @@ public class CloudObjectsSelectorCommon { return filter.toString(); } - public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, - TypedProperties props, String fileFormat, Option<SchemaProvider> schemaProviderOption) { + /** + * @param cloudObjectMetadataDF a Dataset that contains metadata of S3/GCS objects. Assumed to be a persisted form + * of a Cloud Storage SQS/PubSub Notification event. + * @param checkIfExists Check if each file exists, before returning its full path + * @return A {@link List} of {@link CloudObjectMetadata} containing file info. 
+ */ + public static List<CloudObjectMetadata> getObjectMetadata( + Type type, + JavaSparkContext jsc, + Dataset<Row> cloudObjectMetadataDF, + boolean checkIfExists, + TypedProperties props + ) { + StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()); + if (type == Type.GCS) { + return cloudObjectMetadataDF + .select("bucket", "name", "size") + .distinct() + .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class)) + .collectAsList(); + } else if (type == Type.S3) { + String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase(); + String s3Prefix = s3FS + "://"; + return cloudObjectMetadataDF + .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME, CloudObjectsSelectorCommon.S3_OBJECT_KEY, CloudObjectsSelectorCommon.S3_OBJECT_SIZE) + .distinct() + .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class)) + .collectAsList(); + } + throw new UnsupportedOperationException("Invalid cloud type " + type); + } + + public Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, + String fileFormat, Option<SchemaProvider> schemaProviderOption, int numPartitions) { if (LOG.isDebugEnabled()) { LOG.debug("Extracted distinct files " + cloudObjectMetadata.size() + " and some samples " + cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10).collect(Collectors.toList())); @@ -216,7 +256,7 @@ public class CloudObjectsSelectorCommon { return Option.empty(); } DataFrameReader reader = spark.read().format(fileFormat); - String datasourceOpts = getStringWithAltKeys(props, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true); + String datasourceOpts = getStringWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true); if (schemaProviderOption.isPresent()) { Schema sourceSchema = schemaProviderOption.get().getSourceSchema(); if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) { @@ -225,7 +265,7 @@ public class CloudObjectsSelectorCommon { } if (StringUtils.isNullOrEmpty(datasourceOpts)) { // fall back to legacy config for BWC. TODO consolidate in HUDI-6020 - datasourceOpts = getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true); + datasourceOpts = getStringWithAltKeys(properties, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true); } if (StringUtils.nonEmpty(datasourceOpts)) { final ObjectMapper mapper = new ObjectMapper(); @@ -239,18 +279,10 @@ public class CloudObjectsSelectorCommon { reader = reader.options(sparkOptionsMap); } List<String> paths = new ArrayList<>(); - long totalSize = 0; for (CloudObjectMetadata o : cloudObjectMetadata) { paths.add(o.getPath()); - totalSize += o.getSize(); } - // inflate 10% for potential hoodie meta fields - totalSize *= 1.1; - // if source bytes are provided, then give preference to that. - long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? 
props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) : - props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue())); - int numPartitions = (int) Math.max(Math.ceil(totalSize / bytesPerPartition), 1); - boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false); + boolean isCommaSeparatedPathFormat = properties.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false); Dataset<Row> dataset; if (isCommaSeparatedPathFormat) { @@ -260,8 +292,8 @@ public class CloudObjectsSelectorCommon { } // add partition column from source path if configured - if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) { - String[] partitionKeysToAdd = getStringWithAltKeys(props, PATH_BASED_PARTITION_FIELDS).split(","); + if (containsConfigProperty(properties, PATH_BASED_PARTITION_FIELDS)) { + String[] partitionKeysToAdd = getStringWithAltKeys(properties, PATH_BASED_PARTITION_FIELDS).split(","); // Add partition column for all path-based partition keys. If key is not present in path, the value will be null. for (String partitionKey : partitionKeysToAdd) { String partitionPathPattern = String.format("%s=", partitionKey); @@ -284,10 +316,6 @@ public class CloudObjectsSelectorCommon { return dataset; } - public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) { - return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty()); - } - private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) { String value = getStringWithAltKeys(props, configProperty, true); if (!StringUtils.isNullOrEmpty(value)) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java deleted file mode 100644 index 21ca334d05f..00000000000 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.utilities.sources.helpers.gcs; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; -import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; - -import org.apache.hadoop.conf.Configuration; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.List; - -import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition; - -/** - * Extracts a list of GCS {@link CloudObjectMetadata} containing metadata of GCS objects from a given Spark Dataset as input. - * Optionally: - * i) Match the filename and path against provided input filter strings - * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already - * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs(). - */ -public class GcsObjectMetadataFetcher implements Serializable { - - private final TypedProperties props; - - private static final String GCS_PREFIX = "gs://"; - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(GcsObjectMetadataFetcher.class); - - public GcsObjectMetadataFetcher(TypedProperties props) { - this.props = props; - } - - /** - * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form - * of a Cloud Storage Pubsub Notification event. - * @param checkIfExists Check if each file exists, before returning its full path - * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info. - */ - public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) { - StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()); - return cloudObjectMetadataDF - .select("bucket", "name", "size") - .distinct() - .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class)) - .collectAsList(); - } - - /** - * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form - * of a Cloud Storage Pubsub Notification event. - * @return Dataset<Row> after apply the filtering. 
- */ - public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) { - String filter = CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS, props); - LOG.info("Adding filter string to Dataset: " + filter); - - return cloudObjectMetadataDF.filter(filter); - } -} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 8d529fda073..dda205db8f8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -36,14 +36,19 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.config.CloudSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource.TestSourceProfile; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; -import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.SourceProfileSupplier; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,8 +66,8 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +83,12 @@ import java.util.stream.Collectors; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -94,13 +103,14 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn @TempDir protected java.nio.file.Path tempDir; - @Mock - CloudDataFetcher gcsObjectDataFetcher; - @Mock QueryRunner queryRunner; @Mock QueryInfo queryInfo; + @Mock + CloudObjectsSelectorCommon cloudObjectsSelectorCommon; + @Mock + SourceProfileSupplier sourceProfileSupplier; protected 
Option<SchemaProvider> schemaProvider; private HoodieTableMetaClient metaClient; @@ -133,9 +143,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, inserts.getKey()); - - verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF( - Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)); } @Test @@ -151,7 +158,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); } @@ -170,7 +177,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json"); } @@ -193,7 +200,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json"); } @@ -227,15 +234,20 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); - + List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L); setMockQueryRunner(inputDs); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, - "1#path/to/file1" + extension, typedProperties); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, - "1#path/to/file2" + extension, typedProperties); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, - "2#path/to/file5" + extension, typedProperties); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(0))); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(1))); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, bytesPerPartition.get(2))); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties); + // Verify the partitions being passed in getCloudObjectDataDF are correct. 
+ List<Integer> numPartitions = Arrays.asList(12, 2, 1); + ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class); + verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); + Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); } @ParameterizedTest @@ -264,15 +276,41 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip"); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L); + //1. snapshot query, read all records + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties); //2. incremental query, as commit is present in timeline + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties); //3. snapshot query with source limit less than first commit size + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties); typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to"); //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix). + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties); + // Verify the partitions being passed in getCloudObjectDataDF are correct. 
+ ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class); + verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); + if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) { + Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues()); + } else { + Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues()); + } + } + + @Test + public void testCreateSource() throws IOException { + TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); + HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); + Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics, + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + assertEquals(Source.SourceType.ROW, gcsSource.getSourceType()); + assertThrows(IOException.class, () -> UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), new TypedProperties(), jsc(), spark(), metrics, + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)))); } private void setMockQueryRunner(Dataset<Row> inputDs) { @@ -281,7 +319,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) { - when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { + when(queryRunner.run(any(QueryInfo.class), any())).thenAnswer(invocation -> { QueryInfo queryInfo = invocation.getArgument(0); QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> queryInfo.withUpdatedEndInstant(nextCheckPoint)) @@ -302,7 +340,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn TypedProperties typedProperties) { GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(), - spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner); + spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), cloudObjectsSelectorCommon), queryRunner, + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java index d01543044b0..c1e7f9dca49 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java @@ -43,6 +43,7 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaRDD; @@ -335,7 +336,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness { snapshotCheckPointImplClassOpt.map(className -> 
properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className)); TypedProperties typedProperties = new TypedProperties(properties); - HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA)); + HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), new DefaultStreamContext(new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), Option.empty())); // read everything until latest Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 553078ff3fc..be26dfb1f3b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -36,14 +36,20 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.config.CloudSourceConfig; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.SourceProfile; +import org.apache.hudi.utilities.streamer.SourceProfileSupplier; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,6 +67,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -68,6 +75,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -76,7 +84,10 @@ import java.util.stream.Collectors; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; -import static org.mockito.ArgumentMatchers.eq; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -93,7 +104,9 
@@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne @Mock QueryRunner mockQueryRunner; @Mock - CloudDataFetcher mockCloudDataFetcher; + CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon; + @Mock + SourceProfileSupplier sourceProfileSupplier; @Mock QueryInfo queryInfo; private JavaSparkContext jsc; @@ -257,8 +270,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 200L, "1#path/to/file2.json"); @@ -282,8 +295,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json"); @@ -322,15 +335,15 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, - "1#path/to/file1" + extension, typedProperties); + "1#path/to/file1" + extension, typedProperties); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, - "1#path/to/file2" + extension, typedProperties); + "1#path/to/file2" + extension, typedProperties); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, - "2#path/to/file5" + extension, typedProperties); + "2#path/to/file5" + extension, typedProperties); } @Test @@ -363,8 +376,9 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", typedProperties); } - @Test - public void testFilterAnEntireCommit() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFilterAnEntireCommit(boolean useSourceProfile) throws IOException { String commitTimeForWrites1 = "2"; String commitTimeForReads = "1"; @@ -385,16 +399,22 @@ public 
class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + if (useSourceProfile) { + when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); + } else { + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + } TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties); } - @Test - public void testFilterAnEntireMiddleCommit() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws IOException { String commitTimeForWrites1 = "2"; String commitTimeForWrites2 = "3"; String commitTimeForReads = "1"; @@ -417,16 +437,21 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L); + if (useSourceProfile) { + when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); + } else { + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + } + TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties); schemaProvider = Option.empty(); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties); } @@ -454,26 +479,50 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); - when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider))) - .thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip"); + List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L 
* 1000L * 1000L); + //1. snapshot query, read all records + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties); //2. incremental query, as commit is present in timeline + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties); //3. snapshot query with source limit less than first commit size + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties); typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to"); //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix). + when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3))); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties); + // Verify the partitions being passed to loadAsDataset are correct. + ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class); + verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture()); + List<Integer> numPartitions = Collections.emptyList(); + if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) { + Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues()); + } else { + Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues()); + } + } + + @Test + public void testCreateSource() throws IOException { + TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); + HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); + Source s3Source = UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics, + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + assertEquals(Source.SourceType.ROW, s3Source.getSourceType()); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, TypedProperties typedProperties) { S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(), - spark(), schemaProvider.orElse(null), mockQueryRunner, mockCloudDataFetcher); + spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), mockCloudObjectsSelectorCommon), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit); @@ -512,4 +561,30 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties); } -} + + static class TestSourceProfile implements SourceProfile<Long> { + + private final long maxSourceBytes; + private final long bytesPerPartition; + + public TestSourceProfile(long maxSourceBytes, long bytesPerPartition) { + this.maxSourceBytes = maxSourceBytes; +
this.bytesPerPartition = bytesPerPartition; + } + + @Override + public long getMaxSourceBytes() { + return maxSourceBytes; + } + + @Override + public int getSourcePartitions() { + throw new UnsupportedOperationException("getSourcePartitions is not required for S3 source profile"); + } + + @Override + public Long getSourceSpecificContext() { + return bytesPerPartition; + } + } +} \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java index a57383c43b2..9e5d3d1f132 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java @@ -26,6 +26,7 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaRegistryProvider; import org.apache.hudi.utilities.sources.InputBatch; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -118,7 +119,7 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase { TypedProperties props = createPropsForJsonSource(); SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, this); - SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, sparkSession, schemaProvider, metrics)); + SourceFormatAdapter debeziumSource = new SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()))); testUtils.sendMessages(testTopicName, new String[] {generateDebeziumEvent(operation).toString()}); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index 79f15975cb5..4b30bb14b57 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -50,14 +50,16 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness @Test public void emptyMetadataReturnsEmptyOption() { - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), new TypedProperties(), "json"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties()); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), "json", Option.empty(), 1); Assertions.assertFalse(result.isPresent()); } @Test public void filesFromMetadataRead() { + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(new TypedProperties()); List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, new TypedProperties(), "json"); + 
Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", Option.empty(), 1); Assertions.assertTrue(result.isPresent()); Assertions.assertEquals(1, result.get().count()); Row expected = RowFactory.create("some data"); @@ -70,7 +72,8 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness TypedProperties properties = new TypedProperties(); properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state"); - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(properties); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", Option.empty(), 1); Assertions.assertTrue(result.isPresent()); Assertions.assertEquals(1, result.get().count()); Row expected = RowFactory.create("some data", "US", "CA"); @@ -85,27 +88,15 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc))); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", Option.of(new FilebasedSchemaProvider(props, jsc)), 1); Assertions.assertTrue(result.isPresent()); Assertions.assertEquals(1, result.get().count()); Row expected = RowFactory.create("some data", "US", "CA"); Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); } - @Test - public void partitionKeyNotPresentInPath() { - List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); - TypedProperties properties = new TypedProperties(); - properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format", "false"); - properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "unknown"); - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); - Assertions.assertTrue(result.isPresent()); - Assertions.assertEquals(1, result.get().count()); - Row expected = RowFactory.create("some data", null); - Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); - } - @Test public void loadDatasetWithSchemaAndRepartition() { TypedProperties props = new TypedProperties(); @@ -121,10 +112,25 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json", 1000), new CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json", 1000) ); - Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new 
FilebasedSchemaProvider(props, jsc))); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", Option.of(new FilebasedSchemaProvider(props, jsc)), 30); Assertions.assertTrue(result.isPresent()); List<Row> expected = Arrays.asList(RowFactory.create("some data", "US", "CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some data", "IND", "TS")); List<Row> actual = result.get().collectAsList(); Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual)); } + + @Test + public void partitionKeyNotPresentInPath() { + List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); + TypedProperties properties = new TypedProperties(); + properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "false"); + properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown"); + CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(properties); + Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", Option.empty(), 1); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(1, result.get().count()); + Row expected = RowFactory.create("some data", null); + Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); + } }
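
A note on the StreamContext-based wiring exercised by testCreateSource above: the sketch below shows how a caller might build the S3 source through UtilHelpers.createSource with a DefaultStreamContext. Only the class names and the createSource signature visible in this patch are assumed; the helper class and method names (CreateSourceSketch, newS3Source) are illustrative and not part of the commit.

import java.io.IOException;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class CreateSourceSketch {
  // Builds the S3 incremental source through the reflection-based factory,
  // mirroring testCreateSource above. The profile supplier is optional.
  public static Source newS3Source(TypedProperties props,
                                   JavaSparkContext jsc,
                                   SparkSession spark,
                                   SchemaProvider schemaProvider,
                                   HoodieIngestionMetrics metrics,
                                   Option<SourceProfileSupplier> profileSupplier) throws IOException {
    return UtilHelpers.createSource(
        S3EventsHoodieIncrSource.class.getName(), props, jsc, spark, metrics,
        new DefaultStreamContext(schemaProvider, profileSupplier));
  }
}

Passing Option.empty() for the SourceProfileSupplier keeps the non-profile behavior, which is what the HoodieIncrSource and Debezium test changes above rely on.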
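
readAndAssert above constructs the source directly instead of going through the factory. A comparable sketch outside the test harness follows, assuming the caller already has a QueryRunner, SchemaProvider, and SourceProfileSupplier; only the constructors and the fetchNextBatch signature shown in this patch are used, and S3IncrSourceWiringSketch/fetchOnce are illustrative names.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class S3IncrSourceWiringSketch {
  public static Pair<Option<Dataset<Row>>, String> fetchOnce(TypedProperties props,
                                                             JavaSparkContext jsc,
                                                             SparkSession spark,
                                                             QueryRunner queryRunner,
                                                             SchemaProvider schemaProvider,
                                                             SourceProfileSupplier profileSupplier,
                                                             long sourceLimit) {
    // CloudDataFetcher now takes a CloudObjectsSelectorCommon instance instead of
    // relying on its static helpers.
    CloudDataFetcher fetcher =
        new CloudDataFetcher(props, jsc, spark, new CloudObjectsSelectorCommon(props));
    S3EventsHoodieIncrSource source = new S3EventsHoodieIncrSource(
        props, jsc, spark, queryRunner, fetcher,
        new DefaultStreamContext(schemaProvider, Option.of(profileSupplier)));
    // Start from no checkpoint and pull at most sourceLimit worth of data.
    return source.fetchNextBatch(Option.empty(), sourceLimit);
  }
}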
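
For the selector itself, the tests now exercise the instance-based loadAsDataset. The sketch below mirrors the test fixtures; the file path, property value, and partition count are illustrative, and SelectorCommonUsageSketch is an illustrative class name.

import java.util.Collections;
import java.util.List;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SelectorCommonUsageSketch {
  public static Option<Dataset<Row>> load(SparkSession spark) {
    TypedProperties props = new TypedProperties();
    // Derive "country" and "state" columns from the object path, as in the tests above.
    props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state");
    CloudObjectsSelectorCommon selector = new CloudObjectsSelectorCommon(props);
    List<CloudObjectMetadata> objects = Collections.singletonList(
        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
    // loadAsDataset(spark, objects, fileFormat, schemaProvider, numPartitions);
    // Option.empty() lets Spark infer the schema; 1 is the target partition count.
    return selector.loadAsDataset(spark, objects, "json", Option.empty(), 1);
  }
}

The final argument is the target number of partitions for the resulting Dataset; the ArgumentCaptor assertions above check that the S3 source derives it from the bytes-per-partition value carried by the source profile.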
