This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59786113fae [HUDI-5998] Speed up reads from bootstrapped tables in
spark (#8303)
59786113fae is described below
commit 59786113fae88382f03412c7c79f7a827bf9393f
Author: Jon Vexler <[email protected]>
AuthorDate: Fri May 26 02:36:32 2023 -0400
[HUDI-5998] Speed up reads from bootstrapped tables in spark (#8303)
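Introduces a new config, `hoodie.bootstrap.data.queries.only`, which is disabled
by default (`false`). Enabling it improves query performance on bootstrapped tables,
but such queries cannot use the Hudi metadata fields; leave it set to false to read them.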
---
.../apache/hudi/config/HoodieBootstrapConfig.java | 7 ++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 5 ++++
.../main/scala/org/apache/hudi/DefaultSource.scala | 24 +++++++++++++++++--
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 5 +++-
.../org/apache/hudi/HoodieBootstrapRelation.scala | 13 ++++++++--
.../scala/org/apache/hudi/HoodieFileIndex.scala | 28 ++++++++++++++++++----
.../apache/hudi/SparkHoodieTableFileIndex.scala | 23 ++++++++++++------
.../apache/hudi/functional/TestBootstrapRead.java | 5 ++++
.../functional/TestDataSourceForBootstrap.scala | 9 ++++---
9 files changed, 99 insertions(+), 20 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 1de36a26a5f..d88f0bb2e6f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -67,6 +67,13 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Selects the mode in which each file/partition in the
bootstrapped dataset gets bootstrapped");
+ public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
+ .key("hoodie.bootstrap.data.queries.only")
+ .defaultValue("false")
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("Improves query performance, but queries cannot use
hudi metadata fields");
+
public static final ConfigProperty<String>
FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.full.input.provider")
.defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 878d185af06..50f264cd7e6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2855,6 +2855,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withHiveStylePartitioningEnabled(boolean enabled) {
+ writeConfig.setValue(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE,
String.valueOf(enabled));
+ return this;
+ }
+
public Builder withExternalSchemaTrasformation(boolean enabled) {
writeConfig.setValue(AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE,
String.valueOf(enabled));
return this;
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index e4be1a136e2..6655093fc85 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -101,7 +102,8 @@ class DefaultSource extends RelationProvider
)
} else {
Map()
- }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
+ }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
+ (DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(),
optParams.getOrElse(DATA_QUERIES_ONLY.key(),
DATA_QUERIES_ONLY.defaultValue()))))
// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
@@ -262,7 +264,7 @@ object DefaultSource {
new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
case (_, _, true) =>
- new HoodieBootstrapRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
+ resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for
tableType: $tableType," +
@@ -271,6 +273,24 @@ object DefaultSource {
}
}
+ private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
+ globPaths: Seq[Path],
+ userSchema: Option[StructType],
+ metaClient: HoodieTableMetaClient,
+ parameters: Map[String, String]):
BaseRelation = {
+ val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf,
+ ENABLE_HOODIE_FILE_INDEX.key,
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
+ val isSchemaEvolutionEnabledOnRead =
HoodieSparkConfUtils.getConfigValue(parameters,
+ sqlContext.sparkSession.sessionState.conf,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+ if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
+ || globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key,
DATA_QUERIES_ONLY.defaultValue).toBoolean) {
+ HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient,
parameters + (DATA_QUERIES_ONLY.key() -> "false"))
+ } else {
+ HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient,
parameters).toHadoopFsRelation
+ }
+ }
+
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index d7588335822..de67504d73d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, T
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -225,7 +226,9 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
- shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+ val shouldUseBootstrapFastRead =
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+ shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath ||
shouldUseBootstrapFastRead
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 5e15c0d1d68..34db2850501 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -200,6 +200,16 @@ case class HoodieBootstrapRelation(override val
sqlContext: SQLContext,
override def updatePrunedDataSchema(prunedSchema: StructType):
HoodieBootstrapRelation =
this.copy(prunedDataSchema = Some(prunedSchema))
+ def toHadoopFsRelation: HadoopFsRelation = {
+ HadoopFsRelation(
+ location = fileIndex,
+ partitionSchema = fileIndex.partitionSchema,
+ dataSchema = fileIndex.dataSchema,
+ bucketSpec = None,
+ fileFormat = fileFormat,
+ optParams)(sparkSession)
+ }
+
//TODO: This should be unnecessary with spark 3.4 [SPARK-41970]
private def encodePartitionPath(file: FileStatus): String = {
val tablePathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(bootstrapBasePath)
@@ -212,7 +222,6 @@ case class HoodieBootstrapRelation(override val sqlContext:
SQLContext,
object HoodieBootstrapRelation {
-
private def validate(requiredDataSchema: HoodieTableSchema,
requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType):
Unit = {
val requiredDataColumns: Seq[String] =
requiredDataSchema.structTypeSchema.fieldNames.toSeq
val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++
requiredDataFileSchema.fieldNames).toSeq
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 3131c81abee..3767b65a8ce 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -22,6 +22,7 @@ import
org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferenc
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
@@ -41,6 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -145,11 +147,10 @@ case class HoodieFileIndex(spark: SparkSession,
val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
val listedPartitions = getInputFileSlices(prunedPartitions:
_*).asScala.toSeq.map {
case (partition, fileSlices) =>
- val baseFileStatuses: Seq[FileStatus] =
- fileSlices.asScala
- .map(fs => fs.getBaseFile.orElse(null))
- .filter(_ != null)
- .map(_.getFileStatus)
+ val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+ .asScala
+ .map(fs => fs.getBaseFile.orElse(null))
+ .filter(_ != null))
// Filter in candidate files based on the col-stats index lookup
val candidateFiles = baseFileStatuses.filter(fs =>
@@ -179,6 +180,23 @@ case class HoodieFileIndex(spark: SparkSession,
}
}
+ /**
+ * In the fast bootstrap read code path, it gets the file status for the
bootstrap base files instead of
+ * skeleton files.
+ */
+ private def getBaseFileStatus(baseFiles: mutable.Buffer[HoodieBaseFile]):
mutable.Buffer[FileStatus] = {
+ if (shouldFastBootstrap) {
+ baseFiles.map(f =>
+ if (f.getBootstrapBaseFile.isPresent) {
+ f.getBootstrapBaseFile.get().getFileStatus
+ } else {
+ f.getFileStatus
+ })
+ } else {
+ baseFiles.map(_.getFileStatus)
+ }
+ }
+
private def lookupFileNamesMissingFromIndex(allIndexedFileNames:
Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index be9c8cdb6bb..c76af7b39ce 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -23,11 +23,12 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.SparkHoodieTableFileIndex._
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.config.TypedProperties
+import
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
import org.apache.hudi.keygen.{StringPartitionPathFormatter,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType,
LongType, ShortType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import javax.annotation.concurrent.NotThreadSafe
@@ -83,10 +84,18 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the schema of the table.
*/
- lazy val schema: StructType = schemaSpec.getOrElse({
- val schemaUtil = new TableSchemaResolver(metaClient)
-
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
- })
+ lazy val schema: StructType = if (shouldFastBootstrap) {
+ StructType(rawSchema.fields.filterNot(f =>
HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
+ } else {
+ rawSchema
+ }
+
+ private lazy val rawSchema: StructType = schemaSpec.getOrElse({
+ val schemaUtil = new TableSchemaResolver(metaClient)
+
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+ })
+
+ protected lazy val shouldFastBootstrap =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
private lazy val sparkParsePartitionUtil =
sparkAdapter.getSparkParsePartitionUtil
@@ -110,7 +119,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
.map(column => nameFieldMap.apply(column))
if (partitionFields.size != partitionColumns.get().size) {
- val isBootstrapTable =
BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+ val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
if (isBootstrapTable) {
// For bootstrapped tables its possible the schema does not
contain partition field when source table
// is hive style partitioned. In this case we would like to treat
the table as non-partitioned
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 874d6fe3062..f6ec64ce16b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -228,12 +229,16 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
}
Dataset<Row> hudiDf =
sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath);
Dataset<Row> bootstrapDf =
sparkSession.read().format("hudi").load(bootstrapTargetPath);
+ Dataset<Row> fastBootstrapDf =
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(),
"true").load(bootstrapTargetPath);
if (nPartitions == 0) {
+ compareDf(fastBootstrapDf.drop("city_to_state"),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
return;
}
compareDf(hudiDf.drop(dropColumns).drop(partitionCols),
bootstrapDf.drop(dropColumns).drop(partitionCols));
+ compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
compareDf(hudiDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
+ compareDf(fastBootstrapDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
}
protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 81c9233eb34..12974d133a8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -583,10 +583,12 @@ class TestDataSourceForBootstrap {
assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
// Read bootstrapped table and verify count
- val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+ val hoodieROViewDF1 = spark.read.format("hudi")
+ .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(),
"true").load(basePath + "/*")
assertEquals(sort(sourceDF).collectAsList(),
sort(dropMetaCols(hoodieROViewDF1)).collectAsList())
- val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
+ val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+ .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(),
"true").load(basePath)
assertEquals(sort(sourceDF).collectAsList(),
sort(dropMetaCols(hoodieROViewDFWithBasePath)).collectAsList())
// Perform upsert
@@ -606,7 +608,8 @@ class TestDataSourceForBootstrap {
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
// Read table after upsert and verify count
- val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+ val hoodieROViewDF2 = spark.read.format("hudi")
+ .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(),
"true").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp ==
$updateTimestamp").count())