alexeykudinkin commented on a change in pull request #4709:
URL: https://github.com/apache/hudi/pull/4709#discussion_r803037765
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
##########
@@ -107,37 +107,35 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
})
// Prepare readers for reading data file and skeleton files
- val dataReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = dataSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredDataSchema,
- filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
- )
-
- val skeletonReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = skeletonSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredSkeletonSchema,
- filters = if (requiredDataSchema.isEmpty) filters else Seq(),
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
- )
-
- val regularReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = fullSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredColsSchema,
- filters = filters,
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+ val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = _sqlContext.sparkSession,
+ dataSchema = dataSchema,
+ partitionSchema = StructType(Seq.empty),
+ requiredSchema = requiredDataSchema,
+ filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+ options = Map.empty,
Review comment:
We should pass optParams here
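For example, something like this (just a sketch; assuming `optParams` is in scope on `HoodieBootstrapRelation` the way it is for the other relations):

```scala
// Sketch: forward the relation's optParams instead of an empty map, so that
// reader-level options actually reach the underlying ParquetFileFormat reader.
val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
  sparkSession = _sqlContext.sparkSession,
  dataSchema = dataSchema,
  partitionSchema = StructType(Seq.empty),
  requiredSchema = requiredDataSchema,
  filters = if (requiredSkeletonSchema.isEmpty) filters else Seq(),
  options = optParams,  // was: Map.empty
  hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
```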
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+ /**
+   * Partition the given condition into two sequences of conjunctive predicates:
+ * - predicates that can be evaluated using metadata only.
+ * - other predicates.
+ */
+ def splitPartitionAndDataPredicates(
+ spark: SparkSession,
+ condition: Expression,
+ partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = {
+ splitConjunctivePredicates(condition).partition(
+ isPredicateMetadataOnly(spark, _, partitionColumns))
+ }
+
+ /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+ * only references partition columns and involves no subquery.
+ */
+ def isPredicateMetadataOnly(
+ spark: SparkSession,
+ condition: Expression,
+ partitionColumns: Seq[String]): Boolean = {
+ isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) &&
+ !SubqueryExpression.hasSubquery(condition)
+ }
+
+ /**
+   * Does the predicate only contain partition columns?
+ */
+ def isPredicatePartitionColumnsOnly(
+ spark: SparkSession,
+ condition: Expression,
+ partitionColumns: Seq[String]): Boolean = {
+ val nameEquality = spark.sessionState.analyzer.resolver
+ condition.references.forall { r =>
+ partitionColumns.exists(nameEquality(r.name, _))
+ }
+ }
+
+ /**
+   * Wraps `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
+   * to handle [[ColumnarBatch]] output when the Parquet vectorized reader is enabled.
+ */
+ def buildHoodieParquetReader(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val readParquetFile: PartitionedFile => Iterator[Any] = new ParquetFileFormat().buildReaderWithPartitionValues(
+ sparkSession = sparkSession,
+ dataSchema = dataSchema,
+ partitionSchema = partitionSchema,
+ requiredSchema = requiredSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf
+ )
+
+ file: PartitionedFile => {
+ val iter = readParquetFile(file)
+ unravelColumnarBatchIfNecessary(iter)
+ }
+ }
+
+ /**
+   * If the iterator yields [[ColumnarBatch]]es, unravel them into [[InternalRow]]s.
+ */
+  def unravelColumnarBatchIfNecessary(iter: Iterator[Any]): Iterator[InternalRow] = {
Review comment:
I don't think this should be used at all outside of building the reader
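In other words, something along these lines (a sketch only), keeping the unravelling private so that `buildHoodieParquetReader` is the sole entry point:

```scala
// Sketch: make the batch-unravelling an implementation detail of the reader builder,
// so callers never have to remember to apply it themselves.
private def unravelColumnarBatchIfNecessary(iter: Iterator[Any]): Iterator[InternalRow] =
  iter.flatMap {
    case r: InternalRow   => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
```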
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -61,26 +64,31 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
new Properties()
}
+
+ private val requiredSchema = tableState.requiredStructSchema
+
+  private val requiredFieldPosition = requiredSchema.map(f => tableState.tableStructSchema.fieldIndex(f.name))
+
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
-        extractRequiredSchema(rows)
+        val rows = readParquetFile(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+ extractRequiredSchema(rows, requiredSchema, requiredFieldPosition)
Review comment:
There's actually HUDI-3396 to make sure this is the case
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+ /**
+   * Partition the given condition into two sequences of conjunctive predicates:
+ * - predicates that can be evaluated using metadata only.
+ * - other predicates.
+ */
+ def splitPartitionAndDataPredicates(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+ splitConjunctivePredicates(condition).partition(
+ isPredicateMetadataOnly(_, partitionColumns, spark))
+ }
+
+ /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+ * only references partition columns and involves no subquery.
+ */
+ def isPredicateMetadataOnly(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): Boolean = {
+ isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+ !containsSubquery(condition)
+ }
+
+ /**
+   * Does the predicate only contain partition columns?
+ */
+ def isPredicatePartitionColumnsOnly(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): Boolean = {
+ val nameEquality = spark.sessionState.analyzer.resolver
+ condition.references.forall { r =>
+ partitionColumns.exists(nameEquality(r.name, _))
+ }
+ }
+
+ /**
+ * Check if condition involves a subquery expression.
+ */
+ def containsSubquery(condition: Expression): Boolean = {
+ SubqueryExpression.hasSubquery(condition)
+ }
+
+ /**
+   * Wraps `readFunction` to handle [[ColumnarBatch]] output when the Parquet vectorized reader is enabled.
+ */
+ def readParquetFile(
+ file: PartitionedFile,
+      readFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+ val fileIterator = readFunction(file)
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ /**
+ * Extract the required schema from [[InternalRow]]
+ */
+ def extractRequiredSchema(
+ iter: Iterator[InternalRow],
+ requiredSchema: StructType,
+ requiredFieldPos: Seq[Int]): Iterator[InternalRow] = {
+ val unsafeProjection = UnsafeProjection.create(requiredSchema)
+ val rows = iter.map { row =>
+      unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos))
+ }
+ rows
+ }
+
+ /**
+ * Convert [[InternalRow]] to [[SpecificInternalRow]].
+ */
+ def createInternalRowWithSchema(
+ row: InternalRow,
+ schema: StructType,
+ positions: Seq[Int]): InternalRow = {
+ val rowToReturn = new SpecificInternalRow(schema)
+ var curIndex = 0
+ schema.zip(positions).foreach { case (field, pos) =>
+ val curField = if (row.isNullAt(pos)) {
+ null
+ } else {
+ row.get(pos, field.dataType)
+ }
+ rowToReturn.update(curIndex, curField)
+ curIndex += 1
+ }
+ rowToReturn
+ }
+
+
+ def splitFiles(
+ sparkSession: SparkSession,
+ file: FileStatus,
+ partitionValues: InternalRow): Seq[PartitionedFile] = {
+ val filePath = file.getPath
+ val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+ (0L until file.getLen by maxSplitBytes).map { offset =>
+ val remaining = file.getLen - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+ PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
+ }
+ }
+
+ def getFilePartitions(
Review comment:
Please add a comment elaborating on that, making it clear that this is a copy of `FilePartition.getFilePartitions`.
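Something along these lines would do (sketch only; the exact wording is up to you):

```scala
/**
 * NOTE: This is a copy of Spark's
 * `org.apache.spark.sql.execution.datasources.FilePartition#getFilePartitions`,
 * duplicated here because that API is not accessible from every Spark version we support.
 * Please keep it in sync with the upstream implementation.
 */
// ... existing getFilePartitions implementation, unchanged ...
```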
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+ /**
+   * Partition the given condition into two sequences of conjunctive predicates:
+ * - predicates that can be evaluated using metadata only.
+ * - other predicates.
+ */
+ def splitPartitionAndDataPredicates(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+ splitConjunctivePredicates(condition).partition(
+ isPredicateMetadataOnly(_, partitionColumns, spark))
+ }
+
+ /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+ * only references partition columns and involves no subquery.
+ */
+ def isPredicateMetadataOnly(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): Boolean = {
+ isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+ !containsSubquery(condition)
+ }
+
+ /**
+   * Does the predicate only contain partition columns?
+ */
+ def isPredicatePartitionColumnsOnly(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ spark: SparkSession): Boolean = {
+ val nameEquality = spark.sessionState.analyzer.resolver
+ condition.references.forall { r =>
+ partitionColumns.exists(nameEquality(r.name, _))
+ }
+ }
+
+ /**
+ * Check if condition involves a subquery expression.
+ */
+ def containsSubquery(condition: Expression): Boolean = {
+ SubqueryExpression.hasSubquery(condition)
+ }
+
+ /**
+   * Wraps `readFunction` to handle [[ColumnarBatch]] output when the Parquet vectorized reader is enabled.
+ */
+ def readParquetFile(
+ file: PartitionedFile,
+      readFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+ val fileIterator = readFunction(file)
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ /**
+ * Extract the required schema from [[InternalRow]]
+ */
+ def extractRequiredSchema(
+ iter: Iterator[InternalRow],
+ requiredSchema: StructType,
+ requiredFieldPos: Seq[Int]): Iterator[InternalRow] = {
+ val unsafeProjection = UnsafeProjection.create(requiredSchema)
+ val rows = iter.map { row =>
+      unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos))
+ }
+ rows
+ }
+
+ /**
+ * Convert [[InternalRow]] to [[SpecificInternalRow]].
+ */
+ def createInternalRowWithSchema(
+ row: InternalRow,
+ schema: StructType,
+ positions: Seq[Int]): InternalRow = {
+ val rowToReturn = new SpecificInternalRow(schema)
+ var curIndex = 0
+ schema.zip(positions).foreach { case (field, pos) =>
+ val curField = if (row.isNullAt(pos)) {
+ null
+ } else {
+ row.get(pos, field.dataType)
+ }
+ rowToReturn.update(curIndex, curField)
+ curIndex += 1
+ }
+ rowToReturn
+ }
+
+
+ def splitFiles(
+ sparkSession: SparkSession,
+ file: FileStatus,
+ partitionValues: InternalRow): Seq[PartitionedFile] = {
+ val filePath = file.getPath
+ val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+ (0L until file.getLen by maxSplitBytes).map { offset =>
+ val remaining = file.getLen - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+ PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
+ }
+ }
+
+ def getFilePartitions(
Review comment:
We should use the native implementation where possible; let's add this method to SparkAdapter and only keep a hand-rolled copy for the Spark 2 impls.
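Roughly like this (a sketch with illustrative names; the real `SparkAdapter` trait already exists in Hudi and its method set may differ):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

trait SparkAdapter {
  /** Packs PartitionedFiles into FilePartitions; implementation is Spark-version-specific. */
  def getFilePartitions(
      sparkSession: SparkSession,
      partitionedFiles: Seq[PartitionedFile],
      maxSplitBytes: Long): Seq[FilePartition]
}

// Spark 3 adapter simply delegates to the native implementation.
class Spark3Adapter extends SparkAdapter {
  override def getFilePartitions(
      sparkSession: SparkSession,
      partitionedFiles: Seq[PartitionedFile],
      maxSplitBytes: Long): Seq[FilePartition] =
    FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes)
}

// The Spark 2 adapter would be the only place keeping the hand-rolled copy
// (i.e. the body of HoodieDataSourceHelper.getFilePartitions above).
```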
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
+ *
+ * This class extracts the fields needed according to [[requiredColumns]] and
+ * returns an iterator of [[org.apache.spark.sql.Row]] directly.
+ */
+class HoodieFileScanRDD(
+ @transient private val sparkSession: SparkSession,
+ requiredColumns: Array[String],
+ schema: StructType,
+ readFunction: PartitionedFile => Iterator[Any],
+ @transient val filePartitions: Seq[FilePartition])
+ extends RDD[Row](sparkSession.sparkContext, Nil) {
+
+ private val requiredSchema = {
+ val nameToStructField = schema.map(field => (field.name, field)).toMap
+ StructType(requiredColumns.map(nameToStructField))
+ }
+
+  private val requiredFieldPos = requiredSchema.map(f => schema.fieldIndex(f.name))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+ val iterator = new Iterator[Object] with AutoCloseable {
+
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] var currentFile: PartitionedFile = null
+ private[this] var currentIterator: Iterator[Object] = null
+
+ override def hasNext: Boolean = {
+ (currentIterator != null && currentIterator.hasNext) || nextIterator()
+ }
+
+ def next(): Object = {
+ currentIterator.next()
+ }
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+ private def nextIterator(): Boolean = {
+ if (files.hasNext) {
+ currentFile = files.next()
+
+ logInfo(s"Reading File $currentFile")
+          currentIterator = HoodieDataSourceHelper.readParquetFile(currentFile, readFunction)
+
+ try {
+ hasNext
+ } catch {
+ case e: SchemaColumnConvertNotSupportedException =>
Review comment:
Fair enough
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -61,26 +64,31 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
} else {
new Properties()
}
+
+ private val requiredSchema = tableState.requiredStructSchema
+
+  private val requiredFieldPosition = requiredSchema.map(f => tableState.tableStructSchema.fieldIndex(f.name))
+
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
-        extractRequiredSchema(rows)
+        val rows = readParquetFile(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+ extractRequiredSchema(rows, requiredSchema, requiredFieldPosition)
Review comment:
Can you elaborate on what you mean by "not working well"?
Projection should be done by the Parquet reader itself, to make sure we're reading in only the strictly necessary data, and no more.
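To illustrate what I mean (a sketch only; the reader would be built in the relation, where `tableState`, `filters` and `optParams` are assumed to be in scope):

```scala
// Sketch: pass only the required columns as requiredSchema so the Parquet reader
// prunes columns itself; the rows it emits then already match requiredStructSchema,
// and no per-row extractRequiredSchema(...) re-projection is needed afterwards.
val requiredSchemaFileReader = HoodieDataSourceHelper.buildHoodieParquetReader(
  sparkSession = sparkSession,
  dataSchema = tableState.tableStructSchema,
  partitionSchema = StructType(Seq.empty),
  requiredSchema = tableState.requiredStructSchema,  // column pruning happens here
  filters = filters,
  options = optParams,
  hadoopConf = sparkSession.sessionState.newHadoopConf()
)
```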
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+class BaseFileOnlyViewRelation(
+ val sqlContext: SQLContext,
+ metaClient: HoodieTableMetaClient,
+ optParams: Map[String, String],
+ userSchema: StructType
+ ) extends BaseRelation with PrunedFilteredScan {
+
+ private val sparkSession = sqlContext.sparkSession
+
+ private val tableAvroSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+    Try(schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema))
+ }
+
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+ private val fileIndex = HoodieFileIndex(sparkSession,
+ metaClient,
+ if (userSchema == null) Option.empty[StructType] else Some(userSchema),
+ optParams,
+ FileStatusCache.getOrCreate(sqlContext.sparkSession)
+ )
+
+  private val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
+
+ override def schema: StructType = tableStructSchema
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
Review comment:
Feel free to do this in a follow-up so that we don't overload a single PR
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
##########
@@ -107,37 +107,35 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
})
// Prepare readers for reading data file and skeleton files
- val dataReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = dataSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredDataSchema,
- filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
- )
-
- val skeletonReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = skeletonSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredSkeletonSchema,
- filters = if (requiredDataSchema.isEmpty) filters else Seq(),
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
- )
-
- val regularReadFunction = new ParquetFileFormat()
- .buildReaderWithPartitionValues(
- sparkSession = _sqlContext.sparkSession,
- dataSchema = fullSchema,
- partitionSchema = StructType(Seq.empty),
- requiredSchema = requiredColsSchema,
- filters = filters,
- options = Map.empty,
- hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
+ val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = _sqlContext.sparkSession,
Review comment:
Any good reason to have that "_" prefix?
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -293,4 +293,21 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}
+
+  def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = {
+    // First get the required avro-schema, then convert the avro-schema to spark schema.
+    val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
+    val requiredFields = requiredColumns.map(c => name2Fields(c))
+      .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
Review comment:
Got it. Yeah, I'm not sure I fully grasp what this is for. I'd suggest trying to clean this up.
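If all that's actually needed downstream is the pruned Spark-side schema, one possible simplification could look like this (purely a sketch under that assumption; `getRequiredStructSchema` is a hypothetical name):

```scala
import org.apache.spark.sql.types.StructType

// Sketch: derive the pruned StructType directly from the table's Spark schema,
// preserving the order of requiredColumns, without round-tripping through Avro.
def getRequiredStructSchema(tableStructSchema: StructType,
                            requiredColumns: Array[String]): StructType = {
  val nameToField = tableStructSchema.fields.map(f => f.name -> f).toMap
  StructType(requiredColumns.map(nameToField))
}
```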
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]