This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 23f657db3b9 [HUDI-6635] Add a new Hudi Parquet File Format supporting
MOR and Bootstrap queries in Spark (#9276)
23f657db3b9 is described below
commit 23f657db3b92e579ed8a32deb2743da5aae34fce
Author: Jon Vexler <[email protected]>
AuthorDate: Sun Aug 6 21:06:12 2023 -0400
[HUDI-6635] Add a new Hudi Parquet File Format supporting MOR and Bootstrap
queries in Spark (#9276)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 10 +
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 10 +-
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 9 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 28 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 6 +-
.../org/apache/hudi/HoodieDataSourceHelper.scala | 2 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 55 +++-
.../org/apache/hudi/IncrementalRelation.scala | 4 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 47 ++-
.../hudi/NewHoodieParquetFileFormatUtils.scala | 212 ++++++++++++
.../apache/hudi/PartitionFileSliceMapping.scala | 77 +++++
.../apache/hudi/SparkHoodieTableFileIndex.scala | 4 +-
...t.scala => LegacyHoodieParquetFileFormat.scala} | 13 +-
.../parquet/NewHoodieParquetFileFormat.scala | 362 +++++++++++++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 3 +
.../apache/hudi/functional/TestBootstrapRead.java | 250 +-------------
...otstrapRead.java => TestBootstrapReadBase.java} | 87 ++---
.../functional/TestNewHoodieParquetFileFormat.java | 138 ++++++++
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 13 +-
... => Spark24LegacyHoodieParquetFileFormat.scala} | 2 +-
.../spark/sql/adapter/BaseSpark3Adapter.scala | 5 +
.../spark/sql/HoodieSpark30CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark3_0Adapter.scala | 6 +-
... => Spark30LegacyHoodieParquetFileFormat.scala} | 6 +-
.../spark/sql/HoodieSpark31CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark3_1Adapter.scala | 6 +-
... => Spark31LegacyHoodieParquetFileFormat.scala} | 6 +-
.../spark/sql/HoodieSpark32CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 8 +-
... => Spark32LegacyHoodieParquetFileFormat.scala} | 11 +-
.../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 8 +-
... => Spark33LegacyHoodieParquetFileFormat.scala} | 6 +-
.../spark/sql/HoodieSpark34CatalystPlanUtils.scala | 16 +-
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 10 +-
... => Spark34LegacyHoodieParquetFileFormat.scala} | 6 +-
rfc/README.md | 2 +-
39 files changed, 1104 insertions(+), 406 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 88ac1693318..58789681c54 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
@@ -77,6 +78,15 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan,
LogicalPlan, Expression)]
+
+ /**
+   * Spark requires file formats to append the partition path fields to the end of the schema.
+   * For tables where the partition path fields are not at the end of the schema, we don't want
+   * to return the columns in the wrong order for a query like "select *". To fix this
+   * behavior, we apply a projection onto the FileScan when the file format is NewHoodieParquetFileFormat.
+ */
+ def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
+
/**
* Decomposes [[InsertIntoStatement]] into its arguments allowing to
accommodate for API
* changes in Spark 3.3
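The projection hook above is implemented separately per Spark version. A minimal sketch of the idea, not the commit's actual rule, assuming `tableSchema` is the Hudi table schema with partition fields in their original positions (the class name `ReorderToTableSchema` is illustrative):

  import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
  import org.apache.spark.sql.types.StructType

  // Reorders the scan output so "select *" returns columns in table-schema
  // order instead of Spark's data-columns-then-partition-columns order.
  class ReorderToTableSchema(tableSchema: StructType) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case lr @ LogicalRelation(fs: HadoopFsRelation, _, _, _)
          if fs.partitionSchema.nonEmpty =>
        val byName = lr.output.map(a => a.name -> a).toMap
        // Project the scan's attributes back into table-schema order.
        Project(tableSchema.fieldNames.toSeq.flatMap(byName.get), lr)
    }
  }

The real applyNewHoodieParquetFileFormatProjection additionally checks that the scan's format is NewHoodieParquetFileFormat and that the projection has not already been applied.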
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 782a49ac189..041beba95df 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -26,11 +26,10 @@ import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
@@ -38,6 +37,7 @@ import
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.storage.StorageLevel
import java.util.{Locale, TimeZone}
@@ -165,7 +165,9 @@ trait SparkAdapter extends Serializable {
/**
* Create instance of [[ParquetFileFormat]]
*/
- def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat]
+ def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat]
+
+ def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int):
ColumnarBatch
/**
* Create instance of [[InterpretedPredicate]]
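The new makeColumnarBatch hook exists because ColumnarBatch construction differs across Spark versions. A plausible Spark 3.x implementation of the adapter method (a sketch; Spark 2.4 lacks the two-argument constructor and would construct the batch and call setNumRows instead):

  import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

  object Spark3xAdapterSketch {
    // Spark 3.x exposes the (columns, numRows) constructor directly.
    def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch =
      new ColumnarBatch(vectors, numRows)
  }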
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 556b0feef1c..f4029445f61 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
\ No newline at end of file
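The service-file rename matters because Spark resolves format short names by service-loading every DataSourceRegister on the classpath. A sketch of that lookup mechanism, using only JDK and Spark APIs (helper name illustrative):

  import java.util.ServiceLoader
  import scala.collection.JavaConverters._
  import org.apache.spark.sql.sources.DataSourceRegister

  object FormatLookup {
    // Finds the registered source whose shortName matches; this mirrors what
    // Spark does when a format string is not a fully-qualified class name.
    def lookupByShortName(name: String): Option[DataSourceRegister] =
      ServiceLoader.load(classOf[DataSourceRegister]).iterator().asScala
        .find(_.shortName().equalsIgnoreCase(name))
  }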
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 59c2a60a3ad..6e14e262d2c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -87,6 +87,15 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL})
or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
+ val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.read.use.new.parquet.file.format")
+ .defaultValue("false")
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("Read using the new Hudi parquet file format. The new
Hudi parquet file format is " +
+ "introduced as an experimental feature in 0.14.0. Currently, the new
Hudi parquet file format only applies " +
+ "to bootstrap and MOR queries. Schema evolution is also not supported by
the new file format.")
+
val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
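A minimal read-side example of the new flag, e.g. in spark-shell with the Hudi bundle on the classpath (the table path is illustrative). Per the DefaultSource change below, the flag only takes effect for MOR and bootstrap queries when payload combining is used and no glob paths are given:

  // Snapshot-read a MOR table through the experimental file format.
  val df = spark.read.format("hudi")
    .option("hoodie.datasource.read.use.new.parquet.file.format", "true")
    .load("/tmp/hudi_trips_mor")  // illustrative path
  df.show()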
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 3e5cf351ba1..5ecf250eaab 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -246,6 +246,16 @@ object DefaultSource {
} else if (isCdcQuery) {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
+ lazy val newHudiFileFormatUtils = if
(parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
+ USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths
== null || globPaths.isEmpty)
+ && parameters.getOrElse(REALTIME_MERGE.key(),
REALTIME_MERGE.defaultValue())
+ .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
+ val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext,
metaClient, parameters, userSchema)
+ if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
+ } else {
+ Option.empty
+ }
+
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -256,16 +266,28 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient,
globPaths, userSchema)
+ if (newHudiFileFormatUtils.isEmpty) {
+ new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
+ } else {
+ newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true,
isBootstrap = false)
+ }
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
+ if (newHudiFileFormatUtils.isEmpty) {
+ new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
+ } else {
+ newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true,
isBootstrap = true)
+ }
case (_, _, true) =>
- resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ if (newHudiFileFormatUtils.isEmpty) {
+ resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ } else {
+ newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false,
isBootstrap = true)
+ }
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for
tableType: $tableType," +
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 2f9579d629e..fea7781f84d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -56,7 +56,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat,
ParquetFileFormat}
+import
org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat,
ParquetFileFormat}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
@@ -241,8 +241,8 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row
only in cases
// when these corresponding partition-values are not persisted w/in
the data file itself
- val parquetFileFormat =
sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
- (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
+ val parquetFileFormat =
sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+ (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 49db12b792c..eb8ddfdf870 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -52,7 +52,7 @@ object HoodieDataSourceHelper extends PredicateHelper with
SparkAdapterSupport {
options: Map[String, String],
hadoopConf: Configuration,
appendPartitionValues: Boolean =
false): PartitionedFile => Iterator[InternalRow] = {
- val parquetFileFormat: ParquetFileFormat =
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+ val parquetFileFormat: ParquetFileFormat =
sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] =
parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
dataSchema = dataSchema,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 9791d39e280..964ef970c91 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,7 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode,
collectReferencedColumns, getConfigProperties}
+import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode,
collectReferencedColumns, convertFilterForTimestampKeyGenerator,
getConfigProperties}
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
@@ -100,6 +100,8 @@ case class HoodieFileIndex(spark: SparkSession,
override def rootPaths: Seq[Path] = getQueryPaths.asScala
+ var shouldBroadcast: Boolean = false
+
/**
* Returns the FileStatus for all the base files (excluding log files). This
should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex
or for read optimized query logic
@@ -142,26 +144,49 @@ case class HoodieFileIndex(spark: SparkSession,
override def listFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[PartitionDirectory] = {
val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters,
partitionFilters).map {
case (partitionOpt, fileSlices) =>
- val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
- val baseFileStatusOpt =
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
- val logFilesStatus = if (includeLogFiles) {
-
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile,
FileStatus](lf => lf.getFileStatus))
+ if (shouldBroadcast) {
+ val baseFileStatusesAndLogFileOnly: Seq[FileStatus] =
fileSlices.map(slice => {
+ if (slice.getBaseFile.isPresent) {
+ slice.getBaseFile.get().getFileStatus
+ } else if (slice.getLogFiles.findAny().isPresent) {
+ slice.getLogFiles.findAny().get().getFileStatus
+ } else {
+ null
+ }
+ }).filter(slice => slice != null)
+ val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+ || (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+ foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId ->
f) }
+ if (c.nonEmpty) {
+ PartitionDirectory(new
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values),
spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
} else {
- java.util.stream.Stream.empty()
+ PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values),
baseFileStatusesAndLogFileOnly)
}
- val files =
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
- baseFileStatusOpt.foreach(f => files.append(f))
- files
- })
- PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values),
allCandidateFiles)
+ } else {
+ val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+ val baseFileStatusOpt =
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+ val logFilesStatus = if (includeLogFiles) {
+
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile,
FileStatus](lf => lf.getFileStatus))
+ } else {
+ java.util.stream.Stream.empty()
+ }
+ val files =
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+ baseFileStatusOpt.foreach(f => files.append(f))
+ files
+ })
+ PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values),
allCandidateFiles)
+ }
}
hasPushedDownPartitionPredicates = true
if (shouldReadAsPartitionedTable()) {
prunedPartitionsAndFilteredFileSlices
- } else {
+ } else if (shouldBroadcast) {
+ assert(partitionSchema.isEmpty)
+ prunedPartitionsAndFilteredFileSlices
+    } else {
Seq(PartitionDirectory(InternalRow.empty,
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
}
}
@@ -244,7 +269,11 @@ case class HoodieFileIndex(spark: SparkSession,
// Prune the partition path by the partition filters
// NOTE: Non-partitioned tables are assumed to consist from a single
partition
// encompassing the whole table
- val prunedPartitions = listMatchingPartitionPaths (partitionFilters)
+ val prunedPartitions = if (shouldBroadcast) {
+
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters))
+ } else {
+ listMatchingPartitionPaths(partitionFilters)
+ }
getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map(
{ case (partition, fileSlices) => (Option.apply(partition),
fileSlices.asScala) })
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 6daa84e1b42..79dad997b4c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.table.HoodieSparkTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
-import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
+import
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
@@ -206,7 +206,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
metaClient.getBasePath)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
- case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
+ case HoodieFileFormat.PARQUET =>
LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.ORC => "orc"
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 942ade81cc5..054fcc799d7 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.LogFileIterator._
import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.fs.FSUtils.{buildInlineConf,
getRelativePartitionPath}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
@@ -61,15 +61,26 @@ import scala.util.Try
class LogFileIterator(logFiles: List[HoodieLogFile],
partitionPath: Path,
tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
+ requiredStructTypeSchema: StructType,
+ requiredAvroSchema: Schema,
tableState: HoodieTableState,
config: Configuration)
extends CachingIterator[InternalRow] with AvroDeserializerSupport {
+ def this(logFiles: List[HoodieLogFile],
+ partitionPath: Path,
+ tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ tableState: HoodieTableState,
+ config: Configuration) {
+ this(logFiles, partitionPath, tableSchema, requiredSchema.structTypeSchema,
+ new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
+ }
def this(split: HoodieMergeOnReadFileSplit,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState, config: Configuration) {
+ tableState: HoodieTableState,
+ config: Configuration) {
this(split.logFiles, getPartitionPath(split), tableSchema, requiredSchema,
tableState, config)
}
private val maxCompactionMemoryInBytes: Long =
getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -83,8 +94,8 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
}
.getOrElse(new TypedProperties())
- protected override val avroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
- protected override val structTypeSchema: StructType =
requiredSchema.structTypeSchema
+ protected override val avroSchema: Schema = requiredAvroSchema
+ protected override val structTypeSchema: StructType =
requiredStructTypeSchema
protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
protected val logFileReaderStructType: StructType =
tableSchema.structTypeSchema
@@ -142,20 +153,22 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
* Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
* performing any combination/merging of the records w/ the same primary keys
(ie producing duplicates potentially)
*/
-private class SkipMergeIterator(logFiles: List[HoodieLogFile],
+class SkipMergeIterator(logFiles: List[HoodieLogFile],
partitionPath: Path,
baseFileIterator: Iterator[InternalRow],
readerSchema: StructType,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
+ requiredStructTypeSchema: StructType,
+ requiredAvroSchema: Schema,
tableState: HoodieTableState,
config: Configuration)
- extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema,
tableState, config) {
+ extends LogFileIterator(logFiles, partitionPath, dataSchema,
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, tableState: HoodieTableState,
config: Configuration) {
this(split.logFiles, getPartitionPath(split),
baseFileReader(split.dataFile.get),
- baseFileReader.schema, dataSchema, requiredSchema, tableState, config)
+ baseFileReader.schema, dataSchema, requiredSchema.structTypeSchema,
+ new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
}
private val requiredSchemaProjection =
generateUnsafeProjection(readerSchema, structTypeSchema)
@@ -181,11 +194,23 @@ class RecordMergingFileIterator(logFiles:
List[HoodieLogFile],
baseFileIterator: Iterator[InternalRow],
readerSchema: StructType,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
+ requiredStructTypeSchema: StructType,
+ requiredAvroSchema: Schema,
tableState: HoodieTableState,
config: Configuration)
- extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema,
tableState, config) {
+ extends LogFileIterator(logFiles, partitionPath, dataSchema,
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
+ def this(logFiles: List[HoodieLogFile],
+ partitionPath: Path,
+ baseFileIterator: Iterator[InternalRow],
+ readerSchema: StructType,
+ dataSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ tableState: HoodieTableState,
+ config: Configuration) {
+ this(logFiles, partitionPath, baseFileIterator, readerSchema, dataSchema,
requiredSchema.structTypeSchema,
+ new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
+ }
def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema, tableState: HoodieTableState,
config: Configuration) {
this(split.logFiles, getPartitionPath(split),
baseFileReader(split.dataFile.get),
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
new file mode 100644
index 00000000000..5dd85c973b6
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation._
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.config.ConfigProperty
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{FileStatusCache,
HadoopFsRelation}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SQLContext, SparkSession}
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
+ val metaClient: HoodieTableMetaClient,
+ val optParamsInput: Map[String, String],
+ private val schemaSpec:
Option[StructType]) extends SparkAdapterSupport {
+ protected val sparkSession: SparkSession = sqlContext.sparkSession
+
+ protected val optParams: Map[String, String] = optParamsInput.filter(kv =>
!kv._1.equals(DATA_QUERIES_ONLY.key()))
+ protected def tableName: String = metaClient.getTableConfig.getTableName
+
+ protected lazy val resolver: Resolver =
sparkSession.sessionState.analyzer.resolver
+
+ private lazy val metaFieldNames =
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
+
+ protected lazy val conf: Configuration = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ protected lazy val jobConf = new JobConf(conf)
+
+ protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+
+ protected lazy val basePath: Path = metaClient.getBasePathV2
+
+ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt:
Option[InternalSchema]) = {
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams,
sparkSession)) {
+ None
+ } else {
+ Try {
+
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+ .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+ } match {
+ case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
+ case Failure(e) =>
+ None
+ }
+ }
+
+ val (name, namespace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+ val avroSchema = internalSchemaOpt.map { is =>
+ AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+ } orElse {
+ specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+ } orElse {
+ schemaSpec.map(s => convertToAvroSchema(s, tableName))
+ } getOrElse {
+ Try(schemaResolver.getTableAvroSchema) match {
+ case Success(schema) => schema
+ case Failure(e) =>
+ throw new HoodieSchemaException("Failed to fetch schema from the
table")
+ }
+ }
+
+ (avroSchema, internalSchemaOpt)
+ }
+
+ protected lazy val tableStructSchema: StructType = {
+ val converted =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+ val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+ // NOTE: Here we annotate meta-fields with corresponding metadata such
that Spark (>= 3.2)
+ // is able to recognize such fields as meta-fields
+ StructType(converted.map { field =>
+ if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName,
field.name))) {
+ field.copy(metadata = metaFieldMetadata)
+ } else {
+ field
+ }
+ })
+ }
+
+ protected lazy val preCombineFieldOpt: Option[String] =
+ Option(tableConfig.getPreCombineField)
+ .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
match {
+ // NOTE: This is required to compensate for cases when empty string is
used to stub
+ // property value to avoid it being set with the default value
+ // TODO(HUDI-3456) cleanup
+ case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+ case _ => None
+ }
+
+ protected def timeline: HoodieTimeline =
+    // NOTE: We're including compaction here since it's not considered a "commit" operation
+ metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
+
+ protected lazy val recordKeyField: String =
+ if (tableConfig.populateMetaFields()) {
+ HoodieRecord.RECORD_KEY_METADATA_FIELD
+ } else {
+ val keyFields = tableConfig.getRecordKeyFields.get()
+ checkState(keyFields.length == 1)
+ keyFields.head
+ }
+
+ private def queryTimestamp: Option[String] =
+
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+
+ protected lazy val specifiedQueryTimestamp: Option[String] =
+ optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+ .map(HoodieSqlCommonUtils.formatQueryInstant)
+
+ private def getConfigValue(config: ConfigProperty[String],
+ defaultValueOption: Option[String] =
Option.empty): String = {
+ optParams.getOrElse(config.key(),
+ sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(config.defaultValue())))
+ }
+
+ protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+ DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+
+ protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+ // Controls whether partition columns (which are the source for the
partition path values) should
+ // be omitted from persistence in the data files. On the read path it
affects whether partition values (values
+ // of partition columns) will be read from the data file or extracted from
partition path
+ val shouldOmitPartitionColumns =
metaClient.getTableConfig.shouldDropPartitionColumns &&
partitionColumns.nonEmpty
+ val shouldExtractPartitionValueFromPath =
+
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+ val shouldUseBootstrapFastRead =
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+ shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath ||
shouldUseBootstrapFastRead
+ }
+
+ protected lazy val mandatoryFieldsForMerging: Seq[String] =
+ Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
+ protected lazy val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
+
+ def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
+
+ def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation
= {
+
+ val fileIndex = HoodieFileIndex(sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession),
isMOR)
+ val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
+ val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+ Option(metaClient.getTableConfig.getRecordMergerStrategy))
+
+    val tableState = // Subset of the state of the table's configuration as of the time of the query
+ HoodieTableState(
+ tablePath = basePath.toString,
+ latestCommitTimestamp = queryTimestamp,
+ recordKeyField = recordKeyField,
+ preCombineFieldOpt = preCombineFieldOpt,
+ usesVirtualKeys = !tableConfig.populateMetaFields(),
+ recordPayloadClassName = tableConfig.getPayloadClass,
+ metadataConfig = fileIndex.metadataConfig,
+ recordMergerImpls = recordMergerImpls,
+ recordMergerStrategy = recordMergerStrategy
+ )
+
+ val mandatoryFields = if (isMOR) {
+ mandatoryFieldsForMerging
+ } else {
+ Seq.empty
+ }
+ fileIndex.shouldBroadcast = true
+ HadoopFsRelation(
+ location = fileIndex,
+ partitionSchema = fileIndex.partitionSchema,
+ dataSchema = fileIndex.dataSchema,
+ bucketSpec = None,
+ fileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap),
+ optParams)(sparkSession)
+ }
+}
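The schema resolution in NewHoodieParquetFileFormatUtils above is an orElse chain. Isolated on plain Options it reads as follows (a sketch, with the four sources generalized into parameters; names are illustrative):

  import scala.util.Try

  object SchemaResolutionSketch {
    // Resolution order: internal (schema-on-read) schema, then the schema at
    // the queried instant, then the user-specified schema, then the latest
    // table schema; failing all of them is an error.
    def resolveSchema[T](internalSchema: Option[T],
                         schemaAtInstant: Option[T],
                         userSpecified: Option[T],
                         latestTableSchema: => Try[T]): T =
      internalSchema
        .orElse(schemaAtInstant)
        .orElse(userSpecified)
        .getOrElse(latestTableSchema.getOrElse(
          throw new IllegalStateException("Failed to fetch schema from the table")))
  }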
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
new file mode 100644
index 00000000000..c9468e2d601
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.FileSlice
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class PartitionFileSliceMapping(internalRow: InternalRow,
+ broadcast: Broadcast[Map[String, FileSlice]])
extends InternalRow {
+
+ def getSlice(fileId: String): Option[FileSlice] = {
+ broadcast.value.get(fileId)
+ }
+
+ def getInternalRow: InternalRow = internalRow
+
+ override def numFields: Int = internalRow.numFields
+
+ override def setNullAt(i: Int): Unit = internalRow.setNullAt(i)
+
+ override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
+
+ override def copy(): InternalRow = new
PartitionFileSliceMapping(internalRow.copy(), broadcast)
+
+ override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
+
+ override def getBoolean(ordinal: Int): Boolean =
internalRow.getBoolean(ordinal)
+
+ override def getByte(ordinal: Int): Byte = internalRow.getByte(ordinal)
+
+ override def getShort(ordinal: Int): Short = internalRow.getShort(ordinal)
+
+ override def getInt(ordinal: Int): Int = internalRow.getInt(ordinal)
+
+ override def getLong(ordinal: Int): Long = internalRow.getLong(ordinal)
+
+ override def getFloat(ordinal: Int): Float = internalRow.getFloat(ordinal)
+
+ override def getDouble(ordinal: Int): Double = internalRow.getDouble(ordinal)
+
+ override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
internalRow.getDecimal(ordinal, precision, scale)
+
+ override def getUTF8String(ordinal: Int): UTF8String =
internalRow.getUTF8String(ordinal)
+
+ override def getBinary(ordinal: Int): Array[Byte] =
internalRow.getBinary(ordinal)
+
+ override def getInterval(ordinal: Int): CalendarInterval =
internalRow.getInterval(ordinal)
+
+ override def getStruct(ordinal: Int, numFields: Int): InternalRow =
internalRow.getStruct(ordinal, numFields)
+
+ override def getArray(ordinal: Int): ArrayData =
internalRow.getArray(ordinal)
+
+ override def getMap(ordinal: Int): MapData = internalRow.getMap(ordinal)
+
+ override def get(ordinal: Int, dataType: DataType): AnyRef =
internalRow.get(ordinal, dataType)
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index b3d9e5659e8..d1b6df6619d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -47,7 +47,7 @@ import java.util.Collections
import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
+import scala.util.{Success, Try}
/**
* Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -96,7 +96,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
- protected lazy val shouldFastBootstrap: Boolean =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
+ protected lazy val shouldFastBootstrap =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
private lazy val sparkParsePartitionUtil =
sparkAdapter.getSparkParsePartitionUtil
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
similarity index 85%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
index 7f494af37af..046640c11c1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
@@ -23,12 +23,15 @@ import org.apache.hudi.{DataSourceReadOptions,
HoodieSparkUtils, SparkAdapterSup
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
-import
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
+import
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{AtomicType, StructType}
-
-class HoodieParquetFileFormat extends ParquetFileFormat with
SparkAdapterSupport {
+/**
+ * This legacy Parquet file format implementation for Hudi will be replaced by
+ * [[NewHoodieParquetFileFormat]] in the future.
+ */
+class LegacyHoodieParquetFileFormat extends ParquetFileFormat with
SparkAdapterSupport {
override def shortName(): String = FILE_FORMAT_ID
@@ -55,11 +58,11 @@ class HoodieParquetFileFormat extends ParquetFileFormat
with SparkAdapterSupport
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
sparkAdapter
-
.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+
.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema, filters, options, hadoopConf)
}
}
-object HoodieParquetFileFormat {
+object LegacyHoodieParquetFileFormat {
val FILE_FORMAT_ID = "hoodie-parquet"
}
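The NewHoodieParquetFileFormat that follows stitches each bootstrap skeleton batch (meta columns) to its row-aligned data batch column-wise in doBootstrapMerge. A standalone sketch of that concatenation; the commit routes the final construction through sparkAdapter.makeColumnarBatch for cross-version support, while the Spark 3.x constructor is used directly here:

  import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

  object BatchConcatSketch {
    // Concatenate columns of two row-aligned batches: skeleton columns first,
    // then data columns, preserving the row count.
    def concatBatches(skeleton: ColumnarBatch, data: ColumnarBatch): ColumnarBatch = {
      require(skeleton.numRows() == data.numRows(), "batches must be row-aligned")
      val vecs = new Array[ColumnVector](skeleton.numCols() + data.numCols())
      for (i <- 0 until skeleton.numCols()) vecs(i) = skeleton.column(i)
      for (i <- 0 until data.numCols()) vecs(skeleton.numCols() + i) = data.column(i)
      new ColumnarBatch(vecs, skeleton.numRows())
    }
  }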
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
new file mode 100644
index 00000000000..0c1c3c8e5ee
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
REALTIME_SKIP_MERGE_OPT_VAL}
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile,
HoodieRecord}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, PartitionFileSliceMapping,
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.spark.broadcast.Broadcast
+import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * This file format performs bootstrap and MOR merging so that queries can be planned through HadoopFsRelation.
+ */
+class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
+ tableSchema: Broadcast[HoodieTableSchema],
+ tableName: String,
+ mergeType: String,
+ mandatoryFields: Seq[String],
+ isMOR: Boolean,
+ isBootstrap: Boolean) extends
ParquetFileFormat with SparkAdapterSupport {
+
+ override def isSplitable(sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ false
+ }
+
+  // Used so that the planner projects only once and does not stack overflow
+ var isProjected = false
+
+ /**
+   * supportBatch needs to return a consistent result, even if one side of a bootstrap
+   * merge supports batch reading while the other side does not.
+ */
+ private var supportBatchCalled = false
+ private var supportBatchResult = false
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ if (!supportBatchCalled) {
+ supportBatchCalled = true
+ supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+ }
+ supportBatchResult
+ }
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+
+ val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
+
+ val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+      // add mandatory fields to required schema
+ val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+ for (field <- mandatoryFields) {
+ if (requiredSchema.getFieldIndex(field).isEmpty) {
+ val fieldToAdd =
dataSchema.fields(dataSchema.getFieldIndex(field).get)
+ added.append(fieldToAdd)
+ }
+ }
+ val addedFields = StructType(added.toArray)
+ StructType(requiredSchema.toArray ++ addedFields.fields)
+ } else {
+ dataSchema
+ }
+
+ val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
+ val requiredMeta = StructType(requiredSchemaSplits._1)
+ val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+ val needMetaCols = requiredMeta.nonEmpty
+ val needDataCols = requiredWithoutMeta.nonEmpty
+    // NOTE: this is the output of the bootstrap merge only if isMOR; if it is bootstrap-only
+    // then the output will just be outputSchema
+ val bootstrapReaderOutput = StructType(requiredMeta.fields ++
requiredWithoutMeta.fields)
+
+ val skeletonReaderAppend = needMetaCols && isBootstrap && !(needDataCols
|| isMOR) && partitionSchema.nonEmpty
+ val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR &&
partitionSchema.nonEmpty
+
+ val (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader) = buildFileReaders(sparkSession,
+ dataSchema, partitionSchema, requiredSchema, filters, options,
hadoopConf, requiredSchemaWithMandatory,
+ requiredWithoutMeta, requiredMeta)
+
+ val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+ (file: PartitionedFile) => {
+ file.partitionValues match {
+ case broadcast: PartitionFileSliceMapping =>
+ val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ if (FSUtils.isLogFile(filePath)) {
+          // no base file
+ val fileSlice =
broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+ val logFiles = getLogFilesFromSlice(fileSlice)
+ val outputAvroSchema =
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
+ new LogFileIterator(logFiles, filePath.getParent,
tableSchema.value, outputSchema, outputAvroSchema,
+ tableState.value, broadcastedHadoopConf.value.value)
+ } else {
+          // We do not broadcast the slice if it has no log files or bootstrap base file
+ broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match {
+ case Some(fileSlice) =>
+ val hoodieBaseFile = fileSlice.getBaseFile.get()
+ val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
+ val partitionValues = broadcast.getInternalRow
+ val logFiles = getLogFilesFromSlice(fileSlice)
+ if (requiredSchemaWithMandatory.isEmpty) {
+ val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+ baseFileReader(baseFile)
+ } else if (bootstrapFileOpt.isPresent) {
+ val bootstrapIterator =
buildBootstrapIterator(skeletonReader, bootstrapBaseReader,
+ skeletonReaderAppend, bootstrapBaseAppend,
bootstrapFileOpt.get(), hoodieBaseFile, partitionValues,
+ needMetaCols, needDataCols)
+ (isMOR, logFiles.nonEmpty) match {
+ case (true, true) =>
buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent,
+ bootstrapReaderOutput, requiredSchemaWithMandatory,
outputSchema, partitionSchema, partitionValues,
+ broadcastedHadoopConf.value.value)
+ case (true, false) =>
appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput,
+ partitionSchema, outputSchema, partitionValues)
+ case (false, false) => bootstrapIterator
+                case (false, true) => throw new IllegalStateException("there should not be log files if this is not a MOR table")
+ }
+ } else {
+ if (logFiles.nonEmpty) {
+ val baseFile = createPartitionedFile(InternalRow.empty,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+ buildMergeOnReadIterator(preMergeBaseFileReader(baseFile),
logFiles, filePath.getParent, requiredSchemaWithMandatory,
+ requiredSchemaWithMandatory, outputSchema,
partitionSchema, partitionValues, broadcastedHadoopConf.value.value)
+ } else {
+ throw new IllegalStateException("should not be here since
file slice should not have been broadcasted since it has no log or data files")
+ //baseFileReader(baseFile)
+ }
+ }
+ case _ => baseFileReader(file)
+ }
+ }
+ case _ => baseFileReader(file)
+ }
+ }
+ }
+
+ /**
+ * Build file readers to read individual physical files
+ */
+ protected def buildFileReaders(sparkSession: SparkSession, dataSchema:
StructType, partitionSchema: StructType,
+ requiredSchema: StructType, filters: Seq[Filter],
options: Map[String, String],
+ hadoopConf: Configuration, requiredSchemaWithMandatory:
StructType,
+ requiredWithoutMeta: StructType, requiredMeta:
StructType):
+ (PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow]) = {
+
+    // file reader used when reading a Hudi parquet file without any merging
+
+ val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
+ filters, options, new Configuration(hadoopConf))
+
+    // file reader for reading a Hudi base file that needs to be merged with log files
+ val preMergeBaseFileReader = if (isMOR) {
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
+ requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+    // Rules for appending partitions and filtering in the bootstrap readers:
+    //  1. if it is MOR, we don't want to filter data or append partitions
+ // 2. if we need to merge the bootstrap base and skeleton files then we
cannot filter
+ // 3. if we need to merge the bootstrap base and skeleton files then we
should never append partitions to the
+ // skeleton reader
+
+ val needMetaCols = requiredMeta.nonEmpty
+ val needDataCols = requiredWithoutMeta.nonEmpty
+
+    // file reader for bootstrap skeleton files
+ val skeletonReader = if (needMetaCols && isBootstrap) {
+ if (needDataCols || isMOR) {
+ // no filter and no append
+ super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+ requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
+ } else {
+ // filter and append
+ super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, partitionSchema,
+ requiredMeta, filters, options, new Configuration(hadoopConf))
+ }
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+    // file reader for bootstrap base files
+ val bootstrapBaseReader = if (needDataCols && isBootstrap) {
+ val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf =>
isMetaField(sf.name)))
+ if (isMOR) {
+ // no filter and no append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf))
+ } else if (needMetaCols) {
+ // no filter but append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf))
+ } else {
+ // filter and append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+ filters, options, new Configuration(hadoopConf))
+ }
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader)
+ }
+
+ /**
+ * Create iterator for a file slice that has bootstrap base and skeleton file
+ */
+ protected def buildBootstrapIterator(skeletonReader: PartitionedFile =>
Iterator[InternalRow],
+ bootstrapBaseReader: PartitionedFile =>
Iterator[InternalRow],
+ skeletonReaderAppend: Boolean,
bootstrapBaseAppend: Boolean,
+ bootstrapBaseFile: BaseFile, hoodieBaseFile:
BaseFile,
+ partitionValues: InternalRow, needMetaCols:
Boolean,
+ needDataCols: Boolean): Iterator[InternalRow] = {
+ lazy val skeletonFile = if (skeletonReaderAppend) {
+ createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0,
hoodieBaseFile.getFileLen)
+ } else {
+ createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath,
0, hoodieBaseFile.getFileLen)
+ }
+
+ lazy val dataFile = if (bootstrapBaseAppend) {
+ createPartitionedFile(partitionValues, bootstrapBaseFile.getHadoopPath,
0, bootstrapBaseFile.getFileLen)
+ } else {
+ createPartitionedFile(InternalRow.empty,
bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen)
+ }
+
+ lazy val skeletonIterator = skeletonReader(skeletonFile)
+ lazy val dataFileIterator = bootstrapBaseReader(dataFile)
+
+ (needMetaCols, needDataCols) match {
+ case (true, true) => doBootstrapMerge(skeletonIterator, dataFileIterator)
+ case (true, false) => skeletonIterator
+ case (false, true) => dataFileIterator
+ case (false, false) => throw new IllegalStateException("should not be
here if only partition cols are required")
+ }
+ }
+
+ /**
+ * Merge skeleton and data file iterators
+ */
+ protected def doBootstrapMerge(skeletonFileIterator: Iterator[Any],
dataFileIterator: Iterator[Any]): Iterator[InternalRow] = {
+ new Iterator[Any] {
+ val combinedRow = new JoinedRow()
+
+ override def hasNext: Boolean = {
+ checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+ "Bootstrap data-file iterator and skeleton-file iterator have to be
in-sync!")
+ dataFileIterator.hasNext && skeletonFileIterator.hasNext
+ }
+
+ override def next(): Any = {
+ (skeletonFileIterator.next(), dataFileIterator.next()) match {
+ case (s: ColumnarBatch, d: ColumnarBatch) =>
+ val numCols = s.numCols() + d.numCols()
+ val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+ for (i <- 0 until numCols) {
+ if (i < s.numCols()) {
+ vecs(i) = s.column(i)
+ } else {
+ vecs(i) = d.column(i - s.numCols())
+ }
+ }
+ assert(s.numRows() == d.numRows())
+ sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+ case (_: ColumnarBatch, _: InternalRow) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
+ case (_: InternalRow, _: ColumnarBatch) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
+ case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+ }
+ }
+ }.asInstanceOf[Iterator[InternalRow]]
+ }
+
+ /**
+   * Create an iterator for a file slice that has log files
+ */
+ protected def buildMergeOnReadIterator(iter: Iterator[InternalRow],
logFiles: List[HoodieLogFile],
+ partitionPath: Path, inputSchema: StructType,
requiredSchemaWithMandatory: StructType,
+ outputSchema: StructType, partitionSchema:
StructType, partitionValues: InternalRow,
+ hadoopConf: Configuration):
Iterator[InternalRow] = {
+
+ val requiredAvroSchema =
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+ val morIterator = mergeType match {
+ case REALTIME_SKIP_MERGE_OPT_VAL => throw new
UnsupportedOperationException("Skip merge is not currently " +
+ "implemented for the New Hudi Parquet File format")
+ //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema,
tableSchema.value,
+ // requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
+ case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+ new RecordMergingFileIterator(logFiles, partitionPath, iter,
inputSchema, tableSchema.value,
+ requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
+ }
+ appendPartitionAndProject(morIterator, requiredSchemaWithMandatory,
partitionSchema,
+ outputSchema, partitionValues)
+ }
+
+ /**
+ * Append partition values to rows and project to output schema
+ */
+ protected def appendPartitionAndProject(iter: Iterator[InternalRow],
+ inputSchema: StructType,
+ partitionSchema: StructType,
+ to: StructType,
+ partitionValues: InternalRow):
Iterator[InternalRow] = {
+ if (partitionSchema.isEmpty) {
+ projectSchema(iter, inputSchema, to)
+ } else {
+ val unsafeProjection =
generateUnsafeProjection(StructType(inputSchema.fields ++
partitionSchema.fields), to)
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+ }
+ }
+
+ protected def projectSchema(iter: Iterator[InternalRow],
+ from: StructType,
+ to: StructType): Iterator[InternalRow] = {
+ val unsafeProjection = generateUnsafeProjection(from, to)
+ iter.map(d => unsafeProjection(d))
+ }
+
+ protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
+
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
+ }
+}
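
For readers of this change, the new file format is opted into per query through the read option exercised by the tests added further below. A minimal spark-shell style sketch, where `spark` and `basePath` are placeholders:

    import org.apache.hudi.DataSourceReadOptions

    // Load the same table with the legacy read path and with the new Hudi Parquet
    // file format, then compare counts (mirrors TestNewHoodieParquetFileFormat below).
    val legacyDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, "false")
      .load(basePath)
    val newFormatDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, "true")
      .load(basePath)
    assert(legacyDf.count() == newFormatDf.count())
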
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 1f25493e5e9..3c2d41aa582 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -265,6 +265,9 @@ object HoodieAnalysis extends SparkAdapterSupport {
case ut @ UpdateTable(relation @ ResolvesToHudiTable(_), _, _) =>
ut.copy(table = relation)
+
+ case logicalPlan: LogicalPlan if logicalPlan.resolved =>
+
sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index bbce1c61f0f..f57be60461a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -18,86 +18,27 @@
package org.apache.hudi.functional;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector;
-import
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
-import
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
-import
org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
-import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.testutils.HoodieSparkClientTestBase;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.functions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
-import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
-import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
-import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Tests different layouts for bootstrap base path
*/
@Tag("functional")
-public class TestBootstrapRead extends HoodieSparkClientTestBase {
-
- @TempDir
- public java.nio.file.Path tmpFolder;
- protected String bootstrapBasePath = null;
- protected String bootstrapTargetPath = null;
- protected String hudiBasePath = null;
-
- protected static int nInserts = 100;
- protected static int nUpdates = 20;
- protected static String[] dashPartitionPaths = {"2016-03-14","2016-03-15",
"2015-03-16", "2015-03-17"};
- protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16",
"2015/03/17"};
- protected String bootstrapType;
- protected Boolean dashPartitions;
- protected HoodieTableType tableType;
- protected Integer nPartitions;
-
- protected String[] partitionCols;
- protected static String[] dropColumns = {"_hoodie_commit_time",
"_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name",
"city_to_state"};
-
- @BeforeEach
- public void setUp() throws Exception {
- bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath";
- hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath";
- bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath";
- initSparkContexts();
- }
-
- @AfterEach
- public void tearDown() throws IOException {
- cleanupSparkContexts();
- cleanupClients();
- cleanupTestDataGenerator();
- }
-
+public class TestBootstrapRead extends TestBootstrapReadBase {
private static Stream<Arguments> testArgs() {
Stream.Builder<Arguments> b = Stream.builder();
String[] bootstrapType = {"full", "metadata", "mixed"};
@@ -149,193 +90,4 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
compareTables();
verifyMetaColOnlyRead(2);
}
-
- protected Map<String, String> basicOptions() {
- Map<String, String> options = new HashMap<>();
- options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name());
- options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(),
"true");
- options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
- if (nPartitions == 0) {
- options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
NonpartitionedKeyGenerator.class.getName());
- } else {
- options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
String.join(",", partitionCols));
- if (nPartitions == 1) {
- options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
SimpleKeyGenerator.class.getName());
- } else {
- options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
ComplexKeyGenerator.class.getName());
- }
- }
- options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
- if (tableType.equals(MERGE_ON_READ)) {
- options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
- }
- options.put(HoodieWriteConfig.TBL_NAME.key(), "test");
- return options;
- }
-
- protected Map<String, String> setBootstrapOptions() {
- Map<String, String> options = basicOptions();
- options.put(DataSourceWriteOptions.OPERATION().key(),
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL());
- options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath);
- if (!dashPartitions) {
-
options.put(HoodieBootstrapConfig.PARTITION_PATH_TRANSLATOR_CLASS_NAME.key(),
DecodedBootstrapPartitionPathTranslator.class.getName());
- }
- switch (bootstrapType) {
- case "metadata":
- options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(),
MetadataOnlyBootstrapModeSelector.class.getName());
- break;
- case "full":
- options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(),
FullRecordBootstrapModeSelector.class.getName());
- break;
- case "mixed":
- options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(),
BootstrapRegexModeSelector.class.getName());
- String regexPattern;
- if (dashPartitions) {
- regexPattern = "partition_path=2015-03-1[5-7]";
- } else {
- regexPattern = "partition_path=2015%2F03%2F1[5-7]";
- }
- if (nPartitions > 1) {
- regexPattern = regexPattern + "\\/.*";
- }
-
options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(),
regexPattern);
- break;
- default:
- throw new RuntimeException();
- }
- return options;
- }
-
- protected void doUpdate(Map<String,String> options, String instantTime) {
- Dataset<Row> updates = generateTestUpdates(instantTime, nUpdates);
- doUpsert(options, updates);
- }
-
- protected void doInsert(Map<String,String> options, String instantTime) {
- Dataset<Row> inserts = generateTestInserts(instantTime, nUpdates);
- doUpsert(options, inserts);
- }
-
- protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
- String nCompactCommits = "3";
- df.write().format("hudi")
- .options(options)
- .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
nCompactCommits)
- .mode(SaveMode.Append)
- .save(hudiBasePath);
- if (bootstrapType.equals("mixed")) {
- // mixed tables have a commit for each of the metadata and full
bootstrap modes
- // so to align with the regular hudi table, we need to compact after 4
commits instead of 3
- nCompactCommits = "4";
- }
- df.write().format("hudi")
- .options(options)
- .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
nCompactCommits)
- .mode(SaveMode.Append)
- .save(bootstrapTargetPath);
- }
-
- protected void compareTables() {
- Dataset<Row> hudiDf =
sparkSession.read().format("hudi").load(hudiBasePath);
- Dataset<Row> bootstrapDf =
sparkSession.read().format("hudi").load(bootstrapTargetPath);
- Dataset<Row> fastBootstrapDf =
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(),
"true").load(bootstrapTargetPath);
- if (nPartitions == 0) {
- compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
- if (tableType.equals(COPY_ON_WRITE)) {
- compareDf(fastBootstrapDf.drop("city_to_state"),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
- }
- return;
- }
- compareDf(hudiDf.drop(dropColumns).drop(partitionCols),
bootstrapDf.drop(dropColumns).drop(partitionCols));
- compareDf(hudiDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
- if (tableType.equals(COPY_ON_WRITE)) {
- compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
- compareDf(fastBootstrapDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
- }
- }
-
- protected void verifyMetaColOnlyRead(Integer iteration) {
- Dataset<Row> hudiDf =
sparkSession.read().format("hudi").load(hudiBasePath).select("_hoodie_commit_time",
"_hoodie_record_key");
- Dataset<Row> bootstrapDf =
sparkSession.read().format("hudi").load(bootstrapTargetPath).select("_hoodie_commit_time",
"_hoodie_record_key");
- hudiDf.show(100,false);
- bootstrapDf.show(100,false);
- if (iteration > 0) {
- assertEquals(sparkSession.sql("select * from hudi_iteration_" +
(iteration - 1)).intersect(hudiDf).count(),
- sparkSession.sql("select * from bootstrap_iteration_" + (iteration -
1)).intersect(bootstrapDf).count());
- }
- hudiDf.createOrReplaceTempView("hudi_iteration_" + iteration);
- bootstrapDf.createOrReplaceTempView("bootstrap_iteration_" + iteration);
- }
-
- protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
- assertEquals(0, df1.except(df2).count());
- assertEquals(0, df2.except(df1).count());
- }
-
- protected void setupDirs() {
- dataGen = new HoodieTestDataGenerator(dashPartitions ? dashPartitionPaths
: slashPartitionPaths);
- Dataset<Row> inserts = generateTestInserts("000", nInserts);
- if (dashPartitions) {
- //test adding a partition to the table
- inserts = inserts.filter("partition_path != '2016-03-14'");
- }
- if (nPartitions > 0) {
- partitionCols = new String[nPartitions];
- partitionCols[0] = "partition_path";
- for (int i = 1; i < partitionCols.length; i++) {
- partitionCols[i] = "partpath" + (i + 1);
- }
- inserts.write().partitionBy(partitionCols).save(bootstrapBasePath);
- } else {
- inserts.write().save(bootstrapBasePath);
- }
-
- inserts.write().format("hudi")
- .options(basicOptions())
- .mode(SaveMode.Overwrite)
- .save(hudiBasePath);
- }
-
- protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
- List<String> records = dataGen.generateInserts(instantTime, n).stream()
- .map(r -> recordToString(r).get()).collect(Collectors.toList());
- JavaRDD<String> rdd = jsc.parallelize(records);
- return sparkSession.read().json(rdd);
- }
-
- protected Dataset<Row> generateTestInserts(String instantTime, Integer n) {
- return addPartitionColumns(makeInsertDf(instantTime, n), nPartitions);
- }
-
- protected Dataset<Row> makeUpdateDf(String instantTime, Integer n) {
- try {
- List<String> records = dataGen.generateUpdates(instantTime, n).stream()
- .map(r -> recordToString(r).get()).collect(Collectors.toList());
- JavaRDD<String> rdd = jsc.parallelize(records);
- return sparkSession.read().json(rdd);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected Dataset<Row> generateTestUpdates(String instantTime, Integer n) {
- return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions);
- }
-
- private static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer
nPartitions) {
- if (nPartitions < 2) {
- return df;
- }
- for (int i = 2; i <= nPartitions; i++) {
- df = applyPartition(df, i);
- }
- return df;
- }
-
- private static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
- return df.withColumn("partpath" + n,
- functions.md5(functions.concat_ws("," + n + ",",
- df.col("partition_path"),
- functions.hash(df.col("_row_key")).mod(n))));
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
similarity index 84%
copy from
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
copy to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index bbce1c61f0f..d3246f7c4da 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -42,28 +42,22 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import static
org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;
-/**
- * Tests different layouts for bootstrap base path
- */
@Tag("functional")
-public class TestBootstrapRead extends HoodieSparkClientTestBase {
+public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
@TempDir
public java.nio.file.Path tmpFolder;
@@ -98,58 +92,6 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
cleanupTestDataGenerator();
}
- private static Stream<Arguments> testArgs() {
- Stream.Builder<Arguments> b = Stream.builder();
- String[] bootstrapType = {"full", "metadata", "mixed"};
- Boolean[] dashPartitions = {true,false};
- HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
- Integer[] nPartitions = {0, 1, 2};
- for (HoodieTableType tt : tableType) {
- for (Boolean dash : dashPartitions) {
- for (String bt : bootstrapType) {
- for (Integer n : nPartitions) {
- // can't be mixed bootstrap if it's nonpartitioned
- // don't need to test slash partitions if it's nonpartitioned
- if ((!bt.equals("mixed") && dash) || n > 0) {
- b.add(Arguments.of(bt, dash, tt, n));
- }
- }
- }
- }
- }
- return b.build();
- }
-
- @ParameterizedTest
- @MethodSource("testArgs")
- public void runTests(String bootstrapType, Boolean dashPartitions,
HoodieTableType tableType, Integer nPartitions) {
- this.bootstrapType = bootstrapType;
- this.dashPartitions = dashPartitions;
- this.tableType = tableType;
- this.nPartitions = nPartitions;
- setupDirs();
-
- // do bootstrap
- Map<String, String> options = setBootstrapOptions();
- Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
- bootstrapDf.write().format("hudi")
- .options(options)
- .mode(SaveMode.Overwrite)
- .save(bootstrapTargetPath);
- compareTables();
- verifyMetaColOnlyRead(0);
-
- // do upserts
- options = basicOptions();
- doUpdate(options, "001");
- compareTables();
- verifyMetaColOnlyRead(1);
-
- doInsert(options, "002");
- compareTables();
- verifyMetaColOnlyRead(2);
- }
-
protected Map<String, String> basicOptions() {
Map<String, String> options = new HashMap<>();
options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name());
@@ -216,6 +158,11 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
doUpsert(options, inserts);
}
+ protected void doDelete(Map<String,String> options, String instantTime) {
+ Dataset<Row> deletes = generateTestDeletes(instantTime, nUpdates);
+ doUpsert(options, deletes);
+ }
+
protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
String nCompactCommits = "3";
df.write().format("hudi")
@@ -239,16 +186,17 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
Dataset<Row> hudiDf =
sparkSession.read().format("hudi").load(hudiBasePath);
Dataset<Row> bootstrapDf =
sparkSession.read().format("hudi").load(bootstrapTargetPath);
Dataset<Row> fastBootstrapDf =
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(),
"true").load(bootstrapTargetPath);
+ boolean shouldTestFastBootstrap = tableType.equals(COPY_ON_WRITE) &&
!Boolean.parseBoolean(USE_NEW_HUDI_PARQUET_FILE_FORMAT().defaultValue());
if (nPartitions == 0) {
compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
- if (tableType.equals(COPY_ON_WRITE)) {
+ if (shouldTestFastBootstrap) {
compareDf(fastBootstrapDf.drop("city_to_state"),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
}
return;
}
compareDf(hudiDf.drop(dropColumns).drop(partitionCols),
bootstrapDf.drop(dropColumns).drop(partitionCols));
compareDf(hudiDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
- if (tableType.equals(COPY_ON_WRITE)) {
+ if (shouldTestFastBootstrap) {
compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
compareDf(fastBootstrapDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
}
@@ -296,6 +244,17 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
.save(hudiBasePath);
}
+ protected Dataset<Row> makeDeleteDf(String instantTime, Integer n) {
+ List<String> records = dataGen.generateUniqueDeleteRecords(instantTime,
n).stream()
+ .map(r -> recordToString(r).get()).collect(Collectors.toList());
+ JavaRDD<String> rdd = jsc.parallelize(records);
+ return sparkSession.read().json(rdd);
+ }
+
+ protected Dataset<Row> generateTestDeletes(String instantTime, Integer n) {
+ return addPartitionColumns(makeDeleteDf(instantTime, n), nPartitions);
+ }
+
protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
List<String> records = dataGen.generateInserts(instantTime, n).stream()
.map(r -> recordToString(r).get()).collect(Collectors.toList());
@@ -322,7 +281,7 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions);
}
- private static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer
nPartitions) {
+ protected static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer
nPartitions) {
if (nPartitions < 2) {
return df;
}
@@ -332,7 +291,7 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
return df;
}
- private static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
+ protected static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
return df.withColumn("partpath" + n,
functions.md5(functions.concat_ws("," + n + ",",
df.col("partition_path"),
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
new file mode 100644
index 00000000000..ef6814f21c5
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
+
+ private static Stream<Arguments> testArgs() {
+ Stream.Builder<Arguments> b = Stream.builder();
+ HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
+ Integer[] nPartitions = {0, 1, 2};
+ for (HoodieTableType tt : tableType) {
+ for (Integer n : nPartitions) {
+ b.add(Arguments.of(tt, n));
+ }
+ }
+ return b.build();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void runTests(HoodieTableType tableType, Integer nPartitions) {
+ this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
+ this.dashPartitions = true;
+ this.tableType = tableType;
+ this.nPartitions = nPartitions;
+ setupDirs();
+
+ //do bootstrap
+ Map<String, String> options = setBootstrapOptions();
+ Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
+ bootstrapDf.write().format("hudi")
+ .options(options)
+ .mode(SaveMode.Overwrite)
+ .save(bootstrapTargetPath);
+ runComparisons();
+
+ options = basicOptions();
+ doUpdate(options, "001");
+ runComparisons();
+
+ doInsert(options, "002");
+ runComparisons();
+
+ doDelete(options, "003");
+ runComparisons();
+ }
+
+ protected void runComparisons() {
+ if (tableType.equals(MERGE_ON_READ)) {
+ runComparison(hudiBasePath);
+ }
+ runComparison(bootstrapTargetPath);
+ }
+
+ protected void runComparison(String tableBasePath) {
+ testCount(tableBasePath);
+ runIndividualComparison(tableBasePath);
+ runIndividualComparison(tableBasePath, "partition_path");
+ runIndividualComparison(tableBasePath, "_hoodie_record_key",
"_hoodie_commit_time", "_hoodie_partition_path");
+ runIndividualComparison(tableBasePath, "_hoodie_commit_time",
"_hoodie_commit_seqno");
+ runIndividualComparison(tableBasePath, "_hoodie_commit_time",
"_hoodie_commit_seqno", "partition_path");
+ runIndividualComparison(tableBasePath, "_row_key", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path");
+ runIndividualComparison(tableBasePath, "_row_key", "partition_path",
"_hoodie_is_deleted", "begin_lon");
+ }
+
+ protected void testCount(String tableBasePath) {
+ Dataset<Row> legacyDf = sparkSession.read().format("hudi")
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false").load(tableBasePath);
+ Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"true").load(tableBasePath);
+ assertEquals(legacyDf.count(), fileFormatDf.count());
+ }
+
+ protected scala.collection.Seq<String> seq(String... a) {
+ return
scala.collection.JavaConverters.asScalaBufferConverter(Arrays.asList(a)).asScala().toSeq();
+ }
+
+ protected void runIndividualComparison(String tableBasePath) {
+ runIndividualComparison(tableBasePath, "");
+ }
+
+ protected void runIndividualComparison(String tableBasePath, String
firstColumn, String... columns) {
+ Dataset<Row> legacyDf = sparkSession.read().format("hudi")
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false").load(tableBasePath);
+ Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
+
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"true").load(tableBasePath);
+ if (firstColumn.isEmpty()) {
+ //df.except(df) does not work with map type cols
+ legacyDf = legacyDf.drop("city_to_state");
+ fileFormatDf = fileFormatDf.drop("city_to_state");
+ } else {
+ if (columns.length > 0) {
+ legacyDf = legacyDf.select(firstColumn, columns);
+ fileFormatDf = fileFormatDf.select(firstColumn, columns);
+ } else {
+ legacyDf = legacyDf.select(firstColumn);
+ fileFormatDf = fileFormatDf.select(firstColumn);
+ }
+ }
+ compareDf(legacyDf, fileFormatDf);
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 9f551a0bf1d..cdb4c5226a6 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -17,13 +17,17 @@
package org.apache.spark.sql
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan, MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join,
LogicalPlan, MergeIntoTable, Project}
import
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand,
ExplainCommand}
+import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
@@ -84,4 +88,14 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType:
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
Join(left, right, joinType, condition)
}
+
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
+ plan match {
+ case p@PhysicalOperation(_, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
+
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), p)
+ case _ => plan
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index cc2e25f9891..ec275a1d3fd 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,13 +33,14 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable,
Join, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark24LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -143,8 +144,8 @@ class Spark2Adapter extends SparkAdapter {
partitions.toSeq
}
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
- Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues:
Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark24LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createInterpretedPredicate(e: Expression): InterpretedPredicate
= {
@@ -200,4 +201,10 @@ class Spark2Adapter extends SparkAdapter {
DataSourceStrategy.translateFilter(predicate)
}
+
+ override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int):
ColumnarBatch = {
+ val batch = new ColumnarBatch(vectors)
+ batch.setNumRows(numRows)
+ batch
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
index c168911302e..a6a4ab943a1 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
@@ -50,7 +50,7 @@ import java.net.URI
* <li>Avoiding appending partition values to the rows read from the data
file</li>
* </ol>
*/
-class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+class Spark24LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index ce9499ae7d2..b2a9a529511 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext,
SparkSession}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.storage.StorageLevel
import java.time.ZoneId
@@ -97,4 +98,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
supportNestedPredicatePushdown: Boolean =
false): Option[Filter] = {
DataSourceStrategy.translateFilter(predicate,
supportNestedPredicatePushdown)
}
+
+ override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int):
ColumnarBatch = {
+ new ColumnarBatch(vectors, numRows)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index aefc6af7e3e..e9757c821d9 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
package org.apache.spark.sql
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression,
ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
+import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark30CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
}
}
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
+ plan match {
+ case s@ScanOperation(_, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
+
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ case _ => plan
+ }
+ }
+
override def projectOverSchema(schema: StructType, output: AttributeSet):
ProjectionOverSchema = ProjectionOverSchema(schema)
override def isRepairTable(plan: LogicalPlan): Boolean = {
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
index 5abd46948eb..22a9f090fb3 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
@@ -29,7 +29,7 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark30HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark30LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils,
PartitionedFile}
import org.apache.spark.sql.hudi.SparkAdapter
@@ -84,8 +84,8 @@ class Spark3_0Adapter extends BaseSpark3Adapter {
override def createExtendedSparkParser(spark: SparkSession, delegate:
ParserInterface): HoodieExtendedParserInterface =
new HoodieSpark3_0ExtendedSqlParser(spark, delegate)
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
- Some(new Spark30HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues:
Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark30LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index 4c33ac89677..de0be0db04b 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import
org.apache.spark.sql.execution.datasources.parquet.Spark30HoodieParquetFileFormat.{createParquetFilters,
pruneInternalSchema, rebuildFilterFromParquet}
+import
org.apache.spark.sql.execution.datasources.parquet.Spark30LegacyHoodieParquetFileFormat.{createParquetFilters,
pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
* <li>Schema on-read</li>
* </ol>
*/
-class Spark30HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+class Spark30LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
@@ -336,7 +336,7 @@ class Spark30HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
}
}
-object Spark30HoodieParquetFileFormat {
+object Spark30LegacyHoodieParquetFileFormat {
def pruneInternalSchema(internalSchemaStr: String, requiredSchema:
StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 2fcc88d4b37..df94529ce12 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
package org.apache.spark.sql
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression,
ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
+import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark31CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
}
}
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
+ plan match {
+ case s@ScanOperation(_, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
+
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ case _ => plan
+ }
+ }
+
override def projectOverSchema(schema: StructType, output: AttributeSet):
ProjectionOverSchema = ProjectionOverSchema(schema)
override def isRepairTable(plan: LogicalPlan): Boolean = {
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 547309c0e10..8ca072333d0 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark31HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark31LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils,
PartitionedFile}
import org.apache.spark.sql.hudi.SparkAdapter
@@ -85,8 +85,8 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
override def createExtendedSparkParser(spark: SparkSession, delegate:
ParserInterface): HoodieExtendedParserInterface =
new HoodieSpark3_1ExtendedSqlParser(spark, delegate)
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
- Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues:
Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark31LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
index a90d36a02de..2d844007506 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import
org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters,
pruneInternalSchema, rebuildFilterFromParquet}
+import
org.apache.spark.sql.execution.datasources.parquet.Spark31LegacyHoodieParquetFileFormat.{createParquetFilters,
pruneInternalSchema, rebuildFilterFromParquet}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
* <li>Schema on-read</li>
* </ol>
*/
-class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues:
Boolean) extends ParquetFileFormat {
+class Spark31LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
@@ -343,7 +343,7 @@ class Spark31HoodieParquetFileFormat(private val
shouldAppendPartitionValues: Bo
}
}
-object Spark31HoodieParquetFileFormat {
+object Spark31LegacyHoodieParquetFileFormat {
def pruneInternalSchema(internalSchemaStr: String, requiredSchema:
StructType): String = {
val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index f76128d4f81..d4624625d75 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -19,14 +19,18 @@
package org.apache.spark.sql
import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
+import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.types.StructType
object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -45,6 +49,16 @@ object HoodieSpark32CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
}
}
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan):
LogicalPlan = {
+ plan match {
+ case s@ScanOperation(_, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] &&
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =
true
+
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema,
fs.sparkSession.sessionState.analyzer.resolver), s)
+ case _ => plan
+ }
+ }
+
override def projectOverSchema(schema: StructType, output: AttributeSet):
ProjectionOverSchema = {
val klass = classOf[ProjectionOverSchema]
checkArgument(klass.getConstructors.length == 1)
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index f07d0ccdc63..3a5812a5faa 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -30,9 +30,9 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable,
LogicalPlan}
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark32HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark32LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
HoodieSpark32PartitionedFileUtils, HoodieSparkPartitionedFileUtils,
PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface,
HoodieSpark3_2ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructType}
@@ -84,8 +84,8 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
override def createExtendedSparkParser(spark: SparkSession, delegate:
ParserInterface): HoodieExtendedParserInterface =
new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat] = {
- Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues:
Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark32LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
rename to
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
index f6eb5da13b5..c88c35b5eeb 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.parquet.Spark32LegacyHoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -60,7 +60,7 @@ import java.net.URI
* <li>Schema on-read</li>
* </ol>
*/
-class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark32LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
@@ -172,7 +172,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
// and unfortunately won't compile against Spark 3.2.0
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
val datetimeRebaseSpec =
- DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+ DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new ParquetFilters(
parquetSchema,
pushDownDate,
@@ -271,7 +271,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
// and unfortunately won't compile against Spark 3.2.0
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
val int96RebaseSpec =
- DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+ DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
val datetimeRebaseSpec =
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
new VectorizedParquetRecordReader(
@@ -409,7 +409,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
}
-object Spark32HoodieParquetFileFormat {
+object Spark32LegacyHoodieParquetFileFormat {
/**
* NOTE: This method is specific to Spark 3.2.0
@@ -514,4 +514,3 @@ object Spark32HoodieParquetFileFormat {
}
}
}
-
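The shouldAppendPartitionValues flag carried by the legacy format is the crux of its bootstrap support. A rough sketch of the underlying pattern, assuming Spark 3.2.x catalyst APIs; this is an illustration, not the Hudi implementation:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.JoinedRow
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    import org.apache.spark.sql.types.StructType

    // When shouldAppend is false (the bootstrap path), rows are returned
    // as-is and the caller merges partition values after joining skeleton
    // and data files; when true, partition columns are appended to each
    // row, as vanilla ParquetFileFormat does.
    def maybeAppendPartitionValues(rows: Iterator[InternalRow],
                                   partitionValues: InternalRow,
                                   requiredSchema: StructType,
                                   partitionSchema: StructType,
                                   shouldAppend: Boolean): Iterator[InternalRow] = {
      if (!shouldAppend || partitionSchema.isEmpty) {
        rows
      } else {
        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
        val toUnsafe = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
        val joined = new JoinedRow()
        rows.map(row => toUnsafe(joined(row, partitionValues)))
      }
    }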
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index a294dcfdf4b..16f2517d128 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,13 +18,17 @@
package org.apache.spark.sql
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
+import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -43,6 +47,16 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
}
}
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case s@ScanOperation(_, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true
+ Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s)
+ case _ => plan
+ }
+ }
+
override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
ProjectionOverSchema(schema, output)
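applyNewHoodieParquetFileFormatProjection fires once per relation: the mutable isProjected flag keeps later analyzer passes from wrapping the scan in a second Project. A sketch of how such a hook could be driven from a resolution rule (the rule class here is illustrative; the PR wires the real call through HoodieAnalysis):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Illustrative rule: pushes every operator through the version-specific
    // hook; plans that do not match are returned unchanged.
    case class NewFormatProjection() extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan =
        plan.resolveOperatorsUp {
          case p => HoodieSpark33CatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(p)
        }
    }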
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 651027f932d..e3d2cc9cd18 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark33PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_3ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
@@ -85,8 +85,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface =
new HoodieSpark3_3ExtendedSqlParser(spark, delegate)
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
- Some(new Spark33HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark33LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 1e60f2ae968..de6cbff90ca 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark33HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.parquet.Spark33LegacyHoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -62,7 +62,7 @@ import java.net.URI
* <li>Schema on-read</li>
* </ol>
*/
-class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
@@ -411,7 +411,7 @@ class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
}
-object Spark33HoodieParquetFileFormat {
+object Spark33LegacyHoodieParquetFileFormat {
/**
* NOTE: This method is specific to Spark 3.2.0
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 302fba45590..947a73285f5 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
package org.apache.spark.sql
+import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
+import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
}
}
+ override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case s@ScanOperation(_, _, _,
+ l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && !fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+ fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = true
+ Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, fs.sparkSession.sessionState.analyzer.resolver), s)
+ case _ => plan
+ }
+ }
+
override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
ProjectionOverSchema(schema, output)
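Note the extra underscore in the ScanOperation pattern relative to the Spark 3.3 variant above: in Spark 3.4 the extractor yields four elements, splitting the filters into those that must stay above the scan and those that can be pushed down; otherwise the projection hook is identical.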
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 3bd8256c3aa..0ae5ef3dbf3 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -29,14 +29,14 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34LegacyHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark34PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_4ExtendedSqlParser}
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatchRow
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSchemaUtils, HoodieSpark34CatalogUtils, HoodieSpark34CatalystExpressionUtils, HoodieSpark34CatalystPlanUtils, HoodieSpark34SchemaUtils, HoodieSpark3CatalogUtils, SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql._
+import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -85,8 +85,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
override def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface =
new HoodieSpark3_4ExtendedSqlParser(spark, delegate)
- override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
- Some(new Spark34HoodieParquetFileFormat(appendPartitionValues))
+ override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+ Some(new Spark34LegacyHoodieParquetFileFormat(appendPartitionValues))
}
override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 4bd3e2fc345..6de8ded06ec 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.WholeStageCodegenExec
-import org.apache.spark.sql.execution.datasources.parquet.Spark34HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat._
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -57,7 +57,7 @@ import org.apache.spark.util.SerializableConfiguration
* <li>Schema on-read</li>
* </ol>
*/
-class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
@@ -427,7 +427,7 @@ class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
}
}
-object Spark34HoodieParquetFileFormat {
+object Spark34LegacyHoodieParquetFileFormat {
/**
* NOTE: This method is specific to Spark 3.2.0
diff --git a/rfc/README.md b/rfc/README.md
index 544e1c83de9..0c5475233de 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -107,4 +107,4 @@ The list of all RFCs can be found here.
| 69 | [Hudi 1.x](./rfc-69/rfc-69.md) | `UNDER REVIEW` |
| 70 | [Hudi Reverse Streamer](./rfc/rfc-70/rfc-70.md) | `UNDER REVIEW` |
| 71 | [Enhance OCC conflict detection](./rfc/rfc-71/rfc-71.md) | `UNDER REVIEW` |
-| 72 | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md) | `IN PROGRESS` |
\ No newline at end of file
+| 72 | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md) | `ONGOING` |
\ No newline at end of file