alexeykudinkin commented on code in PR #7003:
URL: https://github.com/apache/hudi/pull/7003#discussion_r1028517715
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -222,83 +223,85 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(3, "a3", 30.0, 3000, "20210103", "03"),
Seq(4, "a4", 40.0, 4000, "20210104", "04")
)
- }
+ })
}
test("Test Insert Into None Partitioned Table") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(s"set hoodie.sql.insert.mode=strict")
- // Create none partitioned cow table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- """.stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
- spark.sql(s"insert into $tableName select 2, 'a2', 12, 1000")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 12.0, 1000)
- )
-
- assertThrows[HoodieDuplicateKeyException] {
- try {
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- } catch {
- case e: Exception =>
- var root: Throwable = e
- while (root.getCause != null) {
- root = root.getCause
- }
- throw root
- }
- }
- // Create table with dropDup is true
- val tableName2 = generateTableName
- spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true")
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName2'
- | tblproperties (
- | type = 'mor',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- """.stripMargin)
- spark.sql(s"insert into $tableName2 select 1, 'a1', 10, 1000")
- // This record will be drop when dropDup is true
- spark.sql(s"insert into $tableName2 select 1, 'a1', 12, 1000")
- checkAnswer(s"select id, name, price, ts from $tableName2")(
- Seq(1, "a1", 10.0, 1000)
- )
- // disable this config to avoid affect other test in this class.
- spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
- spark.sql(s"set hoodie.sql.insert.mode=upsert")
- }
+ withRecordType(Map(HoodieRecordType.SPARK ->
+ Map(HoodieWriteConfig.MERGER_IMPLS.key ->
+
classOf[HoodieSparkValidateDuplicateKeyRecordMerger].getName)))(withTempDir {
tmp =>
Review Comment:
Please add a comment here elaborating on why we're using
`HoodieSparkValidateDuplicateKeyRecordMerger` (basically what you
mentioned in a comment)
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -170,4 +174,44 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration)
fs.exists(path)
}
+
+ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val conf = spark.sessionState.conf
+ val currentValues = pairs.unzip._1.map { k =>
+ if (conf.contains(k)) {
+ Some(conf.getConfString(k))
+ } else None
+ }
+ pairs.foreach { case(k, v) => conf.setConfString(k, v) }
+ try f finally {
+ pairs.unzip._1.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+
+ protected def withRecordType(recordConfig: Map[HoodieRecordType, Map[String,
String]]=Map.empty)(f: => Unit) {
+ Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK).foreach { recordType =>
+ val (merger, format) = recordType match {
+ case HoodieRecordType.SPARK =>
(classOf[HoodieSparkRecordMerger].getName, "parquet")
+ case _ => (classOf[HoodieAvroRecordMerger].getName, "avro")
Review Comment:
No need to address in this PR, but let's create a ticket to make sure we
also test the Avro record / Parquet log-file combination
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -183,28 +182,27 @@ public HoodieRecord rewriteRecord(Schema recordSchema,
Properties props, Schema
StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- boolean containMetaFields = hasMetaFields(structType);
+ boolean containMetaFields = hasMetaFields(targetStructType);
UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ InternalRow rewriteRecord =
HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
+ UnsafeRow unsafeRow =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType,
targetStructType).apply(rewriteRecord);
+ InternalRow finalRow = copy ? unsafeRow.copy() : unsafeRow;
Review Comment:
I appreciate the intent, but we should leave all copying decisions to be
made by the user of `HoodieSparkRecord` -- we should only do the copying
where it's absolutely necessary. Let's move this copying to a place where it's
actually required.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java:
##########
@@ -61,7 +61,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
private final boolean enablePointLookups;
- protected final Schema readerSchema;
+ protected Schema readerSchema;
Review Comment:
@wzx140 ^
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -194,6 +185,13 @@ private RecordIterator(Schema readerSchema, Schema
writerSchema, byte[] content,
public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock,
byte[] content, InternalSchema internalSchema) throws IOException {
// Get schema from the header
Schema writerSchema = new
Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ if (!internalSchema.isEmptySchema()) {
Review Comment:
We should not be modifying the data block itself
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -183,28 +182,27 @@ public HoodieRecord rewriteRecord(Schema recordSchema,
Properties props, Schema
StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
- boolean containMetaFields = hasMetaFields(structType);
+ boolean containMetaFields = hasMetaFields(targetStructType);
UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ InternalRow rewriteRecord =
HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
+ UnsafeRow unsafeRow =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType,
targetStructType).apply(rewriteRecord);
+ InternalRow finalRow = copy ? unsafeRow.copy() : unsafeRow;
- // TODO add actual rewriting
- InternalRow finalRow = new HoodieInternalRow(metaFields, data,
containMetaFields);
-
- return new HoodieSparkRecord(getKey(), finalRow, targetStructType,
getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), new HoodieInternalRow(metaFields,
finalRow, containMetaFields), targetStructType, getOperation(), copy);
}
@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema,
Properties props, Schema newSchema, Map<String, String> renameCols) throws
IOException {
StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType newStructType =
HoodieInternalRowUtils.getCachedSchema(newSchema);
- boolean containMetaFields = hasMetaFields(structType);
+ boolean containMetaFields = hasMetaFields(newStructType);
UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+ InternalRow rewriteRecord =
HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType,
newStructType, renameCols);
+ UnsafeRow unsafeRow =
HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType,
newStructType).apply(rewriteRecord);
+ InternalRow finalRow = copy ? unsafeRow.copy() : unsafeRow;
Review Comment:
Same comment as above
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -72,8 +72,46 @@ object HoodieInternalRowUtils {
} else {
newRow.update(pos, oldValue)
}
- case _ =>
- newRow.update(pos, oldValue)
+ case t if t == oldType => newRow.update(pos, oldValue)
+ // Type promotion
+ case _: ShortType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toShort)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
+ }
+ case _: IntegerType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toInt)
+ case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toInt)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
+ }
+ case _: LongType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toLong)
+ case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toLong)
+ case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toLong)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
+ }
+ case _: FloatType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toFloat)
+ case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toFloat)
+ case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toFloat)
+ case _: LongType => newRow.update(pos,
oldValue.asInstanceOf[Long].toFloat)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
+ }
+ case _: DoubleType =>
+ oldType match {
+ case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toDouble)
+ case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toDouble)
+ case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toDouble)
+ case _: LongType => newRow.update(pos,
oldValue.asInstanceOf[Long].toDouble)
+ case _: FloatType => newRow.update(pos,
oldValue.asInstanceOf[Float].toDouble)
+ case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
+ }
+ case _: BinaryType if oldType.isInstanceOf[StringType] =>
newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
Review Comment:
I don't think String -> Binary is a safe promotion. Why do we need it?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala:
##########
@@ -467,292 +448,288 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
.setConf(spark.sessionState.newHadoopConf())
.build()
assertResult(metaClient.getTableConfig.getTableName)(tableName)
- }
+ })
}
test("Test Insert Exception") {
- val tableName = generateTableName
Review Comment:
Why do these tests change?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -194,6 +185,13 @@ private RecordIterator(Schema readerSchema, Schema
writerSchema, byte[] content,
public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock,
byte[] content, InternalSchema internalSchema) throws IOException {
// Get schema from the header
Schema writerSchema = new
Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ if (!internalSchema.isEmptySchema()) {
Review Comment:
Take a look at how it's addressed in
https://github.com/apache/hudi/pull/6358, we can replicate the same thing here
to avoid creating more merge conflicts
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]