This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0f1130d2670c1be1c42badc3dcd8970c7f84fa2f
Author: Leon Tsao <[email protected]>
AuthorDate: Thu Sep 29 14:18:41 2022 +0800

    [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616)
    
    
    Co-authored-by: leon <[email protected]>
    Co-authored-by: Raymond Xu <[email protected]>
---
 .../run/strategy/MultipleSparkJobExecutionStrategy.java      |  3 ++-
 .../org/apache/hudi/common/config/HoodieCommonConfig.java    |  5 +++++
 .../java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java |  6 ++++--
 .../src/main/scala/org/apache/hudi/DataSourceOptions.scala   |  6 +-----
 .../org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala     | 12 ++++++++++--
 5 files changed, 22 insertions(+), 10 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c52a7cc49..eab98f2f19 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static 
org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
@@ -375,7 +376,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
 
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put("as.of.instant", instantTime);
+    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 917cfe621f..00ff7e5683 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+      .key("as.of.instant")
+      .noDefaultValue()
+      .withDocumentation("The query instant for time travel. Without specified 
this option, we query the latest snapshot.");
+
   public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
       .key("hoodie.datasource.write.reconcile.schema")
       .defaultValue(false)
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 44844a8d47..de1fd0055d 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -48,6 +48,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version 
of each path - Non-Hoodie table = then
  * always accept
@@ -183,13 +185,13 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          if (getConf().get("as.of.instant") != null) {
+          if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
             // Build FileSystemViewManager with specified time, it's necessary 
to set this config when you may
             // access old version files. For example, in spark side, using 
"hoodie.datasource.read.paths"
             // which contains old version files, if not specify this value, 
these files will be filtered.
             fsView = 
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
                 metaClient, 
HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-                
metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+                
metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
           } else {
             fsView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
                 metaClient, 
HoodieInputFormatUtils.buildMetadataConfig(getConf()));
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e8ffb09ff9..6370a0752e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -116,11 +116,7 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental 
pull from certain partitions "
       + "instead of the full table. This option allows using glob pattern to 
directly filter on path.")
 
-  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
-    .key("as.of.instant")
-    .noDefaultValue()
-    .withDocumentation("The query instant for time travel. Without specified 
this option," +
-      " we query the latest snapshot.")
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = 
HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.enable.data.skipping")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 63186c0759..025a224373 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, 
DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
                    (baseConfig: Map[String, String] = Map.empty): Map[String, 
String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // 
Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ 
HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
-        .filterKeys(_.startsWith("hoodie."))
+        .filterKeys(isHoodieConfigKey)
   }
 
+  /**
+   * Check if Sql options are Hoodie Config keys.
+   *
+   * TODO: standardize the key prefix so that we don't need this helper 
(HUDI-4935)
+   */
+  def isHoodieConfigKey(key: String): Boolean =
+    key.startsWith("hoodie.") || key == 
DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
   /**
    * Checks whether Spark is using Hive as Session's Catalog
    */

Reply via email to