This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2d745057ea [HUDI-4420] Fixing table schema delineation on
partition/data schema for Spark relations (#5708)
2d745057ea is described below
commit 2d745057eae3f6644339ceaba2dfcb6450871442
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Jul 23 14:59:16 2022 -0700
[HUDI-4420] Fixing table schema delineation on partition/data schema for
Spark relations (#5708)
---
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 2 +-
.../org/apache/hudi/BaseFileOnlyRelation.scala | 23 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 162 ++++++++------
.../org/apache/hudi/HoodieDataSourceHelper.scala | 18 +-
.../scala/org/apache/hudi/HoodieFileScanRDD.scala | 4 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 243 +++++++++++----------
.../hudi/MergeOnReadIncrementalRelation.scala | 42 +---
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 120 +++++++---
.../apache/spark/sql/hudi/DataSkippingUtils.scala | 2 +-
.../procedures/RunClusteringProcedure.scala | 2 +-
.../org/apache/hudi/TestDataSkippingUtils.scala | 2 +-
.../apache/spark/sql/hudi/TestInsertTable.scala | 3 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 2 +-
.../apache/spark/sql/adapter/Spark3_1Adapter.scala | 6 +-
.../apache/spark/sql/adapter/Spark3_2Adapter.scala | 6 +-
15 files changed, 364 insertions(+), 273 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cd01e4fd5a..cd30528798 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable {
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for
common utils operating
* on Catalyst [[Expression]]s
*/
- def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils
+ def getCatalystExpressionUtils: HoodieCatalystExpressionUtils
/**
* Creates instance of [[HoodieCatalystPlansUtils]] providing for common
utils operating
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index b7033c3bfc..416e91800f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,11 +20,13 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.projectReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -68,17 +70,18 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
}
protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
- partitionSchema: StructType,
- dataSchema: HoodieTableSchema,
+ tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
- filters: Array[Filter]): HoodieUnsafeRDD =
{
+ filters: Array[Filter]): RDD[InternalRow]
= {
+ val (partitionSchema, dataSchema, requiredDataSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
- requiredSchema = requiredSchema,
+ requiredDataSchema = requiredDataSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
@@ -86,7 +89,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
hadoopConf = embedInternalSchema(new Configuration(conf),
requiredSchema.internalSchema)
)
- new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
+ // NOTE: In some cases schema of the reader's output (reader's schema)
might not match the schema expected by the caller.
+ // This could occur for ex, when requested schema contains partition
columns which might not be persisted w/in the
+ // data file, but instead would be parsed from the partition path.
In that case output of the file-reader will have
+ // different ordering of the fields than the original required
schema (for more details please check out
+ // [[ParquetFileFormat]] impl). In that case we have to project the
rows from the file-reader's schema
+ // back into the one expected by the caller
+ val projectedReader = projectReader(baseFileReader,
requiredSchema.structTypeSchema)
+
+ new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits)
}
protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ff6515db32..5274f257e1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -45,7 +45,7 @@ import
org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat,
ParquetFileFormat}
@@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
def canPruneRelationSchema: Boolean =
(fileFormat.isInstanceOf[ParquetFileFormat] ||
fileFormat.isInstanceOf[OrcFileFormat]) &&
// NOTE: Some relations might be disabling sophisticated schema pruning
techniques (for ex, nested schema pruning)
- // TODO(HUDI-XXX) internal schema doesn't supported nested schema
pruning currently
+ // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning
currently
!hasSchemaOnRead
override def schema: StructType = {
@@ -334,38 +334,14 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr,
internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema,
requiredAvroSchema.toString, Some(requiredInternalSchema))
- // Since schema requested by the caller might contain partition columns,
we might need to
- // prune it, removing all partition columns from it in case these columns
are not persisted
- // in the data files
- //
- // NOTE: This partition schema is only relevant to file reader to be able
to embed
- // values of partition columns (hereafter referred to as partition
values) encoded into
- // the partition path, and omitted from the data file, back into
fetched rows;
- // Note that, by default, partition columns are not omitted
therefore specifying
- // partition schema for reader is not required
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumns(tableSchema, requiredSchema)
-
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
- val rdd = composeRDD(fileSplits, partitionSchema, dataSchema,
requiredDataSchema, targetColumns, filters)
-
- // NOTE: In case when partition columns have been pruned from the
required schema, we have to project
- // the rows from the pruned schema back into the one expected by
the caller
- val projectedRDD = if (requiredDataSchema.structTypeSchema !=
requiredSchema.structTypeSchema) {
- rdd.mapPartitions { it =>
- val fullPrunedSchema =
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
- val unsafeProjection = generateUnsafeProjection(fullPrunedSchema,
requiredSchema.structTypeSchema)
- it.map(unsafeProjection)
- }
- } else {
- rdd
- }
+ val rdd = composeRDD(fileSplits, tableSchema, requiredSchema,
targetColumns, filters)
// Here we rely on a type erasure, to workaround inherited API
restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
- projectedRDD.asInstanceOf[RDD[Row]]
+ rdd.asInstanceOf[RDD[Row]]
}
}
@@ -373,19 +349,17 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
* Composes RDD provided file splits to read from, table and partition
schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
- * @param partitionSchema target table's partition schema
- * @param dataSchema target table's data files' schema
+ * @param tableSchema target table's schema
* @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query
* @param filters data filters to be applied
- * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
+ * @return instance of RDD (holding [[InternalRow]]s)
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
- partitionSchema: StructType,
- dataSchema: HoodieTableSchema,
+ tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
- filters: Array[Filter]): HoodieUnsafeRDD
+ filters: Array[Filter]): RDD[InternalRow]
/**
* Provided with partition and date filters collects target file splits to
read records from, while
@@ -553,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def createBaseFileReader(spark: SparkSession,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
+ requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration):
BaseFileReader = {
@@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
// we have to eagerly initialize all of the readers even though only
one specific to the type
// of the file being read will be used. This is required to avoid
serialization of the whole
// relation (containing file-index for ex) and passing it to the
executor
- val reader = tableBaseFileFormat match {
- case HoodieFileFormat.PARQUET =>
- HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = dataSchema.structTypeSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredSchema.structTypeSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf,
- // We're delegating to Spark to append partition values to every row
only in cases
- // when these corresponding partition-values are not persisted w/in
the data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
- )
+ val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType)
=
+ tableBaseFileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = spark,
+ dataSchema = dataSchema.structTypeSchema,
+ partitionSchema = partitionSchema,
+ requiredSchema = requiredDataSchema.structTypeSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf,
+ // We're delegating to Spark to append partition values to every
row only in cases
+ // when these corresponding partition-values are not persisted
w/in the data file itself
+ appendPartitionValues =
shouldExtractPartitionValuesFromPartitionPath
+ )
+ // Since partition values by default are omitted, and not persisted
w/in data-files by Spark,
+ // data-file readers (such as [[ParquetFileFormat]]) have to inject
partition values while reading
+ // the data. As such, actual full schema produced by such reader is
composed of
+ // a) Data-file schema (projected or not)
+ // b) Appended partition column values
+ val readerSchema =
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
+
+ (parquetReader, readerSchema)
case HoodieFileFormat.HFILE =>
- createHFileReader(
+ val hfileReader = createHFileReader(
spark = spark,
dataSchema = dataSchema,
- requiredSchema = requiredSchema,
+ requiredDataSchema = requiredDataSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)
+ (hfileReader, requiredDataSchema.structTypeSchema)
+
case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($tableBaseFileFormat)")
}
- partitionedFile => {
- val extension = FSUtils.getFileExtension(partitionedFile.filePath)
- if (tableBaseFileFormat.getFileExtension.equals(extension)) {
- reader.apply(partitionedFile)
- } else {
- throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
- }
- }
+ BaseFileReader(
+ read = partitionedFile => {
+ val extension = FSUtils.getFileExtension(partitionedFile.filePath)
+ if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+ read(partitionedFile)
+ } else {
+ throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
+ }
+ },
+ schema = schema
+ )
}
protected def embedInternalSchema(conf: Configuration, internalSchemaOpt:
Option[InternalSchema]): Configuration = {
@@ -615,8 +603,17 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
conf
}
- private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema):
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+ protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema):
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+ // Since schema requested by the caller might contain partition columns,
we might need to
+ // prune it, removing all partition columns from it in case these columns
are not persisted
+ // in the data files
+ //
+ // NOTE: This partition schema is only relevant to file reader to be able
to embed
+ // values of partition columns (hereafter referred to as partition
values) encoded into
+ // the partition path, and omitted from the data file, back into
fetched rows;
+ // Note that, by default, partition columns are not omitted
therefore specifying
+ // partition schema for reader is not required
if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_,
StringType)))
val prunedDataStructSchema =
prunePartitionColumns(tableSchema.structTypeSchema)
@@ -645,10 +642,12 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
object HoodieBaseRelation extends SparkAdapterSupport {
- type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+ case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow],
val schema: StructType) {
+ def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
+ }
- private def generateUnsafeProjection(from: StructType, to: StructType) =
- sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from,
to)
+ def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection =
+ sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable =
false, "Record")
@@ -656,6 +655,32 @@ object HoodieBaseRelation extends SparkAdapterSupport {
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
+ /**
+ * Projects provided file reader's output from its original schema, into a
[[requiredSchema]]
+ *
+ * NOTE: [[requiredSchema]] has to be a subset of the file reader's
schema
+ *
+ * @param reader file reader to be projected
+ * @param requiredSchema target schema for the output of the provided file
reader
+ */
+ def projectReader(reader: BaseFileReader, requiredSchema: StructType):
BaseFileReader = {
+
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size
== requiredSchema.size)
+
+ if (reader.schema == requiredSchema) {
+ reader
+ } else {
+ val read = reader.apply(_)
+ val projectedRead: PartitionedFile => Iterator[InternalRow] = (file:
PartitionedFile) => {
+ // NOTE: Projection is not a serializable object, hence its creation
should only happen w/in
+ // the executor process
+ val unsafeProjection = generateUnsafeProjection(reader.schema,
requiredSchema)
+ read(file).map(unsafeProjection)
+ }
+
+ BaseFileReader(projectedRead, requiredSchema)
+ }
+ }
+
/**
* Projects provided schema by picking only required (projected) top-level
columns from it
*
@@ -666,7 +691,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
tableSchema match {
case Right(internalSchema) =>
checkState(!internalSchema.isEmptySchema)
- // TODO extend pruning to leverage optimizer pruned schema
val prunedInternalSchema =
InternalSchemaUtils.pruneInternalSchema(internalSchema,
requiredColumns.toList.asJava)
val requiredAvroSchema =
AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
val requiredStructSchema =
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
@@ -691,10 +715,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
private def createHFileReader(spark: SparkSession,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
+ requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): BaseFileReader = {
+ hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -703,10 +727,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new
Path(partitionedFile.filePath),
new CacheConfig(hadoopConf))
- val requiredRowSchema = requiredSchema.structTypeSchema
+ val requiredRowSchema = requiredDataSchema.structTypeSchema
// NOTE: Schema has to be parsed at this point, since Avro's [[Schema]]
aren't serializable
// to be passed from driver to executor
- val requiredAvroSchema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+ val requiredAvroSchema = new
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema,
requiredRowSchema)
reader.getRecordIterator(requiredAvroSchema).asScala
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 6c721723c5..8bd295c7f3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -41,15 +41,17 @@ object HoodieDataSourceHelper extends PredicateHelper with
SparkAdapterSupport {
/**
* Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
handling [[ColumnarBatch]],
* when Parquet's Vectorized Reader is used
+ *
+ * TODO move to HoodieBaseRelation, make private
*/
- def buildHoodieParquetReader(sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration,
- appendPartitionValues: Boolean = false):
PartitionedFile => Iterator[InternalRow] = {
+ private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration,
+ appendPartitionValues: Boolean =
false): PartitionedFile => Iterator[InternalRow] = {
val parquetFileFormat: ParquetFileFormat =
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] =
parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
index a176626f76..4b7a09795a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
@@ -25,9 +25,9 @@ import
org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
case class HoodieBaseFileSplit(filePartition: FilePartition) extends
HoodieFileSplit
class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
- readFunction: PartitionedFile => Iterator[InternalRow],
+ read: PartitionedFile => Iterator[InternalRow],
@transient fileSplits: Seq[HoodieBaseFileSplit])
- extends FileScanRDD(sparkSession, readFunction,
fileSplits.map(_.filePartition))
+ extends FileScanRDD(sparkSession, read, fileSplits.map(_.filePartition))
with HoodieUnsafeRDD {
override final def collect(): Array[InternalRow] =
super[HoodieUnsafeRDD].collect()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index c4c70cb414..512c97806f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,9 +23,10 @@ import org.apache.avro.generic.{GenericRecord,
GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader,
generateUnsafeProjection, projectReader}
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
-import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport,
collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe,
projectRowUnsafe, resolveAvroSchemaNullability}
+import
org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals
+import org.apache.hudi.HoodieMergeOnReadRDD._
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
@@ -43,8 +44,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadata}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeProjection}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Partition, SerializableWritable, SparkContext,
TaskContext}
@@ -56,14 +55,42 @@ import scala.util.Try
case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
-case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader:
BaseFileReader,
-
requiredSchemaFileReaderForMerging: BaseFileReader,
-
requiredSchemaFileReaderForNoMerging: BaseFileReader)
+/**
+ * Class holding base-file readers for 3 different use-cases:
+ *
+ * <ol>
+ * <li>Full-schema reader: is used when whole row has to be read to perform
merging correctly.
+ * This could occur, when no optimizations could be applied and we have to
fall back to reading the whole row from
+ * the base file and the corresponding delta-log file to merge them
correctly</li>
+ *
+ * <li>Required-schema reader: is used when it's fine to only read row's
projected columns.
+ * This could occur, when row could be merged with corresponding delta-log
record while only having
+ * projected columns</li>
+ *
+ * <li>Required-schema reader (skip-merging): is used when no merging
will be performed (skip-merged).
+ * This could occur, when file-group has no delta-log files</li>
+ * </ol>
+ */
+private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader:
BaseFileReader,
+
requiredSchemaReader: BaseFileReader,
+
requiredSchemaReaderSkipMerging: BaseFileReader)
+/**
+ * RDD enabling Hudi's Merge-on-Read (MOR) semantic
+ *
+ * @param sc spark's context
+ * @param config hadoop configuration
+ * @param fileReaders suite of base file readers
+ * @param tableSchema table's full schema
+ * @param requiredSchema expected (potentially) projected schema
+ * @param tableState table's state
+ * @param mergeType type of merge performed
+ * @param fileSplits target file-splits this RDD will be iterating over
+ */
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fileReaders: HoodieMergeOnReadBaseFileReaders,
- dataSchema: HoodieTableSchema,
+ tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@@ -90,18 +117,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
+ val projectedReader =
projectReader(fileReaders.requiredSchemaReaderSkipMerging,
requiredSchema.structTypeSchema)
+ projectedReader(dataFileOnlySplit.dataFile.get)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, getConfig)
case split if
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
- val baseFileIterator =
fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
- new SkipMergeIterator(split, baseFileIterator, getConfig)
+ val reader = fileReaders.requiredSchemaReaderSkipMerging
+ new SkipMergeIterator(split, reader, getConfig)
case split if
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
- val (baseFileIterator, schema) = readBaseFile(split)
- new RecordMergingFileIterator(split, baseFileIterator, schema,
getConfig)
+ val reader = pickBaseFileReader
+ new RecordMergingFileIterator(split, reader, getConfig)
case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
@@ -120,7 +148,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
iter
}
- private def readBaseFile(split: HoodieMergeOnReadFileSplit):
(Iterator[InternalRow], HoodieTableSchema) = {
+ private def pickBaseFileReader: BaseFileReader = {
// NOTE: This is an optimization making sure that even for MOR tables we
fetch absolute minimum
// of the stored data possible, while still properly executing
corresponding relation's semantic
// and meet the query's requirements.
@@ -129,10 +157,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// a) It does use one of the standard (and whitelisted) Record
Payload classes
// then we can avoid reading and parsing the records w/ _full_
schema, and instead only
// rely on projected one, nevertheless being able to perform merging
correctly
- if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
- (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
- else
- (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get),
requiredSchema)
+ if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
{
+ fileReaders.requiredSchemaReader
+ } else {
+ fileReaders.fullSchemaReader
+ }
}
override protected def getPartitions: Array[Partition] =
@@ -156,38 +185,27 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
- protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(dataSchema.avroSchemaStr)
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
- protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _
- // TODO validate whether we need to do UnsafeProjection
- protected val unsafeProjection: UnsafeProjection =
UnsafeProjection.create(requiredStructTypeSchema)
-
- // NOTE: This maps _required_ schema fields onto the _full_ table schema,
collecting their "ordinals"
- // w/in the record payload. This is required, to project records
read from the Delta Log file
- // which always reads records in full schema (never projected, due
to the fact that DL file might
- // be stored in non-columnar formats like Avro, HFile, etc)
- private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+ private val requiredSchemaSafeAvroProjection =
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
private var logScanner = {
- val internalSchema =
dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+ val internalSchema =
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema)
}
private val logRecords = logScanner.getRecords.asScala
- // NOTE: This iterator iterates over already projected (in required
schema) records
// NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
// going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
logRecords.iterator.map {
case (_, record) =>
- val avroRecordOpt =
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
- avroRecordOpt.map {
- avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema,
requiredSchemaFieldOrdinals, recordBuilder)
- }
+ toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema,
payloadProps))
+ .map(_.asInstanceOf[GenericRecord])
}
protected def removeLogRecord(key: String): Option[HoodieRecord[_ <:
HoodieRecordPayload[_]]] =
@@ -205,7 +223,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// Record has been deleted, skipping
this.hasNextInternal
} else {
- recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+ val projectedAvroRecord =
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
+ recordToLoad = deserialize(projectedAvroRecord)
true
}
}
@@ -229,14 +248,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
* performing any combination/merging of the records w/ the same primary
keys (ie producing duplicates potentially)
*/
private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator: Iterator[InternalRow],
+ baseFileReader: BaseFileReader,
config: Configuration)
extends LogFileIterator(split, config) {
+ private val requiredSchemaUnsafeProjection =
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
+
+ private val baseFileIterator = baseFileReader(split.dataFile.get)
+
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- recordToLoad = curRow
+ // No merge is required, simply load current row and project into
required schema
+ recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
true
} else {
super[LogFileIterator].hasNext
@@ -250,8 +273,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
* streams
*/
private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
- baseFileIterator:
Iterator[InternalRow],
- baseFileReaderSchema:
HoodieTableSchema,
+ baseFileReader: BaseFileReader,
config: Configuration)
extends LogFileIterator(split, config) {
@@ -260,13 +282,17 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// - Projected schema
// As such, no particular schema could be assumed, and therefore we
rely on the caller
// to correspondingly set the scheme of the expected output of
base-file reader
- private val baseFileReaderAvroSchema = new
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
- private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+ private val baseFileReaderAvroSchema =
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable
= false, "record")
+
+ private val serializer =
sparkAdapter.createAvroSerializer(baseFileReader.schema,
baseFileReaderAvroSchema, nullable = false)
- private val serializer =
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
- baseFileReaderAvroSchema,
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+ private val reusableRecordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
- private val recordKeyOrdinal =
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+ private val recordKeyOrdinal =
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
+
+ private val requiredSchemaUnsafeProjection =
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
+
+ private val baseFileIterator = baseFileReader(split.dataFile.get)
override def hasNext: Boolean = hasNextInternal
@@ -275,26 +301,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// handling records
@tailrec private def hasNextInternal: Boolean = {
if (baseFileIterator.hasNext) {
- val curRowRecord = baseFileIterator.next()
- val curKey = curRowRecord.getString(recordKeyOrdinal)
+ val curRow = baseFileIterator.next()
+ val curKey = curRow.getString(recordKeyOrdinal)
val updatedRecordOpt = removeLogRecord(curKey)
if (updatedRecordOpt.isEmpty) {
- // No merge needed, load current row with required projected schema
- recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+ // No merge is required, simply load current row and project into
required schema
+ recordToLoad = requiredSchemaUnsafeProjection(curRow)
true
} else {
- val mergedAvroRecordOpt = merge(serialize(curRowRecord),
updatedRecordOpt.get)
+ val mergedAvroRecordOpt = merge(serialize(curRow),
updatedRecordOpt.get)
if (mergedAvroRecordOpt.isEmpty) {
// Record has been deleted, skipping
this.hasNextInternal
} else {
- // NOTE: In occurrence of a merge we can't know the schema of the
record being returned, b/c
- // record from the Delta Log will bear (full) Table schema,
while record from the Base file
- // might already be read in projected one (as an
optimization).
- // As such we can't use more performant
[[projectAvroUnsafe]], and instead have to fallback
- // to [[projectAvro]]
- val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
requiredAvroSchema, recordBuilder)
- recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+ val projectedAvroRecord =
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
+ requiredAvroSchema, reusableRecordBuilder)
+ recordToLoad = deserialize(projectedAvroRecord)
true
}
}
@@ -381,66 +403,10 @@ private object HoodieMergeOnReadRDD {
}
}
- /**
- * Projects provided instance of [[InternalRow]] into provided schema,
assuming that the
- * the schema of the original row is strictly a superset of the given one
- */
- private def projectRowUnsafe(row: InternalRow,
- projectedSchema: StructType,
- ordinals: Seq[Int]): InternalRow = {
- val projectedRow = new SpecificInternalRow(projectedSchema)
- var curIndex = 0
- projectedSchema.zip(ordinals).foreach { case (field, pos) =>
- val curField = if (row.isNullAt(pos)) {
- null
- } else {
- row.get(pos, field.dataType)
- }
- projectedRow.update(curIndex, curField)
- curIndex += 1
- }
- projectedRow
- }
-
- /**
- * Projects provided instance of [[IndexedRecord]] into provided schema,
assuming that the
- * the schema of the original row is strictly a superset of the given one
- */
- def projectAvroUnsafe(record: IndexedRecord,
- projectedSchema: Schema,
- ordinals: List[Int],
- recordBuilder: GenericRecordBuilder): GenericRecord = {
+ private def projectAvroUnsafe(record: GenericRecord, projectedSchema:
Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
val fields = projectedSchema.getFields.asScala
- checkState(fields.length == ordinals.length)
- fields.zip(ordinals).foreach {
- case (field, pos) => recordBuilder.set(field, record.get(pos))
- }
- recordBuilder.build()
- }
-
- /**
- * Projects provided instance of [[IndexedRecord]] into provided schema,
assuming that the
- * the schema of the original row is strictly a superset of the given one
- *
- * This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build
mapping of the record's
- * schema into projected one itself (instead of expecting such mapping from
the caller)
- */
- def projectAvro(record: IndexedRecord,
- projectedSchema: Schema,
- recordBuilder: GenericRecordBuilder): GenericRecord = {
- projectAvroUnsafe(record, projectedSchema,
collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder)
- }
-
- /**
- * Maps [[projected]] [[Schema]] onto [[source]] one, collecting
corresponding field ordinals w/in it, which
- * will be subsequently used by either [[projectRowUnsafe]] or
[[projectAvroUnsafe()]] method
- *
- * @param projected target projected schema (which is a proper subset of
[[source]] [[Schema]])
- * @param source source schema of the record being projected
- * @return list of ordinals of corresponding fields of [[projected]] schema
w/in [[source]] one
- */
- private def collectFieldOrdinals(projected: Schema, source: Schema):
List[Int] = {
- projected.getFields.asScala.map(f =>
source.getField(f.name()).pos()).toList
+ fields.foreach(field => reusableRecordBuilder.set(field,
record.get(field.name())))
+ reusableRecordBuilder.build()
}
private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
@@ -452,9 +418,48 @@ private object HoodieMergeOnReadRDD {
.getParent
}
- private def resolveAvroSchemaNullability(schema: Schema) = {
- AvroConversionUtils.resolveAvroTypeNullability(schema) match {
- case (nullable, _) => nullable
+ // TODO extract to HoodieAvroSchemaUtils
+ abstract class AvroProjection extends (GenericRecord => GenericRecord)
+
+ class SafeAvroProjection(sourceSchema: Schema,
+ projectedSchema: Schema,
+ reusableRecordBuilder: GenericRecordBuilder = null)
extends AvroProjection {
+
+ private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema,
sourceSchema)
+ private val recordBuilder: GenericRecordBuilder =
+ if (reusableRecordBuilder != null) {
+ reusableRecordBuilder
+ } else {
+ new GenericRecordBuilder(projectedSchema)
+ }
+
+ override def apply(record: GenericRecord): GenericRecord = {
+ val fields = projectedSchema.getFields.asScala
+ checkState(fields.length == ordinals.length)
+ fields.zip(ordinals).foreach {
+ case (field, pos) => recordBuilder.set(field, record.get(pos))
+ }
+ recordBuilder.build()
+ }
+ }
+
+ object SafeAvroProjection {
+ def create(sourceSchema: Schema, projectedSchema: Schema,
reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection =
+ new SafeAvroProjection(
+ sourceSchema = sourceSchema,
+ projectedSchema = projectedSchema,
+ reusableRecordBuilder = reusableRecordBuilder)
+
+ /**
+ * Maps [[projected]] [[Schema]] onto [[source]] one, collecting
corresponding field ordinals w/in it, which
+ * will be subsequently used by [[SafeAvroProjection]] to project records
by field position
+ *
+ * @param projected target projected schema (which is a proper subset of
[[source]] [[Schema]])
+ * @param source source schema of the record being projected
+ * @return list of ordinals of corresponding fields of [[projected]]
schema w/in [[source]] one
+ */
+ private def collectFieldOrdinals(projected: Schema, source: Schema):
List[Int] = {
+ projected.getFields.asScala.map(f =>
source.getField(f.name()).pos()).toList
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6fa130ac8c..0fc6ef2f83 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,6 @@
package org.apache.hudi
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
@@ -27,7 +26,9 @@ import
org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata,
getWritePartitionPaths, listAffectedFilesForCommits}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -58,32 +59,15 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
}
protected override def composeRDD(fileSplits:
Seq[HoodieMergeOnReadFileSplit],
- partitionSchema: StructType,
- dataSchema: HoodieTableSchema,
+ tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
- filters: Array[Filter]):
HoodieMergeOnReadRDD = {
- val fullSchemaParquetReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredSchema = dataSchema,
- // This file-reader is used to read base file records, subsequently
merging them with the records
- // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
- // applying any user-defined filtering _before_ we complete combining
them w/ delta-log records (to make sure that
- // we combine them correctly)
- //
- // The only filtering applicable here is the filtering to make sure
we're only fetching records that
- // fall into incremental span of the timeline being queried
- filters = incrementalSpanRecordFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt)
- )
-
- val (requiredSchemaBaseFileReaderMerging,
requiredSchemaBaseFileReaderNoMerging) =
- createMergeOnReadBaseFileReaders(partitionSchema, dataSchema,
requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
+ filters: Array[Filter]): RDD[InternalRow]
= {
+ // The only required filters are ones that make sure we're only fetching
records that
+ // fall into incremental span of the timeline being queried
+ val requiredFilters = incrementalSpanRecordFilters
+ val optionalFilters = filters
+ val readers = createBaseFileReaders(tableSchema, requiredSchema,
requestedColumns, requiredFilters, optionalFilters)
val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to
make sure returned iterator is appropriately
@@ -91,12 +75,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
config = jobConf,
- fileReaders = HoodieMergeOnReadBaseFileReaders(
- fullSchemaFileReader = fullSchemaParquetReader,
- requiredSchemaFileReaderForMerging =
requiredSchemaBaseFileReaderMerging,
- requiredSchemaFileReaderForNoMerging =
requiredSchemaBaseFileReaderNoMerging
- ),
- dataSchema = dataSchema,
+ fileReaders = readers,
+ tableSchema = tableSchema,
requiredSchema = requiredSchema,
tableState = hoodieTableState,
mergeType = mergeType,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index c6d4eafafc..1cb35aa61d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -80,44 +81,113 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
}
protected override def composeRDD(fileSplits:
Seq[HoodieMergeOnReadFileSplit],
- partitionSchema: StructType,
- dataSchema: HoodieTableSchema,
+ tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
- filters: Array[Filter]):
HoodieMergeOnReadRDD = {
- val fullSchemaBaseFileReader = createBaseFileReader(
+ filters: Array[Filter]): RDD[InternalRow]
= {
+ val requiredFilters = Seq.empty
+ val optionalFilters = filters
+ val readers = createBaseFileReaders(tableSchema, requiredSchema,
requestedColumns, requiredFilters, optionalFilters)
+
+ val tableState = getTableState
+ new HoodieMergeOnReadRDD(
+ sqlContext.sparkContext,
+ config = jobConf,
+ fileReaders = readers,
+ tableSchema = tableSchema,
+ requiredSchema = requiredSchema,
+ tableState = tableState,
+ mergeType = mergeType,
+ fileSplits = fileSplits)
+ }
+
+ protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+ requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
+ requiredFilters: Seq[Filter],
+ optionalFilters: Seq[Filter] =
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
+ val (partitionSchema, dataSchema, requiredDataSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+ val fullSchemaReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
- requiredSchema = dataSchema,
+ requiredDataSchema = dataSchema,
// This file-reader is used to read base file records, subsequently
merging them with the records
// stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
// applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
- // we combine them correctly)
- filters = Seq.empty,
+ // we combine them correctly);
+ // As such only required filters could be pushed-down to such reader
+ filters = requiredFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt)
)
- val (requiredSchemaBaseFileReaderMerging,
requiredSchemaBaseFileReaderNoMerging) =
- createMergeOnReadBaseFileReaders(partitionSchema, dataSchema,
requiredSchema, requestedColumns, filters)
-
- val tableState = getTableState
- new HoodieMergeOnReadRDD(
- sqlContext.sparkContext,
- config = jobConf,
- fileReaders = HoodieMergeOnReadBaseFileReaders(
- fullSchemaFileReader = fullSchemaBaseFileReader,
- requiredSchemaFileReaderForMerging =
requiredSchemaBaseFileReaderMerging,
- requiredSchemaFileReaderForNoMerging =
requiredSchemaBaseFileReaderNoMerging
- ),
+ val requiredSchemaReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- tableState = tableState,
- mergeType = mergeType,
- fileSplits = fileSplits)
+ requiredDataSchema = requiredDataSchema,
+ // This file-reader is used to read base file records, subsequently
merging them with the records
+ // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
+ // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
+ // we combine them correctly);
+ // As such only required filters could be pushed-down to such reader
+ filters = requiredFilters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
+ )
+
+ // Check whether fields required for merging were also requested to be
fetched
+ // by the query:
+ // - In case they were, there's no optimization we could apply here (we
will have
+ // to fetch such fields)
+ // - In case they were not, we will provide 2 separate file-readers
+ // a) One which would be applied to file-groups w/ delta-logs
(merging)
+ // b) One which would be applied to file-groups w/ no delta-logs or
+ // in case query-mode is skipping merging
+ val mandatoryColumns =
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+ if (mandatoryColumns.forall(requestedColumns.contains)) {
+ HoodieMergeOnReadBaseFileReaders(
+ fullSchemaReader = fullSchemaReader,
+ requiredSchemaReader = requiredSchemaReader,
+ requiredSchemaReaderSkipMerging = requiredSchemaReader
+ )
+ } else {
+ val prunedRequiredSchema = {
+ val superfluousColumnNames =
mandatoryColumns.filterNot(requestedColumns.contains)
+ val prunedStructSchema =
+ StructType(requiredDataSchema.structTypeSchema.fields
+ .filterNot(f => superfluousColumnNames.contains(f.name)))
+
+ HoodieTableSchema(prunedStructSchema,
convertToAvroSchema(prunedStructSchema).toString)
+ }
+
+ val requiredSchemaReaderSkipMerging = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredDataSchema = prunedRequiredSchema,
+ // This file-reader is only used in cases when no merging is
performed, therefore it's safe to push
+ // down these filters to the base file readers
+ filters = requiredFilters ++ optionalFilters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
+ )
+
+ HoodieMergeOnReadBaseFileReaders(
+ fullSchemaReader = fullSchemaReader,
+ requiredSchemaReader = requiredSchemaReader,
+ requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+ )
+ }
}
protected override def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -156,7 +226,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
- requiredSchema = requiredDataSchema,
+ requiredDataSchema = requiredDataSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
@@ -189,7 +259,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
- requiredSchema = prunedRequiredSchema,
+ requiredDataSchema = prunedRequiredSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b5d19bd37d..6a04ec57e1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils {
* Returns only [[AttributeReference]] contained as a sub-expression
*/
object AllowedTransformationExpression extends SparkAdapterSupport {
- val exprUtils: HoodieCatalystExpressionUtils =
sparkAdapter.getCatalystExpressionUtils()
+ val exprUtils: HoodieCatalystExpressionUtils =
sparkAdapter.getCatalystExpressionUtils
def unapply(expr: Expression): Option[AttributeReference] = {
// First step, we check that expression
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f2ae31a0f7..289f800b27 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure
with Logging
with SparkAdapterSupport {
- private val exprUtils = sparkAdapter.getCatalystExpressionUtils()
+ private val exprUtils = sparkAdapter.getCatalystExpressionUtils
/**
* OPTIMIZE table_name|table_path [WHERE predicate]
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index bd5aa01216..9300b94bb9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -57,7 +57,7 @@ case class IndexRow(fileName: String,
class TestDataSkippingUtils extends HoodieClientTestBase with
SparkAdapterSupport {
- val exprUtils: HoodieCatalystExpressionUtils =
sparkAdapter.getCatalystExpressionUtils()
+ val exprUtils: HoodieCatalystExpressionUtils =
sparkAdapter.getCatalystExpressionUtils
var spark: SparkSession = _
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 57c826af92..9aa3c509c3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME,
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD,
TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.internal.SQLConf
import java.io.File
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index fdc0857800..30af252d2d 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -40,7 +40,7 @@ import scala.collection.mutable.ArrayBuffer
*/
class Spark2Adapter extends SparkAdapter {
- override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils =
HoodieSpark2CatalystExpressionUtils
+ override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils =
HoodieSpark2CatalystExpressionUtils
override def getCatalystPlanUtils: HoodieCatalystPlansUtils =
HoodieSpark2CatalystPlanUtils
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 8093d70692..028bb5788c 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,20 +19,20 @@
package org.apache.spark.sql.adapter
import org.apache.avro.Schema
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils,
HoodieSpark31CatalystPlanUtils}
import org.apache.spark.sql.avro.{HoodieAvroDeserializer,
HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer,
HoodieSpark3_1AvroSerializer}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark31HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils,
HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils,
HoodieSpark31CatalystPlanUtils}
/**
* Implementation of [[SparkAdapter]] for Spark 3.1.x
*/
class Spark3_1Adapter extends BaseSpark3Adapter {
- def getCatalystPlanUtils: HoodieCatalystPlansUtils =
HoodieSpark31CatalystPlanUtils
+ override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils =
HoodieSpark31CatalystExpressionUtils
- override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils =
HoodieSpark31CatalystExpressionUtils
+ override def getCatalystPlanUtils: HoodieCatalystPlansUtils =
HoodieSpark31CatalystPlanUtils
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType:
Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable)
diff --git
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index ceb66b7437..fe25ee7fdc 100644
---
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -30,15 +30,15 @@ import org.apache.spark.sql._
*/
class Spark3_2Adapter extends BaseSpark3Adapter {
- def getCatalystPlanUtils: HoodieCatalystPlansUtils =
HoodieSpark32CatalystPlanUtils
-
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType:
Schema, nullable: Boolean): HoodieAvroSerializer =
new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable)
override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType:
DataType): HoodieAvroDeserializer =
new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType)
- override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils =
HoodieSpark32CatalystExpressionUtils
+ override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils =
HoodieSpark32CatalystExpressionUtils
+
+ override def getCatalystPlanUtils: HoodieCatalystPlansUtils =
HoodieSpark32CatalystPlanUtils
override def createExtendedSparkParser: Option[(SparkSession,
ParserInterface) => ParserInterface] = {
Some(