This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0f1130d2670c1be1c42badc3dcd8970c7f84fa2f Author: Leon Tsao <[email protected]> AuthorDate: Thu Sep 29 14:18:41 2022 +0800 [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616) Co-authored-by: leon <[email protected]> Co-authored-by: Raymond Xu <[email protected]> --- .../run/strategy/MultipleSparkJobExecutionStrategy.java | 3 ++- .../org/apache/hudi/common/config/HoodieCommonConfig.java | 5 +++++ .../java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java | 6 ++++-- .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 6 +----- .../org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala | 12 ++++++++++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 0c52a7cc49..eab98f2f19 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -90,6 +90,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF; import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; @@ -375,7 +376,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa HashMap<String, String> params = new HashMap<>(); params.put("hoodie.datasource.query.type", "snapshot"); - params.put("as.of.instant", instantTime); + params.put(TIMESTAMP_AS_OF.key(), instantTime); Path[] paths; if (hasLogFiles) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index 917cfe621f..00ff7e5683 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig { .defaultValue(false) .withDocumentation("Enables support for Schema Evolution feature"); + public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty + .key("as.of.instant") + .noDefaultValue() + .withDocumentation("The query instant for time travel. Without specified this option, we query the latest snapshot."); + public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty .key("hoodie.datasource.write.reconcile.schema") .defaultValue(false) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index 44844a8d47..de1fd0055d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -48,6 +48,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF; + /** * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then * always accept @@ -183,13 +185,13 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial metaClientCache.put(baseDir.toString(), metaClient); } - if (getConf().get("as.of.instant") != null) { + if (getConf().get(TIMESTAMP_AS_OF.key()) != null) { // Build FileSystemViewManager with specified time, it's necessary to set this config when you may // access old version files. For example, in spark side, using "hoodie.datasource.read.paths" // which contains old version files, if not specify this value, these files will be filtered. fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()), - metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant"))); + metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key()))); } else { fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf())); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e8ffb09ff9..6370a0752e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -116,11 +116,7 @@ object DataSourceReadOptions { .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " + "instead of the full table. This option allows using glob pattern to directly filter on path.") - val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty - .key("as.of.instant") - .noDefaultValue() - .withDocumentation("The query instant for time travel. Without specified this option," + - " we query the latest snapshot.") + val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.enable.data.skipping") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 63186c0759..025a224373 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.PartitionPathEncodeUtils -import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} +import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver @@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { (baseConfig: Map[String, String] = Map.empty): Map[String, String] = { baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options)) - .filterKeys(_.startsWith("hoodie.")) + .filterKeys(isHoodieConfigKey) } + /** + * Check if Sql options are Hoodie Config keys. + * + * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935) + */ + def isHoodieConfigKey(key: String): Boolean = + key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key + /** * Checks whether Spark is using Hive as Session's Catalog */
