alexeykudinkin commented on code in PR #7003:
URL: https://github.com/apache/hudi/pull/7003#discussion_r1024786089


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -170,4 +174,44 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration)
     fs.exists(path)
   }
+
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val conf = spark.sessionState.conf
+    val currentValues = pairs.unzip._1.map { k =>
+      if (conf.contains(k)) {
+        Some(conf.getConfString(k))
+      } else None
+    }
+    pairs.foreach { case(k, v) => conf.setConfString(k, v) }
+    try f finally {
+      pairs.unzip._1.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
+  protected def withRecordType(f: => Unit, recordConfig: Map[HoodieRecordType, 
Map[String, String]]=Map.empty) {

Review Comment:
   Why not structure it the same way as `withSQLConf`? Like below:
   
   ```
   def withRecordType(recordConfig)(f: => Unit)
   ```



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -170,4 +174,44 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration)
     fs.exists(path)
   }
+
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val conf = spark.sessionState.conf
+    val currentValues = pairs.unzip._1.map { k =>
+      if (conf.contains(k)) {
+        Some(conf.getConfString(k))
+      } else None
+    }
+    pairs.foreach { case(k, v) => conf.setConfString(k, v) }
+    try f finally {
+      pairs.unzip._1.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
+  protected def withRecordType(f: => Unit, recordConfig: Map[HoodieRecordType, 
Map[String, String]]=Map.empty) {
+    Seq(HoodieRecordType.SPARK).foreach { recordType =>

Review Comment:
   Why are we testing just the Spark record type?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1228,18 +1232,10 @@ class TestMORDataSource extends HoodieClientTestBase 
with SparkDatasetMixin {
       roDf.where(col(recordKeyField) === 
0).select(dataField).collect()(0).getLong(0))
   }
 
-  def getOpts(recordType: HoodieRecordType): (Map[String, String], Map[String, 
String]) = {
-    val writeOpts = if (recordType == HoodieRecordType.SPARK) {
-      commonOpts ++ sparkOpts
-    } else {
-      commonOpts
+  def getOpts(recordType: HoodieRecordType, opt: Map[String, String] = 
commonOpts): (Map[String, String], Map[String, String]) = {

Review Comment:
   Same as below, please ditto everywhere



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -185,9 +185,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
 
     // Create the write parameters
     val parameters = buildMergeIntoConfig(hoodieCatalogTable)
+    // TODO Remove it when we implement ExpressionPayload for SparkRecord
+    val parametersWithAvroRecordMerger = parameters ++ 
Map(HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieAvroRecordMerger].getName)

Review Comment:
   Let's just append to `parameters` (no need for new var)



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -251,8 +252,16 @@ object HoodieCatalystExpressionUtils {
     )
   }
 
+  @tailrec
   def existField(structType: StructType, name: String): Boolean = {
-    structType.getFieldIndex(name).isDefined
+    if (name.contains(".")) {

Review Comment:
   Let's move this method to `HoodieInternalRowUtils`, it has nothing to do w/ 
expressions



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java:
##########
@@ -61,7 +61,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
 
   private final boolean enablePointLookups;
 
-  protected final Schema readerSchema;
+  protected Schema readerSchema;

Review Comment:
   We should not be relaxing mutability constraints unless we absolutely have to



##########
hudi-common/src/main/java/org/apache/hudi/common/util/IdentityIterator.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.Iterator;
+
+public class IdentityIterator<R> implements ClosableIterator<R> {

Review Comment:
   Why do we need this one?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala:
##########
@@ -531,6 +533,18 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
         }
       }
+    })
+  }
+
+  val sparkOpts = Map(
+    HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
+    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+  )
+
+  def getOpts(recordType: HoodieRecordType, opt: Map[String, String]): 
(Map[String, String], Map[String, String]) = {

Review Comment:
   nit: `getWriterReaderOpts`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -108,7 +117,12 @@ private ClosableIterator<InternalRow> 
getInternalRowIterator(Schema readerSchema
 
   @Override
   public Schema getSchema() {
-    return parquetUtils.readAvroSchema(conf, path);
+    // Some types in avro are not compatible with parquet
+    // decimal as int32/int64
+    MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(conf, 
path);
+    StructType structType = new 
ParquetToSparkSchemaConverter(conf).convert(messageType);

Review Comment:
   Why do we need to convert to StructType?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -231,7 +229,11 @@ public MapData getMap(int ordinal) {
 
   @Override
   public InternalRow copy() {
-    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), 
sourceRow.copy(), sourceContainsMetaFields);
+    UTF8String[] copyMetaFields = new UTF8String[metaFields.length];
+    for (int i = 0; i < metaFields.length; i++) {
+      copyMetaFields[i] = metaFields[i].copy();

Review Comment:
   Why do we need to copy individual fields?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -461,9 +460,12 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   }
 
   protected def getTableState: HoodieTableState = {
-    val mergerImpls = 
ConfigUtils.getMergerImpls(optParams.asJava).asScala.toList
+    val mergerImpls = 
ConfigUtils.getMergerImpls(optParams.getOrElse(HoodieWriteConfig.MERGER_IMPLS.key(),

Review Comment:
   Let's abstract this lookup into common utility (and re-use it below)



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -72,8 +76,46 @@ object HoodieInternalRowUtils {
             } else {
               newRow.update(pos, oldValue)
             }
-          case _ =>
-            newRow.update(pos, oldValue)
+          case t if t == oldType => newRow.update(pos, oldValue)
+          // Type promotion
+          case _: ShortType =>
+            oldType match {
+              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toShort)
+              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
+            }
+          case _: IntegerType =>
+            oldType match {
+              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toInt)
+              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toInt)
+              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
+            }
+          case _: LongType =>
+            oldType match {
+              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toLong)
+              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toLong)
+              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toLong)
+              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
+            }
+          case _: FloatType =>
+            oldType match {
+              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toFloat)
+              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toFloat)
+              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toFloat)
+              case _: LongType => newRow.update(pos, 
oldValue.asInstanceOf[Long].toFloat)
+              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
+            }
+          case _: DoubleType =>
+            oldType match {
+              case _: ByteType => newRow.update(pos, 
oldValue.asInstanceOf[Byte].toDouble)
+              case _: ShortType => newRow.update(pos, 
oldValue.asInstanceOf[Short].toDouble)
+              case _: IntegerType => newRow.update(pos, 
oldValue.asInstanceOf[Int].toDouble)
+              case _: LongType => newRow.update(pos, 
oldValue.asInstanceOf[Long].toDouble)
+              case _: FloatType => newRow.update(pos, 
oldValue.asInstanceOf[Float].toDouble)
+              case _ => throw new IllegalArgumentException(s"$oldSchema and 
$newSchema are incompatible")
+            }
+          case _: BinaryType if oldType.isInstanceOf[StringType] => 
newRow.update(pos, oldValue.asInstanceOf[Array[Byte]].map(_.toChar).mkString)

Review Comment:
   These 2 seem to be inverted



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -194,6 +185,13 @@ private RecordIterator(Schema readerSchema, Schema 
writerSchema, byte[] content,
     public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, 
byte[] content, InternalSchema internalSchema) throws IOException {
       // Get schema from the header
       Schema writerSchema = new 
Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+      if (!internalSchema.isEmptySchema()) {

Review Comment:
   Why did this block move?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -251,8 +252,16 @@ object HoodieCatalystExpressionUtils {
     )
   }
 
+  @tailrec
   def existField(structType: StructType, name: String): Boolean = {
-    structType.getFieldIndex(name).isDefined
+    if (name.contains(".")) {

Review Comment:
   Would be better to rewrite it as a single loop, instead of tail-rec (to 
avoid calling split N times)



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -172,24 +214,22 @@ object HoodieInternalRowUtils {
     }
   }
 
-  /**
-   * @see 
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithMetadata(org.apache.avro.generic.GenericRecord,
 org.apache.avro.Schema, java.lang.String)
-   */
-  def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, 
newSchema: StructType, fileName: String): InternalRow = {
-    val newRecord = rewriteRecord(record, oldSchema, newSchema)
-    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, 
CatalystTypeConverters.convertToCatalyst(fileName))
+  def compareSchema(left: StructType, right: StructType): Boolean = {
+    val schemaPair = (left, right)
+    if (!schemaCompareMap.contains(schemaPair)) {

Review Comment:
   Left this comment before, but this still seems to be bubbling up: this 
doesn't make sense -- keys in the map will always be checked for equality, 
which defeats the purpose of this mapping



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java:
##########
@@ -58,10 +56,8 @@ public static String getPayloadClass(Properties properties) {
     return payloadClass;
   }
 
-  public static List<String> getMergerImpls(Map<String, String> optParams) {
-    return Arrays.stream(
-            optParams.getOrDefault("hoodie.datasource.write.merger.impls",
-                HoodieAvroRecordMerger.class.getName()).split(","))
+  public static List<String> getMergerImpls(String param) {

Review Comment:
   This utility isn't really specific for merge-impls



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.Schema
+import org.apache.hudi.HoodieSparkRecordMerger
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieAvroRecordMerger.Config
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.{collection, Option => HOption}
+import org.apache.hudi.exception.HoodieDuplicateKeyException
+
+/**
+ * Validate the duplicate key for insert statement without enable the 
INSERT_DROP_DUPS_OPT
+ * config.
+ */
+class HoodieSparkValidateDuplicateKeyRecordMerger extends 
HoodieSparkRecordMerger {

Review Comment:
   Can you please elaborate on why we need this RM impl (I read through the Javadoc,
but I don't think I fully grasped it)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to