alexeykudinkin commented on code in PR #7003:
URL: https://github.com/apache/hudi/pull/7003#discussion_r1030869820
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -231,7 +229,11 @@ public MapData getMap(int ordinal) {
@Override
public InternalRow copy() {
- return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), sourceRow.copy(), sourceContainsMetaFields);
+ UTF8String[] copyMetaFields = new UTF8String[metaFields.length];
+ for (int i = 0; i < metaFields.length; i++) {
+ copyMetaFields[i] = metaFields[i] != null ? metaFields[i].copy() : null;
Review Comment:
Good catch!
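For illustration, here's a minimal, self-contained sketch of why the element-wise copy matters (a simplified stand-in: `StringBuilder` plays the role of the mutable `UTF8String`, and `deepCopy` mirrors the loop in the diff):

```java
import java.util.Arrays;

public class DeepCopyDemo {
    // Element-wise copy: each non-null element gets its own buffer
    static StringBuilder[] deepCopy(StringBuilder[] src) {
        StringBuilder[] out = new StringBuilder[src.length];
        for (int i = 0; i < src.length; i++) {
            out[i] = src[i] != null ? new StringBuilder(src[i]) : null;
        }
        return out;
    }

    public static void main(String[] args) {
        StringBuilder[] metaFields = { new StringBuilder("commit-001"), null };

        // Arrays.copyOf only copies the array itself; the elements stay shared
        StringBuilder[] shallow = Arrays.copyOf(metaFields, metaFields.length);
        StringBuilder[] deep = deepCopy(metaFields);

        metaFields[0].append("-mutated");
        System.out.println("shallow sees: " + shallow[0]); // shallow sees: commit-001-mutated
        System.out.println("deep sees:    " + deep[0]);    // deep sees:    commit-001
    }
}
```

`Arrays.copyOf` alone would let a later mutation of the source row's meta fields leak into the "copied" row, which is exactly what `copy()` is supposed to prevent.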
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -219,11 +237,34 @@ object HoodieInternalRowUtils {
schemaMap.get(schema)
}
+ def existField(structType: StructType, name: String): Boolean = {
+ val stack = new mutable.Stack[(StructType, String)]
Review Comment:
Oh, I totally forgot that we actually have `composeNestedFieldPath`, which has already been optimized for such retrievals. Let's modify it so we can use it here, and we should be good to go.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -219,11 +237,34 @@ object HoodieInternalRowUtils {
schemaMap.get(schema)
}
+ def existField(structType: StructType, name: String): Boolean = {
+ val stack = new mutable.Stack[(StructType, String)]
Review Comment:
Instead of using a stack, can we just split the name once and retrieve the nested field in a single pass?
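To illustrate the suggestion (a hypothetical sketch, not the actual Hudi code: a nested `Map<String, Object>` stands in for Spark's `StructType`), the dotted name can be split once and walked down directly, with no auxiliary stack:

```java
import java.util.Map;

public class NestedFieldLookup {
    // Split "a.b.c" once and descend in a single pass; no stack needed.
    // Map<String, Object> stands in for StructType in this sketch.
    static boolean existField(Map<String, Object> schema, String name) {
        Object current = schema;
        for (String part : name.split("\\.")) {
            if (!(current instanceof Map)) {
                return false; // tried to descend into a leaf type
            }
            Map<?, ?> struct = (Map<?, ?>) current;
            if (!struct.containsKey(part)) {
                return false;
            }
            current = struct.get(part);
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> schema = Map.of(
            "user", Map.of("address", Map.of("city", "string")),
            "id", "long");
        System.out.println(existField(schema, "user.address.city")); // true
        System.out.println(existField(schema, "user.zip"));          // false
    }
}
```

The single-pass walk is O(depth) with no intermediate allocations beyond the one `split`, which is the point of the review comment.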
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -183,11 +182,13 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
- boolean containMetaFields = hasMetaFields(structType);
- UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
+ UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
- // TODO add actual rewriting
- InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
+ boolean containMetaFields = hasMetaFields(targetStructType);
+ UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType);
+ HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);
+ InternalRow finalRow = copy ? internalRow.copy() : internalRow;
Review Comment:
Seems like my previous comment got lost: why do we want to do a copy here? We can just reset the copy flag in the new record we create, thereby making the reader do the copying (if needed).
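To illustrate the suggestion (a hypothetical sketch, not Hudi's actual record API: `StringBuilder` stands in for the row payload), the record can carry the flag forward instead of eagerly copying, so the cost is only paid by a reader that asks for it:

```java
public class LazyCopyRecord {
    private final StringBuilder data;  // stand-in for the row payload
    private final boolean copyOnRead;  // carried forward instead of copying eagerly

    public LazyCopyRecord(StringBuilder data, boolean copyOnRead) {
        this.data = data;
        this.copyOnRead = copyOnRead;
    }

    // The reader performs the copy only when the flag demands it
    public StringBuilder read() {
        return copyOnRead ? new StringBuilder(data) : data;
    }

    public static void main(String[] args) {
        StringBuilder payload = new StringBuilder("row");
        LazyCopyRecord record = new LazyCopyRecord(payload, true);
        StringBuilder snapshot = record.read();
        payload.append("-changed");
        System.out.println(snapshot); // later mutation is not visible in the snapshot
    }
}
```

The eager `internalRow.copy()` in the diff pays the copy cost up front on every rewrite; deferring it lets readers that never retain the row skip it entirely.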
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -467,292 +448,288 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
.setConf(spark.sessionState.newHadoopConf())
.build()
assertResult(metaClient.getTableConfig.getTableName)(tableName)
- }
+ })
}
test("Test Insert Exception") {
- val tableName = generateTableName
Review Comment:
@wzx140 we didn't modify the test itself, did we? The GH UI shows it as if the whole test had been rewritten, which makes it very hard to decipher.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -194,6 +185,13 @@ private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content,
public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content, InternalSchema internalSchema) throws IOException {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ if (!internalSchema.isEmptySchema()) {
Review Comment:
@wzx140 yes, that's the plan. #6358 will be merged today.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -72,8 +72,46 @@ object HoodieInternalRowUtils {
} else {
newRow.update(pos, oldValue)
}
- case _ =>
- newRow.update(pos, oldValue)
+ case t if t == oldType => newRow.update(pos, oldValue)
+ // Type promotion
+ case _: ShortType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
+ }
+ case _: IntegerType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
+ case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
+ }
+ case _: LongType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
+ case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
+ case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
+ }
+ case _: FloatType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
+ case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
+ case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
+ case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
+ }
+ case _: DoubleType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
+ case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
+ case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
+ case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
+ case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
+ }
+ case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
Review Comment:
Yeah, we need to unify these. I'm less concerned regarding Avro, though, b/c Spark is not interfacing w/ Avro (directly), so we're unlikely to hit an issue w/ it.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -183,11 +182,13 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
- boolean containMetaFields = hasMetaFields(structType);
- UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
Review Comment:
Let's create a ticket and add TODO to revisit rewriting to instead rely on
`UnsafeRowWriter`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]