This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4538fb2fc3f070883a03cc254a6958f38bfffd1d
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Mar 5 17:32:51 2024 -0800

    [HUDI-7418] Create a common method for filtering in S3 and GCS sources and add tests for filtering out extensions (#10724)

    Co-authored-by: rmahindra123 <[email protected]>
---
 .../hudi/utilities/config/CloudSourceConfig.java   |  4 +-
 .../config/S3EventsHoodieIncrSourceConfig.java     |  6 ++
 .../sources/GcsEventsHoodieIncrSource.java         |  8 +--
 .../sources/S3EventsHoodieIncrSource.java          | 50 +++-------------
 .../helpers/CloudObjectsSelectorCommon.java        | 68 ++++++++++++++++++++++
 .../helpers/gcs/GcsObjectMetadataFetcher.java      | 39 +------------
 .../sources/TestGcsEventsHoodieIncrSource.java     | 42 +++++++++----
 .../sources/TestS3EventsHoodieIncrSource.java      |  6 +-
 8 files changed, 124 insertions(+), 99 deletions(-)
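For context, the consolidated helper builds a single Spark SQL filter string for both sources. A minimal sketch of the expected output, with hypothetical property values and assuming the source file format resolves to "json" via CloudDataFetcher.getFileFormat (the exact string follows from the generateFilter implementation in the diff below):

    TypedProperties props = new TypedProperties();
    props.setProperty(CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX.key(), "data/");
    String s3Filter = CloudObjectsSelectorCommon.generateFilter(
        CloudObjectsSelectorCommon.Type.S3, props);
    // s3Filter: "s3.object.size > 0 and s3.object.key like 'data/%' and s3.object.key like '%json'"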
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 54be9cabef9..e3bdca1a395 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -85,14 +85,14 @@ public class CloudSourceConfig extends HoodieConfig {
       .noDefaultValue()
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.select.relpath.prefix")
       .markAdvanced()
-      .withDocumentation("Only selects objects in the bucket whose relative path matches this prefix");
+      .withDocumentation("Only selects objects in the bucket whose relative path starts with this prefix");
 
   public static final ConfigProperty<String> IGNORE_RELATIVE_PATH_PREFIX = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.ignore.relpath.prefix")
       .noDefaultValue()
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.ignore.relpath.prefix")
       .markAdvanced()
-      .withDocumentation("Ignore objects in the bucket whose relative path matches this prefix");
+      .withDocumentation("Ignore objects in the bucket whose relative path starts with this prefix");
 
   public static final ConfigProperty<String> IGNORE_RELATIVE_PATH_SUBSTR = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.ignore.relpath.substring")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
index 3db572b1f84..23ecb96d795 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/S3EventsHoodieIncrSourceConfig.java
@@ -47,6 +47,8 @@ public class S3EventsHoodieIncrSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Control whether we do existence check for files before consuming them");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX}
   public static final ConfigProperty<String> S3_KEY_PREFIX = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.key.prefix")
       .noDefaultValue()
@@ -61,6 +63,8 @@ public class S3EventsHoodieIncrSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("The file system prefix.");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX}
   public static final ConfigProperty<String> S3_IGNORE_KEY_PREFIX = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.ignore.key.prefix")
       .noDefaultValue()
@@ -68,6 +72,8 @@ public class S3EventsHoodieIncrSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Control whether to ignore the s3 objects starting with this prefix");
 
+  @Deprecated
+  // Use {@link CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR}
   public static final ConfigProperty<String> S3_IGNORE_KEY_SUBSTRING = ConfigProperty
       .key(STREAMER_CONFIG_PREFIX + "source.s3incr.ignore.key.substring")
       .noDefaultValue()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 208aaaf3b5b..07950742909 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
@@ -114,10 +115,6 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final Option<SchemaProvider> schemaProvider;
 
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
-
-  public static final String GCS_OBJECT_KEY = "name";
-  public static final String GCS_OBJECT_SIZE = "size";
-
   private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
 
   public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
@@ -161,7 +158,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
         sparkContext, srcPath, numInstantsPerFetch,
         Option.of(cloudObjectIncrCheckpoint.getCommit()),
         missingCheckpointStrategy, handlingMode, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-        GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
+        CloudObjectsSelectorCommon.GCS_OBJECT_KEY,
+        CloudObjectsSelectorCommon.GCS_OBJECT_SIZE, true,
         Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
 
     LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and queryInfo:" + queryInfo);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index c4ab7339fbb..84b267709ad 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,13 +23,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
@@ -50,15 +50,11 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -87,18 +83,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     @Deprecated
     static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = S3_INCR_ENABLE_EXISTS_CHECK.defaultValue();
 
-    // control whether to filter the s3 objects starting with this prefix
-    @Deprecated
-    static final String S3_KEY_PREFIX = S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key();
     @Deprecated
     static final String S3_FS_PREFIX = S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key();
-    // control whether to ignore the s3 objects starting with this prefix
-    @Deprecated
-    static final String S3_IGNORE_KEY_PREFIX = S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key();
-    // control whether to ignore the s3 objects with this substring
-    @Deprecated
-    static final String S3_IGNORE_KEY_SUBSTRING = S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key();
 
     /**
      * {@link #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader while loading dataset.
      * Example Hudi Streamer conf
@@ -108,10 +95,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     public static final String SPARK_DATASOURCE_OPTIONS = S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
   }
 
-  public static final String S3_OBJECT_KEY = "s3.object.key";
-  public static final String S3_OBJECT_SIZE = "s3.object.size";
-  public static final String S3_BUCKET_NAME = "s3.bucket.name";
-
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
@@ -140,27 +123,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
-  public static String generateFilter(TypedProperties props) {
-    String fileFormat = CloudDataFetcher.getFileFormat(props);
-    String filter = S3_OBJECT_SIZE + " > 0";
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
-    } else {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
-    }
-    return filter;
-  }
-
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
     CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
@@ -171,7 +133,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         Option.of(cloudObjectIncrCheckpoint.getCommit()),
         missingCheckpointStrategy, handlingMode,
         HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-        S3_OBJECT_KEY, S3_OBJECT_SIZE, true,
+        CloudObjectsSelectorCommon.S3_OBJECT_KEY,
+        CloudObjectsSelectorCommon.S3_OBJECT_SIZE, true,
         Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
     LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" + queryInfo);
@@ -181,7 +144,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     }
     Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
     queryInfo = queryInfoDatasetPair.getLeft();
-    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(generateFilter(props));
+    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(
+        CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3, props));
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
@@ -199,7 +163,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     // Create S3 paths
     SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(sparkContext.hadoopConfiguration());
     List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get()
-        .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
+        .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
+            CloudObjectsSelectorCommon.S3_OBJECT_KEY,
+            CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
         .distinct()
         .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, serializableHadoopConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
        .collectAsList();
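The deprecated S3-specific keys removed from this class still take effect: for Type.S3, the common helper falls back to them when the new keys are unset. A hedged sketch with a hypothetical value:

    TypedProperties props = new TypedProperties();
    props.setProperty(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(), "tmp/");
    String filter = CloudObjectsSelectorCommon.generateFilter(
        CloudObjectsSelectorCommon.Type.S3, props);
    // filter should contain: "s3.object.key not like 'tmp/%'"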
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 5ed7dcae897..8676bf41cb5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
@@ -56,9 +57,16 @@ import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
+import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
+import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
 
@@ -71,6 +79,13 @@ public class CloudObjectsSelectorCommon {
 
   private static final Logger LOG = LoggerFactory.getLogger(CloudObjectsSelectorCommon.class);
 
+  public static final String S3_OBJECT_KEY = "s3.object.key";
+  public static final String S3_OBJECT_SIZE = "s3.object.size";
+  public static final String S3_BUCKET_NAME = "s3.bucket.name";
+  public static final String GCS_OBJECT_KEY = "name";
+  public static final String GCS_OBJECT_SIZE = "size";
+  private static final String SPACE_DELIMTER = " ";
+
   /**
    * Return a function that extracts filepaths from a list of Rows.
    * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket, object_size]
@@ -151,6 +166,45 @@ public class CloudObjectsSelectorCommon {
     }
   }
 
+  public static String generateFilter(Type type,
+                                      TypedProperties props) {
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    Option<String> selectRelativePathPrefix = getPropVal(props, SELECT_RELATIVE_PATH_PREFIX);
+    Option<String> ignoreRelativePathPrefix = getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX);
+    Option<String> ignoreRelativePathSubStr = getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR);
+
+    String objectKey;
+    String objectSizeKey;
+    // This is for backwards compatibility of configs for s3.
+    if (type.equals(Type.S3)) {
+      objectKey = S3_OBJECT_KEY;
+      objectSizeKey = S3_OBJECT_SIZE;
+      selectRelativePathPrefix = selectRelativePathPrefix.or(() -> getPropVal(props, S3_KEY_PREFIX));
+      ignoreRelativePathPrefix = ignoreRelativePathPrefix.or(() -> getPropVal(props, S3_IGNORE_KEY_PREFIX));
+      ignoreRelativePathSubStr = ignoreRelativePathSubStr.or(() -> getPropVal(props, S3_IGNORE_KEY_SUBSTRING));
+    } else {
+      objectKey = GCS_OBJECT_KEY;
+      objectSizeKey = GCS_OBJECT_SIZE;
+    }
+
+    StringBuilder filter = new StringBuilder(String.format("%s > 0", objectSizeKey));
+    if (selectRelativePathPrefix.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s like '%s%%'", objectKey, selectRelativePathPrefix.get()));
+    }
+    if (ignoreRelativePathPrefix.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s not like '%s%%'", objectKey, ignoreRelativePathPrefix.get()));
+    }
+    if (ignoreRelativePathSubStr.isPresent()) {
+      filter.append(SPACE_DELIMTER).append(String.format("and %s not like '%%%s%%'", objectKey, ignoreRelativePathSubStr.get()));
+    }
+
+    // Match files with a given extension, or use the fileFormat as the default.
+    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
+        .map(val -> filter.append(SPACE_DELIMTER).append(String.format("and %s like '%%%s'", objectKey, val)));
+
+    return filter.toString();
+  }
+
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props,
                                                    String fileFormat, Option<SchemaProvider> schemaProviderOption) {
     if (LOG.isDebugEnabled()) {
@@ -233,4 +287,18 @@ public class CloudObjectsSelectorCommon {
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
     return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
   }
+
+  private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
+    String value = getStringWithAltKeys(props, configProperty, true);
+    if (!StringUtils.isNullOrEmpty(value)) {
+      return Option.of(value);
+    }
+
+    return Option.empty();
+  }
+
+  public enum Type {
+    S3,
+    GCS
+  }
 }
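A usage sketch for the new helper (property values hypothetical; the expected strings follow from the implementation above). The same properties yield source-specific column names:

    TypedProperties props = new TypedProperties();
    props.setProperty(CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR.key(), "checkpoint");
    props.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), ".parquet");

    // S3 event metadata is filtered on the s3.* columns:
    String s3Filter = CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3, props);
    // "s3.object.size > 0 and s3.object.key not like '%checkpoint%' and s3.object.key like '%.parquet'"

    // GCS metadata is filtered on the name/size columns:
    String gcsFilter = CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS, props);
    // "size > 0 and name not like '%checkpoint%' and name like '%.parquet'"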
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 44480d91f65..29a50e81fb0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -18,12 +18,10 @@
 package org.apache.hudi.utilities.sources.helpers.gcs;
 
-import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -35,12 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
 
 /**
@@ -78,40 +70,13 @@ public class GcsObjectMetadataFetcher implements Serializable {
         .collectAsList();
   }
 
-  /**
-   * Add optional filters that narrow down the list of GCS objects to fetch.
-   */
-  public static String generateFilter(TypedProperties props) {
-    StringBuilder filter = new StringBuilder("size > 0");
-
-    getPropVal(props, SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
-    getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
-    getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
-
-    // Match files with a given extension, or use the fileFormat as the default.
-    String fileFormat = CloudDataFetcher.getFileFormat(props);
-    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
-        .map(val -> filter.append(" and name like '%" + val + "'"));
-
-    return filter.toString();
-  }
-
-  private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
-    String value = getStringWithAltKeys(props, configProperty, true);
-    if (!isNullOrEmpty(value)) {
-      return Option.of(value);
-    }
-
-    return Option.empty();
-  }
-
   /**
    * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
    *                              of a Cloud Storage Pubsub Notification event.
    * @return Dataset<Row> after apply the filtering.
    */
   public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
-    String filter = generateFilter(props);
+    String filter = CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS, props);
     LOG.info("Adding filter string to Dataset: " + filter);
 
     return cloudObjectMetadataDF.filter(filter);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 4e37c17b43a..c1844c7a2a1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -86,6 +87,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
   private static final Schema GCS_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(
       TestGcsEventsHoodieIncrSource.class, "/streamer-config/gcs-metadata.avsc", true);
+  private static final String IGNORE_FILE_EXTENSION = ".ignore";
 
   private ObjectMapper mapper = new ObjectMapper();
 
@@ -196,28 +198,44 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
   }
 
-  @Test
-  public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+  @ParameterizedTest
+  @ValueSource(strings = {
+      ".json",
+      ".gz"
+  })
+  public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOException {
     String commitTimeForWrites = "2";
     String commitTimeForReads = "1";
 
     Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites);
+
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    // In case the extension is explicitly set to something other than the file format.
+    if (!extension.endsWith("json")) {
+      typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), extension);
+    }
+
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
-    // Add file paths and sizes to the list
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
-    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+    // Add file paths and sizes to the list.
+    // Check with a couple of invalid file extensions to ensure they are filtered out.
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", IGNORE_FILE_EXTENSION), 800L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", IGNORE_FILE_EXTENSION), 200L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
+        "1#path/to/file1" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L,
+        "1#path/to/file2" + extension, typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L,
+        "2#path/to/file5" + extension, typedProperties);
   }
 
   @ParameterizedTest
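For the ".gz" leg of the test above, a hedged reading of the resulting GCS filter (assuming setProps, the test helper used in the diff, sets no path-prefix configs):

    TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
    props.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), ".gz");
    String filter = CloudObjectsSelectorCommon.generateFilter(
        CloudObjectsSelectorCommon.Type.GCS, props);
    // filter: "size > 0 and name like '%.gz'"
    // which is why "path/to/file2.ignore" (800 bytes) never appears in the returned checkpoints.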
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 33faac5361f..90fbeb3bb35 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -87,6 +87,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   private ObjectMapper mapper = new ObjectMapper();
 
   private static final String MY_BUCKET = "some-bucket";
+  private static final String IGNORE_FILE_EXTENSION = ".ignore";
 
   private Option<SchemaProvider> schemaProvider;
   @Mock
@@ -308,11 +309,14 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     }
 
     List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
-    // Add file paths and sizes to the list
+    // Add file paths and sizes to the list.
+    // Check with a couple of invalid file extensions to ensure they are filtered out.
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", IGNORE_FILE_EXTENSION), 800L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", IGNORE_FILE_EXTENSION), 200L, "2"));
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
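As a hedged migration sketch: pipelines using the deprecated S3-specific keys keep working through the fallback in generateFilter, but the common CloudSourceConfig keys are the preferred spelling going forward:

    TypedProperties props = new TypedProperties();
    // before (deprecated):
    //   props.setProperty(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(), "data/");
    // after (preferred):
    props.setProperty(CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX.key(), "data/");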
