alexeykudinkin commented on code in PR #5708:
URL: https://github.com/apache/hudi/pull/5708#discussion_r928145861
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
def canPruneRelationSchema: Boolean =
(fileFormat.isInstanceOf[ParquetFileFormat] ||
fileFormat.isInstanceOf[OrcFileFormat]) &&
// NOTE: Some relations might be disabling sophisticated schema pruning
techniques (for ex, nested schema pruning)
- // TODO(HUDI-XXX) internal schema doesn't supported nested schema
pruning currently
+ // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning
currently
Review Comment:
You mean, JIRA, right? Will do
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -645,17 +642,45 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
object HoodieBaseRelation extends SparkAdapterSupport {
- type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+ case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow],
val schema: StructType) {
+ def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
+ }
- private def generateUnsafeProjection(from: StructType, to: StructType) =
- sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from,
to)
+ def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection =
+ sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)
def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable =
false, "Record")
def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
+ /**
+ * Projects provided file reader's output from its original schema, into a
[[requiredSchema]]
+ *
+ * NOTE: [[requiredSchema]] has to be a proper subset of the file reader's
schema
+ *
+ * @param reader file reader to be projected
+ * @param requiredSchema target schema for the output of the provided file
reader
+ */
+ def projectReader(reader: BaseFileReader, requiredSchema: StructType):
BaseFileReader = {
+
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size
== requiredSchema.size)
+
+ if (reader.schema == requiredSchema) {
+ reader
+ } else {
+ val read = reader.apply(_)
+ val projectedRead: PartitionedFile => Iterator[InternalRow] = (file:
PartitionedFile) => {
+ // NOTE: Projection is not a serializable object, hence it creation
should only happen w/in
+ // the executor process
+ val unsafeProjection = generateUnsafeProjection(reader.schema,
requiredSchema)
+ read(file).map(unsafeProjection)
+ }
+
+ BaseFileReader(projectedRead, requiredSchema)
Review Comment:
Please check my comment where this method is used for an example: whenever
we prune partition columns, the ordering of the columns changes (the partition
columns are removed and then appended to the resulting schema); therefore,
without projecting back into the required schema, the caller will get a dataset
with an incorrect ordering of the columns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]