This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b17e6fa1f [HUDI-5403] Turn off metadata-table-based file listing in 
BaseHoodieTableFileIndex (#7488)
7b17e6fa1f is described below

commit 7b17e6fa1f7ed4d112e81ecc51c2d0f048b874db
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Dec 16 23:49:04 2022 -0800

    [HUDI-5403] Turn off metadata-table-based file listing in 
BaseHoodieTableFileIndex (#7488)
    
    Currently, on the reader or query engine side, the direct file listing on 
the file system is used by default, as indicated by 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS (=false). Without 
providing explicit config of hoodie.metadata.enable, the metadata-table-based 
file listing is disabled. However, the BaseHoodieTableFileIndex, the common 
File Index implementation, used by Trino Hive connector, does not respect this 
default behavior. This leads to performance regression  [...]
    
    This PR fixes the BaseHoodieTableFileIndex to respect the expected behavior 
defined by HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, i.e., 
metadata-table-based file listing is disabled by default. The 
metadata-table-based file listing is only enabled when hoodie.metadata.enable 
is set to true and the files partition of the metadata table is ready for read 
based on the Hudi table config.
    
    Impact
    This mitigates the performance regression of query latency in Trino Hive 
connector and fixes the read-side behavior of the file listing.
    
    Tested the PR that by default, the HoodieParquetInputFormat does not read 
metadata table for file listing anymore.
    
    Co-authored-by: Sagar Sumit <[email protected]>
    Co-authored-by: Alexey Kudinkin <[email protected]>
---
 .../TestHoodieClientOnMergeOnReadStorage.java      |  2 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  5 +++
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 46 ++++++++++++++--------
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  | 12 ++++--
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  4 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 24 ++++++-----
 .../org/apache/hudi/HoodieSparkConfUtils.scala     | 10 ++---
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |  2 +-
 8 files changed, 63 insertions(+), 42 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 026a607f06..8935c6ee35 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -230,7 +230,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
     client.logCompact(timeStamp.get());
     // Verify all the records.
     assertDataInMORTable(config, lastCommitBeforeLogCompaction, 
timeStamp.get(),
-        hadoopConf, Arrays.asList(dataGen.getPartitionPaths()));
+        hadoopConf, 
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 681b05f250..cef86109ff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +58,8 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static org.apache.hudi.common.util.CollectionUtils.combine;
 import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
 
@@ -126,6 +129,8 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
 
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
+        .enable(configProperties.getBoolean(ENABLE.key(), 
DEFAULT_METADATA_ENABLE_FOR_READERS)
+            && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
         .build();
 
     this.queryPaths = queryPaths;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index af5f2214cc..9d6a41ea15 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -111,6 +111,18 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
   public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
 
+  /**
+   * Returns whether the files partition of metadata table is ready for read.
+   *
+   * @param metaClient {@link HoodieTableMetaClient} instance.
+   * @return true if the files partition of metadata table is ready for read,
+   * based on the table config; false otherwise.
+   */
+  public static boolean isFilesPartitionAvailable(HoodieTableMetaClient 
metaClient) {
+    return metaClient.getTableConfig().getMetadataPartitions()
+        .contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES);
+  }
+
   /**
    * Collects {@link HoodieColumnRangeMetadata} for the provided collection of 
records, pretending
    * as if provided records have been persisted w/in given {@code filePath}
@@ -171,23 +183,23 @@ public class HoodieTableMetadataUtil {
         Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), 
Function.identity());
 
     return (Map<String, HoodieColumnRangeMetadata<Comparable>>) 
targetFields.stream()
-      .map(field -> {
-        ColumnStats colStats = allColumnStats.get(field.name());
-        return HoodieColumnRangeMetadata.<Comparable>create(
-            filePath,
-            field.name(),
-            colStats == null ? null : coerceToComparable(field.schema(), 
colStats.minValue),
-            colStats == null ? null : coerceToComparable(field.schema(), 
colStats.maxValue),
-            colStats == null ? 0 : colStats.nullCount,
-            colStats == null ? 0 : colStats.valueCount,
-            // NOTE: Size and compressed size statistics are set to 0 to make 
sure we're not
-            //       mixing up those provided by Parquet with the ones from 
other encodings,
-            //       since those are not directly comparable
-            0,
-            0
-        );
-      })
-      .collect(collector);
+        .map(field -> {
+          ColumnStats colStats = allColumnStats.get(field.name());
+          return HoodieColumnRangeMetadata.<Comparable>create(
+              filePath,
+              field.name(),
+              colStats == null ? null : coerceToComparable(field.schema(), 
colStats.minValue),
+              colStats == null ? null : coerceToComparable(field.schema(), 
colStats.maxValue),
+              colStats == null ? 0 : colStats.nullCount,
+              colStats == null ? 0 : colStats.valueCount,
+              // NOTE: Size and compressed size statistics are set to 0 to 
make sure we're not
+              //       mixing up those provided by Parquet with the ones from 
other encodings,
+              //       since those are not directly comparable
+              0,
+              0
+          );
+        })
+        .collect(collector);
   }
 
   /**
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 40827c650a..a050d7eb88 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -185,12 +185,12 @@ public class ITTestHoodieSanity extends ITTestBase {
 
     // Ensure row count is 80 (without duplicates) (100 - 20 deleted)
     stdOutErr = executeHiveCommand("select count(1) from " + 
snapshotTableName);
-    assertEquals(80, 
Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
+    assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
         "Expecting 80 rows to be present in the snapshot table");
 
     if (roTableName.isPresent()) {
       stdOutErr = executeHiveCommand("select count(1) from " + 
roTableName.get());
-      assertEquals(80, 
Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
+      assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
           "Expecting 80 rows to be present in the snapshot table");
     }
 
@@ -204,10 +204,16 @@ public class ITTestHoodieSanity extends ITTestBase {
     } else {
       stdOutErr = executeHiveCommand("select count(1) from " + 
snapshotTableName);
     }
-    assertEquals(280, Integer.parseInt(stdOutErr.getLeft().trim()),
+
+    assertEquals(280, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
         "Expecting 280 rows to be present in the new table");
   }
 
+  private static String lastLine(String output) {
+    String[] lines = output.split("\n");
+    return lines[lines.length - 1];
+  }
+
   public void testRunHoodieJavaApp(String hiveTableName, String tableType, 
PartitionType partitionType)
       throws Exception {
     testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType, 
partitionType);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 523cb53ba7..42e71e5e33 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -135,8 +135,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
-    val enableFileIndex = HoodieSparkConfUtils.getBooleanConfigValue(
-      optParams, sparkSession.sessionState.conf, ENABLE_HOODIE_FILE_INDEX.key, 
ENABLE_HOODIE_FILE_INDEX.defaultValue)
+    val enableFileIndex = HoodieSparkConfUtils.getConfigValue(optParams, 
sparkSession.sessionState.conf,
+      ENABLE_HOODIE_FILE_INDEX.key, 
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
     if (enableFileIndex && globPaths.isEmpty) {
       // NOTE: There are currently 2 ways partition values could be fetched:
       //          - Source columns (producing the values used for physical 
partitioning) will be read
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 40e1353a54..60f413970a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, 
collectReferencedColumns, getConfigProperties}
+import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
@@ -74,7 +75,7 @@ case class HoodieFileIndex(spark: SparkSession,
     spark = spark,
     metaClient = metaClient,
     schemaSpec = schemaSpec,
-    configProperties = getConfigProperties(spark, options, metaClient),
+    configProperties = getConfigProperties(spark, options),
     queryPaths = HoodieFileIndex.getQueryPaths(options),
     specifiedQueryInstant = 
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
     fileStatusCache = fileStatusCache
@@ -251,8 +252,8 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def sizeInBytes: Long = getTotalCachedFilesSize
 
-  private def isDataSkippingEnabled: Boolean = 
HoodieSparkConfUtils.getBooleanConfigValue(
-    options, spark.sessionState.conf, 
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), false)
+  private def isDataSkippingEnabled: Boolean = getConfigValue(options, 
spark.sessionState.conf,
+    DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").toBoolean
 
   private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
 
@@ -291,21 +292,18 @@ object HoodieFileIndex extends Logging {
     schema.fieldNames.filter { colName => refs.exists(r => 
resolver.apply(colName, r.name)) }
   }
 
-  private def isFilesPartitionAvailable(metaClient: HoodieTableMetaClient): 
Boolean = {
-    
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES)
-  }
-
-  def getConfigProperties(spark: SparkSession, options: Map[String, String], 
metaClient: HoodieTableMetaClient) = {
+  def getConfigProperties(spark: SparkSession, options: Map[String, String]) = 
{
     val sqlConf: SQLConf = spark.sessionState.conf
     val properties = new TypedProperties()
+    properties.putAll(options.filter(p => p._2 != null).asJava)
 
     // To support metadata listing via Spark SQL we allow users to pass the 
config via SQL Conf in spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the spark sql 
session to enable metadata listing.
-    val isMetadataFilesPartitionAvailable = 
isFilesPartitionAvailable(metaClient) &&
-      HoodieSparkConfUtils.getBooleanConfigValue(
-        options, sqlConf, HoodieMetadataConfig.ENABLE.key(), 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
-    properties.putAll(options.filter(p => p._2 != null).asJava)
-    properties.setProperty(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(isMetadataFilesPartitionAvailable))
+    val isMetadataTableEnabled = getConfigValue(options, sqlConf, 
HoodieMetadataConfig.ENABLE.key, null)
+    if (isMetadataTableEnabled != null) {
+      properties.setProperty(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(isMetadataTableEnabled))
+    }
+
     properties
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
index 0d85ace0bf..5a6f03aed1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
@@ -34,10 +34,10 @@ object HoodieSparkConfUtils {
    * @param defaultValue Default value to return if not configured.
    * @return The config value.
    */
-  def getBooleanConfigValue(options: Map[String, String],
-                            sqlConf: SQLConf,
-                            configKey: String,
-                            defaultValue: Boolean): Boolean = {
-    options.getOrElse(configKey, sqlConf.getConfString(configKey, 
defaultValue.toString)).toBoolean
+  def getConfigValue(options: Map[String, String],
+                     sqlConf: SQLConf,
+                     configKey: String,
+                     defaultValue: String): String = {
+    options.getOrElse(configKey, sqlConf.getConfString(configKey, 
defaultValue))
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 092bf8a191..9df5586b3d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -94,7 +94,7 @@ class HoodieCDCRDD(
 
   private val cdcSupplementalLoggingMode = 
metaClient.getTableConfig.cdcSupplementalLoggingMode
 
-  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, 
metaClient)
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
 
   protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
     .map { preCombineField =>

Reply via email to