This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7b17e6fa1f [HUDI-5403] Turn off metadata-table-based file listing in
BaseHoodieTableFileIndex (#7488)
7b17e6fa1f is described below
commit 7b17e6fa1f7ed4d112e81ecc51c2d0f048b874db
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Dec 16 23:49:04 2022 -0800
[HUDI-5403] Turn off metadata-table-based file listing in
BaseHoodieTableFileIndex (#7488)
Currently, on the reader or query engine side, the direct file listing on
the file system is used by default, as indicated by
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS (=false). Without
providing explicit config of hoodie.metadata.enable, the metadata-table-based
file listing is disabled. However, the BaseHoodieTableFileIndex, the common
File Index implementation used by the Trino Hive connector, does not respect
this default behavior. This leads to a performance regression [...]
This PR fixes the BaseHoodieTableFileIndex to respect the expected behavior
defined by HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, i.e.,
metadata-table-based file listing is disabled by default. The
metadata-table-based file listing is only enabled when hoodie.metadata.enable
is set to true and the files partition of the metadata table is ready for read
based on the Hudi table config.
Impact
This mitigates the query-latency performance regression in the Trino Hive
connector and fixes the read-side behavior of the file listing.
Tested that, with this PR applied, the HoodieParquetInputFormat no longer
reads the metadata table for file listing by default.
Co-authored-by: Sagar Sumit <[email protected]>
Co-authored-by: Alexey Kudinkin <[email protected]>
---
.../TestHoodieClientOnMergeOnReadStorage.java | 2 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 5 +++
.../hudi/metadata/HoodieTableMetadataUtil.java | 46 ++++++++++++++--------
.../org/apache/hudi/integ/ITTestHoodieSanity.java | 12 ++++--
.../org/apache/hudi/BaseFileOnlyRelation.scala | 4 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 24 ++++++-----
.../org/apache/hudi/HoodieSparkConfUtils.scala | 10 ++---
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 2 +-
8 files changed, 63 insertions(+), 42 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 026a607f06..8935c6ee35 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -230,7 +230,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
client.logCompact(timeStamp.get());
// Verify all the records.
assertDataInMORTable(config, lastCommitBeforeLogCompaction,
timeStamp.get(),
- hadoopConf, Arrays.asList(dataGen.getPartitionPaths()));
+ hadoopConf,
Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 681b05f250..cef86109ff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -57,6 +58,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static org.apache.hudi.common.util.CollectionUtils.combine;
import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
@@ -126,6 +129,8 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
this.metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(configProperties)
+ .enable(configProperties.getBoolean(ENABLE.key(),
DEFAULT_METADATA_ENABLE_FOR_READERS)
+ && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
.build();
this.queryPaths = queryPaths;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index af5f2214cc..9d6a41ea15 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -111,6 +111,18 @@ public class HoodieTableMetadataUtil {
public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+ /**
+ * Returns whether the files partition of metadata table is ready for read.
+ *
+ * @param metaClient {@link HoodieTableMetaClient} instance.
+ * @return true if the files partition of metadata table is ready for read,
+ * based on the table config; false otherwise.
+ */
+ public static boolean isFilesPartitionAvailable(HoodieTableMetaClient
metaClient) {
+ return metaClient.getTableConfig().getMetadataPartitions()
+ .contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES);
+ }
+
/**
* Collects {@link HoodieColumnRangeMetadata} for the provided collection of
records, pretending
* as if provided records have been persisted w/in given {@code filePath}
@@ -171,23 +183,23 @@ public class HoodieTableMetadataUtil {
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(),
Function.identity());
return (Map<String, HoodieColumnRangeMetadata<Comparable>>)
targetFields.stream()
- .map(field -> {
- ColumnStats colStats = allColumnStats.get(field.name());
- return HoodieColumnRangeMetadata.<Comparable>create(
- filePath,
- field.name(),
- colStats == null ? null : coerceToComparable(field.schema(),
colStats.minValue),
- colStats == null ? null : coerceToComparable(field.schema(),
colStats.maxValue),
- colStats == null ? 0 : colStats.nullCount,
- colStats == null ? 0 : colStats.valueCount,
- // NOTE: Size and compressed size statistics are set to 0 to make
sure we're not
- // mixing up those provided by Parquet with the ones from
other encodings,
- // since those are not directly comparable
- 0,
- 0
- );
- })
- .collect(collector);
+ .map(field -> {
+ ColumnStats colStats = allColumnStats.get(field.name());
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ filePath,
+ field.name(),
+ colStats == null ? null : coerceToComparable(field.schema(),
colStats.minValue),
+ colStats == null ? null : coerceToComparable(field.schema(),
colStats.maxValue),
+ colStats == null ? 0 : colStats.nullCount,
+ colStats == null ? 0 : colStats.valueCount,
+ // NOTE: Size and compressed size statistics are set to 0 to
make sure we're not
+ // mixing up those provided by Parquet with the ones from
other encodings,
+ // since those are not directly comparable
+ 0,
+ 0
+ );
+ })
+ .collect(collector);
}
/**
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 40827c650a..a050d7eb88 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -185,12 +185,12 @@ public class ITTestHoodieSanity extends ITTestBase {
// Ensure row count is 80 (without duplicates) (100 - 20 deleted)
stdOutErr = executeHiveCommand("select count(1) from " +
snapshotTableName);
- assertEquals(80,
Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
+ assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
"Expecting 80 rows to be present in the snapshot table");
if (roTableName.isPresent()) {
stdOutErr = executeHiveCommand("select count(1) from " +
roTableName.get());
- assertEquals(80,
Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
+ assertEquals(80, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
"Expecting 80 rows to be present in the snapshot table");
}
@@ -204,10 +204,16 @@ public class ITTestHoodieSanity extends ITTestBase {
} else {
stdOutErr = executeHiveCommand("select count(1) from " +
snapshotTableName);
}
- assertEquals(280, Integer.parseInt(stdOutErr.getLeft().trim()),
+
+ assertEquals(280, Integer.parseInt(lastLine(stdOutErr.getLeft()).trim()),
"Expecting 280 rows to be present in the new table");
}
+ private static String lastLine(String output) {
+ String[] lines = output.split("\n");
+ return lines[lines.length - 1];
+ }
+
public void testRunHoodieJavaApp(String hiveTableName, String tableType,
PartitionType partitionType)
throws Exception {
testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType,
partitionType);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 523cb53ba7..42e71e5e33 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -135,8 +135,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
- val enableFileIndex = HoodieSparkConfUtils.getBooleanConfigValue(
- optParams, sparkSession.sessionState.conf, ENABLE_HOODIE_FILE_INDEX.key,
ENABLE_HOODIE_FILE_INDEX.defaultValue)
+ val enableFileIndex = HoodieSparkConfUtils.getConfigValue(optParams,
sparkSession.sessionState.conf,
+ ENABLE_HOODIE_FILE_INDEX.key,
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
if (enableFileIndex && globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical
partitioning) will be read
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 40e1353a54..60f413970a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode,
collectReferencedColumns, getConfigProperties}
+import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
@@ -74,7 +75,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
- configProperties = getConfigProperties(spark, options, metaClient),
+ configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant =
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -251,8 +252,8 @@ case class HoodieFileIndex(spark: SparkSession,
override def sizeInBytes: Long = getTotalCachedFilesSize
- private def isDataSkippingEnabled: Boolean =
HoodieSparkConfUtils.getBooleanConfigValue(
- options, spark.sessionState.conf,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), false)
+ private def isDataSkippingEnabled: Boolean = getConfigValue(options,
spark.sessionState.conf,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").toBoolean
private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
@@ -291,21 +292,18 @@ object HoodieFileIndex extends Logging {
schema.fieldNames.filter { colName => refs.exists(r =>
resolver.apply(colName, r.name)) }
}
- private def isFilesPartitionAvailable(metaClient: HoodieTableMetaClient):
Boolean = {
-
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_FILES)
- }
-
- def getConfigProperties(spark: SparkSession, options: Map[String, String],
metaClient: HoodieTableMetaClient) = {
+ def getConfigProperties(spark: SparkSession, options: Map[String, String]) =
{
val sqlConf: SQLConf = spark.sessionState.conf
val properties = new TypedProperties()
+ properties.putAll(options.filter(p => p._2 != null).asJava)
// To support metadata listing via Spark SQL we allow users to pass the
config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql
session to enable metadata listing.
- val isMetadataFilesPartitionAvailable =
isFilesPartitionAvailable(metaClient) &&
- HoodieSparkConfUtils.getBooleanConfigValue(
- options, sqlConf, HoodieMetadataConfig.ENABLE.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
- properties.putAll(options.filter(p => p._2 != null).asJava)
- properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(isMetadataFilesPartitionAvailable))
+ val isMetadataTableEnabled = getConfigValue(options, sqlConf,
HoodieMetadataConfig.ENABLE.key, null)
+ if (isMetadataTableEnabled != null) {
+ properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(isMetadataTableEnabled))
+ }
+
properties
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
index 0d85ace0bf..5a6f03aed1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkConfUtils.scala
@@ -34,10 +34,10 @@ object HoodieSparkConfUtils {
* @param defaultValue Default value to return if not configured.
* @return The config value.
*/
- def getBooleanConfigValue(options: Map[String, String],
- sqlConf: SQLConf,
- configKey: String,
- defaultValue: Boolean): Boolean = {
- options.getOrElse(configKey, sqlConf.getConfString(configKey,
defaultValue.toString)).toBoolean
+ def getConfigValue(options: Map[String, String],
+ sqlConf: SQLConf,
+ configKey: String,
+ defaultValue: String): String = {
+ options.getOrElse(configKey, sqlConf.getConfString(configKey,
defaultValue))
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 092bf8a191..9df5586b3d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -94,7 +94,7 @@ class HoodieCDCRDD(
private val cdcSupplementalLoggingMode =
metaClient.getTableConfig.cdcSupplementalLoggingMode
- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty,
metaClient)
+ private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
protected val payloadProps: Properties =
Option(metaClient.getTableConfig.getPreCombineField)
.map { preCombineField =>