yihua commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1411117772
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -123,4 +125,11 @@ private Object getFieldValueFromInternalRow(InternalRow
row, Schema recordSchema
return null;
}
}
+
+ @Override
+ public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+ UnsafeProjection projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(AvroConversionUtils.convertAvroSchemaToStructType(from),
Review Comment:
Use `HoodieInternalRowUtils.getCachedSchema(schema)` to avoid parsing the Avro
schema per record?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -449,7 +449,8 @@ private static void validateRow(InternalRow data,
StructType schema) {
data instanceof HoodieInternalRow
|| data instanceof GenericInternalRow
|| data instanceof SpecificInternalRow
- ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+ ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+ || data instanceof JoinedRow;
Review Comment:
The partition value is empty for non-partitioned tables.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -77,14 +78,18 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
}
}).asInstanceOf[ClosableIterator[InternalRow]]
} else {
- if (baseFileReader.isEmpty) {
- throw new IllegalArgumentException("Base file reader is missing when
instantiating "
- + "SparkFileFormatInternalRowReaderContext.");
+ val key = generateKey(dataSchema, requiredSchema)
Review Comment:
nit: rename to `schemaPairHashKey`
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -49,8 +51,7 @@ import scala.collection.mutable
* not required for reading a file group with only log
files.
* @param partitionValues The values for a partition in which the file group
lives.
*/
-class SparkFileFormatInternalRowReaderContext(baseFileReader:
Option[PartitionedFile => Iterator[InternalRow]],
- partitionValues: InternalRow)
extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long,
PartitionedFile => Iterator[InternalRow]]) extends
BaseSparkInternalRowReaderContext {
Review Comment:
Add docs for `readerMaps`.
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -218,17 +220,102 @@ private static boolean isProjectionOfInternal(Schema
sourceSchema,
return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
}
+ public static Option<Schema.Field> findNestedField(Schema schema, String
fieldName) {
+ return findNestedField(schema, fieldName.split("\\."), 0);
+ }
+
+ private static Option<Schema.Field> findNestedField(Schema schema, String[]
fieldParts, int index) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ Option<Schema.Field> notUnion =
findNestedField(resolveNullableSchema(schema), fieldParts, index);
+ if (!notUnion.isPresent()) {
+ return Option.empty();
+ }
+ Schema.Field nu = notUnion.get();
+ return Option.of(new Schema.Field(nu.name(), nu.schema(), nu.doc(),
nu.defaultVal()));
+ }
+ if (fieldParts.length <= index) {
+ return Option.empty();
+ }
+
+ Schema.Field foundField = schema.getField(fieldParts[index]);
+ if (foundField == null) {
+ return Option.empty();
+ }
+
+ if (index == fieldParts.length - 1) {
+ return Option.of(new Schema.Field(foundField.name(),
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+ }
+
+ Schema foundSchema = foundField.schema();
+ Option<Schema.Field> nestedPart = findNestedField(foundSchema, fieldParts,
index + 1);
+ if (!nestedPart.isPresent()) {
+ return Option.empty();
+ }
+ //temporary, need to match HoodieFileGroupReaderBasedParquetFileFormat for
now
+ return nestedPart;
+ /*
+ boolean isUnion = false;
+ if (foundSchema.getType().equals(Schema.Type.UNION)) {
+ isUnion = true;
+ foundSchema = resolveNullableSchema(foundSchema);
+ }
+
+ Schema newSchema = Schema.createRecord(foundSchema.getName(),
foundSchema.getDoc(), foundSchema.getNamespace(), false,
Collections.singletonList(nestedPart.get()));
+ return Option.of(new Schema.Field(foundField.name(), isUnion ?
createNullableSchema(newSchema) : newSchema, foundField.doc(),
foundField.defaultVal()));
+ */
Review Comment:
Remove unused code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]