YannByron commented on a change in pull request #4270:
URL: https://github.com/apache/hudi/pull/4270#discussion_r767130598



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
##########
@@ -18,17 +18,25 @@
 package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
+
 import org.apache.spark.sql.types.DataType
 
 /**
  * This is to be compatible with the type returned by Spark 3.1
  * and other spark versions for AvroDeserializer
  */
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {

Review comment:
       SPARK-34404

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
 case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {

Review comment:
       SPARK-34989

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>

Review comment:
       SPARK-34962

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // For Spark 3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)
       }
       // Resolve the notMatchedActions
       val resolvedNotMatchedActions = notMatchedActions.map {
         case InsertAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
           InsertAction(resolvedCondition, resolvedAssignments)
+        case action: MergeAction =>
+          // For Spark 3.2, it's InsertStarAction

Review comment:
       ditto

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -424,9 +439,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
         if isHoodieTable(tableName, sparkSession) =>
         AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
-      case ShowPartitionsCommand(tableName, specOpt)
-        if isHoodieTable(tableName, sparkSession) =>
-         ShowHoodieTablePartitionsCommand(tableName, specOpt)
+      case s: ShowPartitionsCommand

Review comment:
       SPARK-34238
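   SPARK-34238 changed the constructor of `ShowPartitionsCommand` (an output attribute list was added), so destructuring it with a fixed arity no longer compiles across Spark versions. A minimal sketch of the version-agnostic match, assuming the `tableName` and `spec` fields (the rewritten branch below is illustrative, not the exact PR code):

       case s: ShowPartitionsCommand if isHoodieTable(s.tableName, sparkSession) =>
         ShowHoodieTablePartitionsCommand(s.tableName, s.spec)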

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -84,12 +89,11 @@ case class AlterHoodieTableAddColumnsCommand(
     sparkSession.catalog.refreshTable(table.identifier.unquotedString)
 
     SchemaUtils.checkColumnNameDuplication(
-      newSqlSchema.map(_.name),
+      newSqlDataSchema.map(_.name),
       "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))

Review comment:
       The definition of `checkDataColNames` was changed by SPARK-36201, and this method is meaningless for a Hudi table.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
##########
@@ -203,7 +207,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 
     sourceExpression match {
      case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
-      case Cast(attr: AttributeReference, _, _) if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
+      case cast: Cast =>

Review comment:
       SPARK-35857
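   SPARK-35857 added an `ansiEnabled` field to `Cast`, so a fixed-arity extractor like `Cast(attr, _, _)` does not compile on both Spark 3.1 and 3.2. A minimal sketch of the version-agnostic shape, assuming `cast.child` from Spark's `Cast` API (illustrative, not the exact PR code):

       case cast: Cast =>
         cast.child match {
           case attr: AttributeReference =>
             sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName)
           case _ => false
         }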

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -293,28 +293,26 @@ class TestHoodieSparkSqlWriter {
    */
   @Test
   def testDisableAndEnableMetaFields(): Unit = {
-    try {
-      testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
-      //create a new table
-      val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
-        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
-        .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
-        .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
-        .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
+    testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)

Review comment:
       This resolves a compiler warning during the build.
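   For reference, the warning comes from a `try` block that has no `catch` or `finally`: scalac reports that such a `try` is equivalent to putting its body in a plain block and that no exceptions are handled. A minimal illustration (hypothetical call, not from the PR):

       try {
         runSomeTest()  // hypothetical
       }                // warning: a try without a catch or finally handles no exceptions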

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -711,51 +709,49 @@ class TestHoodieSparkSqlWriter {
         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
         HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")
-      try {
-        val df = spark.range(0, 1000).toDF("keyid")
-          .withColumn("col3", expr("keyid"))
-          .withColumn("age", lit(1))
-          .withColumn("p", lit(2))
-
-        df.write.format("hudi")
-          .options(options)
-          .option(DataSourceWriteOptions.OPERATION.key, "insert")
-          .option("hoodie.insert.shuffle.parallelism", "4")
-          .mode(SaveMode.Overwrite).save(tempBasePath)
-
-        df.write.format("hudi")
-          .options(options)
-          .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
-          .option("hoodie.insert.shuffle.parallelism", "4")
-          .mode(SaveMode.Append).save(tempBasePath)
-
-        val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
-        val incrementalKeyIdNum = spark.read.format("hudi")
-          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-          .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
-          .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
-          .load(tempBasePath).select("keyid").orderBy("keyid").count
-        assert(incrementalKeyIdNum == 1000)
-
-        df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
-        spark.emptyDataFrame.write.format("hudi")
-          .options(options)
-          .option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath)
-          .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
-          .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-          .option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4")
-          .mode(SaveMode.Overwrite).save(tempBasePath)
-        df.write.format("hudi").options(options)
-          .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
-          .option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
-        val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
-        val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
-          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-          .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
-          .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
-          .load(tempBasePath).select("keyid").orderBy("keyid").count
-        assert(incrementalKeyIdNumBootstrap == 1000)
-      }
+      val df = spark.range(0, 1000).toDF("keyid")

Review comment:
       ditto

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
##########
@@ -59,14 +59,18 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
   }
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = {
-    try super.test(testName, testTags: _*)(try testFun finally {
-      val catalog = spark.sessionState.catalog
-      catalog.listDatabases().foreach{db =>
-        catalog.listTables(db).foreach {table =>
-          catalog.dropTable(table, true, true)
+    super.test(testName, testTags: _*)(

Review comment:
       ditto

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
##########
@@ -16,24 +16,255 @@
  */
 
 package org.apache.spark.sql.execution.datasources
-import java.util.TimeZone
+
+import java.lang.{Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.time.ZoneId
+import java.util.{Locale, TimeZone}
 
 import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
+import org.apache.hudi.spark3.internal.ReflectUtil
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, timestampPartitionPattern}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+import scala.util.control.NonFatal
 
 class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
 
-  override def parsePartition(path: Path, typeInference: Boolean,
-                              basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType],
-                              timeZone: TimeZone): Option[PartitionValues] = {
-    val dateFormatter = DateFormatter(timeZone.toZoneId)
+  override def parsePartition(

Review comment:
       The definition of `PartitionValues` has been changed by SPARK-34314.
   Some code from `PartitioningUtils` in Spark 3.2 is copied here to keep compatibility between 3.1 and 3.2.
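   As a rough illustration of what the copied code does, a minimal sketch of the partition-value type inference in `PartitioningUtils`: try progressively wider types and fall back to a string literal (illustrative only, not the copied code; date/timestamp and DEFAULT_PARTITION_PATH handling are omitted):

       import java.lang.{Long => JLong}
       import java.math.{BigDecimal => JBigDecimal}
       import scala.util.Try
       import org.apache.spark.sql.catalyst.expressions.Literal

       def inferRawPartitionValue(raw: String): Literal =
         Try(Literal(JLong.parseLong(raw)))
           .orElse(Try(Literal(new JBigDecimal(raw))))
           .getOrElse(Literal(raw))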

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -84,12 +89,11 @@ case class AlterHoodieTableAddColumnsCommand(
     sparkSession.catalog.refreshTable(table.identifier.unquotedString)
 
     SchemaUtils.checkColumnNameDuplication(
-      newSqlSchema.map(_.name),
+      newSqlDataSchema.map(_.name),
       "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))

Review comment:
       The definition of `checkDataColNames` has been changed by SPARK-36201, and this method is meaningless for a Hudi table.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
              .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) }
          }
        } else {
-          assignments.map(assignment => {
+          // For Spark 3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields.

Review comment:
       yep.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // For Spark 3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)

Review comment:
       `action` is an instance of `UpdateStarAction`. Here we convert the `UpdateStarAction` into an `UpdateAction`.
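   A minimal sketch of that conversion, using the `MergeAction`/`UpdateAction` types from Spark's catalyst API (the wrapper function is illustrative, not the PR's code):

       import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, MergeAction, UpdateAction}

       def rewriteMatchedAction(action: MergeAction): MergeAction = action match {
         case d: DeleteAction => d
         case u: UpdateAction => u
         // On Spark 3.2 the remaining case is UpdateStarAction; Hudi fills the
         // assignments itself later, so it becomes an UpdateAction with no assignments.
         case other => UpdateAction(other.condition, Seq.empty)
       }

   The notMatchedActions branch treats `InsertStarAction` the same way, turning it into an `InsertAction` with empty assignments.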

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
##########
@@ -18,17 +18,25 @@
 package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
+
 import org.apache.spark.sql.types.DataType
 
 /**
  * This is to be compatible with the type returned by Spark 3.1
  * and other spark versions for AvroDeserializer
  */
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
+
+  private val avroDeserializer = if (org.apache.spark.SPARK_VERSION.startsWith("3.2")) {
+    val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
+    constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
+  } else {
+    val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
+    constructor.newInstance(rootAvroType, rootCatalystType)
+  }
+  }
 
   def deserializeData(data: Any): Any = {
-    super.deserialize(data) match {
+    avroDeserializer.deserialize(data) match {
       case Some(r) => r // spark 3.1 return type is Option, we fetch the data.

Review comment:
       ok
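   For context, the reflection above exists because the `AvroDeserializer` constructor gained a third String argument (the datetime rebase mode, per SPARK-34404) in Spark 3.2, so no single direct call compiles against both versions. Conceptually:

       // Spark 3.1.x
       new AvroDeserializer(rootAvroType, rootCatalystType)
       // Spark 3.2.x -- the extra argument is the datetime rebase mode used on read
       new AvroDeserializer(rootAvroType, rootCatalystType, "EXCEPTION")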

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
 case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
+    copy(table = newChildren.head)
+  }
 }
 
 case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
-  extends Command
+  extends Command {
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = {
+    this
+  }
+}
 
 case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = {
+     copy(table = newChildren.head)

Review comment:
       Among the classes that extend `RunnableCommand`, `CompactionShowOnPath` and `CompactionPath` have no children, so `withNewChildrenInternal` just returns the original object.
   In Spark 3.2, every class that extends `RunnableCommand` also extends `LeafRunnableCommand`, except the ones related to `view`. `LeafRunnableCommand` overrides `withNewChildrenInternal` to return `this`.
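   A minimal sketch of that leaf pattern for a command without children (the command name here is hypothetical, not from the PR):

       import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

       // No child plans, so there is nothing to replace: return the same instance,
       // which is what Spark 3.2's LeafRunnableCommand does for its subclasses.
       case class NoopMaintenanceCommand(path: String) extends Command {
         override def children: Seq[LogicalPlan] = Seq.empty

         def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): NoopMaintenanceCommand = this
       }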




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

