This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8a09c77061e [HUDI-7416] Remove duplicate code for getFileFormat and Refactor filter methods for S3/GCS sources (#10701)
8a09c77061e is described below
commit 8a09c77061e0ba720c53435e07b68a6bc129e876
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Feb 20 11:34:12 2024 +0530
    [HUDI-7416] Remove duplicate code for getFileFormat and Refactor filter methods for S3/GCS sources (#10701)
---
.../sources/GcsEventsHoodieIncrSource.java | 11 +---
.../sources/S3EventsHoodieIncrSource.java | 58 +++++++++-------------
.../sources/helpers/CloudDataFetcher.java | 27 +++++++---
.../helpers/gcs/GcsObjectMetadataFetcher.java | 49 ++++++++----------
.../sources/TestGcsEventsHoodieIncrSource.java | 5 +-
5 files changed, 68 insertions(+), 82 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index a06130d3972..208aaaf3b5b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -48,11 +48,9 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -126,8 +124,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
                                   SchemaProvider schemaProvider) {
     this(props, jsc, spark, schemaProvider,
-        new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
-        new CloudDataFetcher(props, getStringWithAltKeys(props, DATAFILE_FORMAT, true)),
+        new GcsObjectMetadataFetcher(props),
+        new CloudDataFetcher(props),
         new QueryRunner(spark, props)
     );
   }
@@ -196,9 +194,4 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(fileDataRows, queryInfo.getEndInstant());
   }
-
-  private static String getSourceFileFormat(TypedProperties props) {
-    return getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);
-  }
-
 }
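
With the format lookup moved into CloudDataFetcher, the GCS source's helpers are now constructed from props alone. A minimal sketch of the equivalent direct wiring, assuming jsc/spark/props/schemaProvider handles are already in scope (as they are in the test harness):

    GcsEventsHoodieIncrSource source = new GcsEventsHoodieIncrSource(
        props, jsc, spark, schemaProvider,
        new GcsObjectMetadataFetcher(props),   // no fileFormat argument anymore
        new CloudDataFetcher(props),           // format resolved lazily from props
        new QueryRunner(spark, props));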
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 4cbec4d2212..c4ab7339fbb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -52,11 +51,9 @@ import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
@@ -72,11 +69,9 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
 public class S3EventsHoodieIncrSource extends HoodieIncrSource {
   private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
-  private static final String EMPTY_STRING = "";
   private final String srcPath;
   private final int numInstantsPerFetch;
   private final boolean checkIfFileExists;
-  private final String fileFormat;
   private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
   private final QueryRunner queryRunner;
   private final CloudDataFetcher cloudDataFetcher;
@@ -123,7 +118,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
                                   SparkSession sparkSession,
                                   SchemaProvider schemaProvider) {
     this(props, sparkContext, sparkSession, schemaProvider, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, true)));
+        new CloudDataFetcher(props));
   }
public S3EventsHoodieIncrSource(
@@ -138,13 +133,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
     this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
     this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
-
-    // This is to ensure backward compatibility where we were using the
-    // config SOURCE_FILE_FORMAT for file format in previous versions.
-    this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
-        ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
-        : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
-
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
@@ -152,6 +140,27 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
+  public static String generateFilter(TypedProperties props) {
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    String filter = S3_OBJECT_SIZE + " > 0";
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
+    }
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
+    }
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
+    }
+    // Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
+    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
+    } else {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
+    }
+    return filter;
+  }
+
@Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
     CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
@@ -172,7 +181,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     }
     Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
     queryInfo = queryInfoDatasetPair.getLeft();
-    Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
+    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(generateFilter(props));
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
@@ -199,25 +208,4 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
-
-  Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
-    String filter = S3_OBJECT_SIZE + " > 0";
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
-    }
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
-    }
-    // Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
-    if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
-    } else {
-      filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
-    }
-
-    return source.filter(filter);
-  }
 }
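
Since the instance method applyFilter is replaced by the public static generateFilter(TypedProperties), the pushdown predicate can now be built and inspected without constructing the source. A minimal sketch, with illustrative property values that are not from this commit, and assuming S3_KEY_PREFIX resolves from S3EventsHoodieIncrSourceConfig like its sibling keys; the <size>/<key> names in the comment stand in for the S3_OBJECT_SIZE and S3_OBJECT_KEY constants:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
    import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;

    public class GenerateFilterSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Any key left unset simply drops its clause from the filter.
        props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "parquet");
        props.setProperty(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(), "data/");
        props.setProperty(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(), "_temporary");
        // Expected shape: <size> > 0 and <key> like 'data/%'
        //   and <key> not like '%_temporary%' and <key> like '%parquet%'
        System.out.println(S3EventsHoodieIncrSource.generateFilter(props));
      }
    }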
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 9595ec1a9e6..ed1a49e33e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -20,17 +20,21 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.List;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
+import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
 /**
@@ -39,21 +43,28 @@ import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorComm
  */
 public class CloudDataFetcher implements Serializable {
-  private final String fileFormat;
-  private TypedProperties props;
+  private static final String EMPTY_STRING = "";
+
+  private final TypedProperties props;
   private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class);
   private static final long serialVersionUID = 1L;
-  public CloudDataFetcher(TypedProperties props, String fileFormat) {
-    this.fileFormat = fileFormat;
+  public CloudDataFetcher(TypedProperties props) {
     this.props = props;
   }
+  public static String getFileFormat(TypedProperties props) {
+    // This is to ensure backward compatibility where we were using the
+    // config SOURCE_FILE_FORMAT for file format in previous versions.
+    return StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+        ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
+        : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
+  }
+
   public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
                                                    TypedProperties props,
                                                    Option<SchemaProvider> schemaProviderOption) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, schemaProviderOption);
+    return loadAsDataset(spark, cloudObjectMetadata, props, getFileFormat(props), schemaProviderOption);
   }
-
 }
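
The consolidated getFileFormat keeps the legacy fallback in one place: DATAFILE_FORMAT wins when set, otherwise SOURCE_FILE_FORMAT is consulted. A minimal sketch of the resolution order (values illustrative; run with -ea to enable the asserts):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
    import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;

    public class FileFormatFallbackSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Only the legacy key is set: the helper falls back to SOURCE_FILE_FORMAT.
        props.setProperty(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.key(), "json");
        assert CloudDataFetcher.getFileFormat(props).equals("json");
        // Once the newer DATAFILE_FORMAT is set, it takes precedence.
        props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "parquet");
        assert CloudDataFetcher.getFileFormat(props).equals("parquet");
      }
    }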
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index c92901d14cf..44480d91f65 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -51,10 +52,6 @@ import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorComm
  */
 public class GcsObjectMetadataFetcher implements Serializable {
-  /**
-   * The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given.
-   */
-  private final String fileFormat;
   private final TypedProperties props;
   private static final String GCS_PREFIX = "gs://";
@@ -62,13 +59,8 @@ public class GcsObjectMetadataFetcher implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(GcsObjectMetadataFetcher.class);
-  /**
-   * @param fileFormat The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION}
-   * is not given.
-   */
-  public GcsObjectMetadataFetcher(TypedProperties props, String fileFormat) {
+  public GcsObjectMetadataFetcher(TypedProperties props) {
     this.props = props;
-    this.fileFormat = fileFormat;
   }
/**
@@ -86,36 +78,25 @@ public class GcsObjectMetadataFetcher implements Serializable {
         .collectAsList();
   }
-  /**
-   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
-   * of a Cloud Storage Pubsub Notification event.
-   * @return Dataset<Row> after apply the filtering.
-   */
-  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
-    String filter = createFilter();
-    LOG.info("Adding filter string to Dataset: " + filter);
-
-    return cloudObjectMetadataDF.filter(filter);
-  }
-
   /**
    * Add optional filters that narrow down the list of GCS objects to fetch.
    */
-  private String createFilter() {
+  public static String generateFilter(TypedProperties props) {
     StringBuilder filter = new StringBuilder("size > 0");
-    getPropVal(SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
-    getPropVal(IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
-    getPropVal(IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
+    getPropVal(props, SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
+    getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
+    getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
     // Match files with a given extension, or use the fileFormat as the default.
-    getPropVal(CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
+    String fileFormat = CloudDataFetcher.getFileFormat(props);
+    getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
         .map(val -> filter.append(" and name like '%" + val + "'"));
     return filter.toString();
   }
-  private Option<String> getPropVal(ConfigProperty<String> configProperty) {
+  private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
     String value = getStringWithAltKeys(props, configProperty, true);
     if (!isNullOrEmpty(value)) {
       return Option.of(value);
@@ -123,4 +104,16 @@ public class GcsObjectMetadataFetcher implements Serializable {
     return Option.empty();
   }
+
+  /**
+   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
+   * of a Cloud Storage Pubsub Notification event.
+   * @return Dataset<Row> after apply the filtering.
+   */
+  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
+    String filter = generateFilter(props);
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    return cloudObjectMetadataDF.filter(filter);
+  }
}
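
The GCS side follows the same pattern: createFilter becomes the static generateFilter(TypedProperties), so the predicate is testable in isolation. A sketch with illustrative values, assuming the relative-path prefix constant resolves from CloudSourceConfig as in the source's static imports:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.CloudSourceConfig;
    import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;

    public class GcsFilterSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
        props.setProperty(CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX.key(), "events/");
        // Expected shape: size > 0 and name like 'events/%' and name like '%json'
        System.out.println(GcsObjectMetadataFetcher.generateFilter(props));
      }
    }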
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index bc2906d251f..4e37c17b43a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -283,7 +284,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
                                 TypedProperties typedProperties) {
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
+        spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner);
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
@@ -374,7 +375,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
-    properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");
+    properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
     return new TypedProperties(properties);
   }
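
Resolving the key through CloudSourceConfig.DATAFILE_FORMAT.key() keeps the test tied to the config definition (and its registered alternate keys) rather than a hand-written string; the removed "hoodie.deltastreamer.source.gcsincr.datafile.format" literal was presumably one such legacy spelling. The same typed lookup works for reads as well, e.g.:

    // Illustrative: read the configured format back with a default.
    String format = typedProperties.getString(CloudSourceConfig.DATAFILE_FORMAT.key(), "parquet");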