xiarixiaoyao commented on code in PR #5708:
URL: https://github.com/apache/hudi/pull/5708#discussion_r928122799
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -645,17 +642,45 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
object HoodieBaseRelation extends SparkAdapterSupport {
- type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+ case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow],
val schema: StructType) {
+ def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
+ }
- private def generateUnsafeProjection(from: StructType, to: StructType) =
- sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from,
to)
+ def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection =
+ sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable =
false, "Record")
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
+ /**
+ * Projects provided file reader's output from its original schema, into a
[[requiredSchema]]
+ *
+ * NOTE: [[requiredSchema]] has to be a proper subset of the file reader's
schema
+ *
+ * @param reader file reader to be projected
+ * @param requiredSchema target schema for the output of the provided file
reader
+ */
+ def projectReader(reader: BaseFileReader, requiredSchema: StructType):
BaseFileReader = {
+
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size
== requiredSchema.size)
+
+ if (reader.schema == requiredSchema) {
+ reader
+ } else {
+ val read = reader.apply(_)
+ val projectedRead: PartitionedFile => Iterator[InternalRow] = (file:
PartitionedFile) => {
+ // NOTE: Projection is not a serializable object, hence it creation
should only happen w/in
+ // the executor process
+ val unsafeProjection = generateUnsafeProjection(reader.schema,
requiredSchema)
+ read(file).map(unsafeProjection)
+ }
+
+ BaseFileReader(projectedRead, requiredSchema)
Review Comment:
Why do we still need `requiredSchema` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]