leesf commented on a change in pull request #4611:
URL: https://github.com/apache/hudi/pull/4611#discussion_r791286706



##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/connector/catalog/HoodieIdentifierHelper.scala
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+import java.util.Objects
+
+object HoodieIdentifierHelper {
+  def of(namespace: Array[String], name: String): Identifier = {
+    HoodieIdentifier(namespace, name)

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, 
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
V2SessionCatalog}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath}
+import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieConfigHelper, 
HoodieInternalV2Table}
+import 
org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, 
ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+/**
+ * Rule for convert the logical plan to command.
+ * @param sparkSession
+ */
+case class HoodieSpark3Analysis(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
+  with SparkAdapterSupport with HoodieConfigHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
+    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
+      val output = dsv2.output
+      val catalogTable = if (d.catalogTable.isDefined) {
+        Some(d.v1Table)
+      } else {
+        None
+      }
+      val relation = new DefaultSource().createRelation(new 
SQLContext(sparkSession),
+        buildHoodieConfig(d.hoodieCatalogTable))
+      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, 
_, _) if a.query.resolved &&
+      r.table.isInstanceOf[HoodieInternalV2Table] &&
+      needsSchemaAdjustment(a.query, 
r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) =>
+      val projection = resolveQueryColumnsByOrdinal(a.query, r.output)
+      if (projection != a.query) {
+        a.copy(query = projection)
+      } else {
+        a
+      }
+  }
+
+  private def needsSchemaAdjustment(query: LogicalPlan,

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, 
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
V2SessionCatalog}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath}
+import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieConfigHelper, 
HoodieInternalV2Table}
+import 
org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, 
ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+/**
+ * Rule for convert the logical plan to command.
+ * @param sparkSession
+ */
+case class HoodieSpark3Analysis(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
+  with SparkAdapterSupport with HoodieConfigHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
+    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
+      val output = dsv2.output
+      val catalogTable = if (d.catalogTable.isDefined) {
+        Some(d.v1Table)
+      } else {
+        None
+      }
+      val relation = new DefaultSource().createRelation(new 
SQLContext(sparkSession),
+        buildHoodieConfig(d.hoodieCatalogTable))
+      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, 
_, _) if a.query.resolved &&
+      r.table.isInstanceOf[HoodieInternalV2Table] &&
+      needsSchemaAdjustment(a.query, 
r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) =>
+      val projection = resolveQueryColumnsByOrdinal(a.query, r.output)
+      if (projection != a.query) {
+        a.copy(query = projection)
+      } else {
+        a
+      }
+  }
+
+  private def needsSchemaAdjustment(query: LogicalPlan,
+                                    hoodieTable: HoodieInternalV2Table,
+                                    partitionSpec: Map[String, Option[String]],
+                                    schema: StructType): Boolean = {
+    val output = query.output
+    val queryOutputWithoutMetaFields = removeMetaFields(output)
+    val partitionFields = hoodieTable.hoodieCatalogTable.partitionFields
+    val partitionSchema = hoodieTable.hoodieCatalogTable.partitionSchema
+    val staticPartitionValues = partitionSpec.filter(p => 
p._2.isDefined).mapValues(_.get)
+
+    assert(staticPartitionValues.isEmpty ||
+      staticPartitionValues.size == partitionSchema.size,
+      s"Required partition columns is: ${partitionSchema.json}, Current static 
partitions " +
+        s"is: ${staticPartitionValues.mkString("," + "")}")
+
+    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
+      == hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
+      s"Required select columns count: 
${hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
+        s"Current select columns(including static partition column) count: " +
+        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
+        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
+
+    // static partition insert.
+    if (staticPartitionValues.nonEmpty) {
+      // drop partition fields in origin schema to align fields.
+      schema.dropWhile(p => partitionFields.contains(p.name))
+    }
+
+    val existingSchemaOutput = output.take(schema.length)
+    existingSchemaOutput.map(_.name) != schema.map(_.name) ||
+      existingSchemaOutput.map(_.dataType) != schema.map(_.dataType)
+  }
+
+  private def resolveQueryColumnsByOrdinal(query: LogicalPlan,
+                                           targetAttrs: Seq[Attribute]): 
LogicalPlan = {
+    // always add a Cast. it will be removed in the optimizer if it is 
unnecessary.
+    val project = query.output.zipWithIndex.map { case (attr, i) =>
+      if (i < targetAttrs.length) {
+        val targetAttr = targetAttrs(i)
+        val castAttr = castIfNeeded(attr.withNullability(targetAttr.nullable), 
targetAttr.dataType, conf)
+        Alias(castAttr, targetAttr.name)()
+      } else {
+        attr
+      }
+    }
+    Project(project, query)
+  }
+}
+
+/**
+ * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
+ * @param sparkSession
+ */
+case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
+  with SparkAdapterSupport with HoodieConfigHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Fill schema for Create Table without specify schema info
+    case c @ CreateV2Table(tableCatalog, tableName, schema, _, properties, _)
+      if isHoodieTable(properties) =>
+
+      val hoodieCatalog = tableCatalog match {
+        case catalog: HoodieCatalog => catalog
+        case _ => tableCatalog.asInstanceOf[V2SessionCatalog]
+      }
+      val tablePath = getTableLocation(properties,
+        TableIdentifier(tableName.name(), tableName.namespace().lastOption), 
sparkSession)
+
+      val tableExistInCatalog = hoodieCatalog.tableExists(tableName)
+      // Only when the table has not exist in catalog, we need to fill the 
schema info for creating table.
+      if (!tableExistInCatalog && tableExistsInPath(tablePath, 
sparkSession.sessionState.newHadoopConf())) {
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(sparkSession.sessionState.newHadoopConf())
+          .build()
+        val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient)
+        if (tableSchema.isDefined && schema.isEmpty) {
+          // Fill the schema with the schema from the table
+          c.copy(tableSchema = tableSchema.get)
+        } else if (tableSchema.isDefined && schema != tableSchema.get) {
+          throw new AnalysisException(s"Specified schema in create table 
statement is not equal to the table schema." +
+            s"You should not specify the schema for an exist table: $tableName 
")
+        } else {
+          c
+        }
+      } else {
+        c
+      }
+    case DropPartitions(child, specs, ifExists, purge)

Review comment:
       done

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
##########
@@ -92,4 +95,31 @@ trait SparkAdapter extends Serializable {
    * ParserInterface#parseMultipartIdentifier is supported since spark3, for 
spark2 this should not be called.
    */
   def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): 
Seq[String]
+
+  def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+    tripAlias(table) match {
+      case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+      case relation: UnresolvedRelation =>
+        isHoodieTable(toTableIdentifier(relation), spark)
+      case _=> false
+    }
+  }
+
+  def tripAlias(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case SubqueryAlias(_, relation: LogicalPlan) =>
+        tripAlias(relation)
+      case other =>
+        other
+    }
+  }
+
+  def isHoodieTable(table: CatalogTable): Boolean = {
+    table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+  }
+
+  def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
+    val table = spark.sessionState.catalog.getTableMetadata(tableId)
+    isHoodieTable(table)
+  }

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -94,11 +94,17 @@ class TestHoodieSparkSqlWriter {
    * Utility method for initializing the spark context.
    */
   def initSparkContext(): Unit = {
+    val sparkConf = new SparkConf()
+    if (HoodieSparkUtils.isSpark3_2) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")

Review comment:
       here is because in the class we use Spark SQL to create hudi table, so 
we need to set HoodieCatalog

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -42,12 +43,39 @@ object HoodieAnalysis {
     Seq(
       session => HoodieResolveReferences(session),
       session => HoodieAnalysis(session)
-    )
+    ) ++ extraResolutionRules()
 
   def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
     Seq(
       session => HoodiePostAnalysisRule(session)
-    )
+    ) ++ extraPostHocResolutionRules()
+
+  def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
+    if (HoodieSparkUtils.isSpark3_2) {

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -42,12 +43,39 @@ object HoodieAnalysis {
     Seq(
       session => HoodieResolveReferences(session),
       session => HoodieAnalysis(session)
-    )
+    ) ++ extraResolutionRules()
 
   def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
     Seq(
       session => HoodiePostAnalysisRule(session)
-    )
+    ) ++ extraPostHocResolutionRules()
+
+  def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
+    if (HoodieSparkUtils.isSpark3_2) {
+      val spark3AnalysisClass = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
+      val spark3Analysis: SparkSession => Rule[LogicalPlan] =
+        session => ReflectionUtils.loadClass(spark3AnalysisClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+
+      val spark3ResolveReferences = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+      val spark3References: SparkSession => Rule[LogicalPlan] =
+        session => ReflectionUtils.loadClass(spark3ResolveReferences, 
session).asInstanceOf[Rule[LogicalPlan]]
+
+      Seq(spark3Analysis, spark3References)
+    } else {
+      Seq.empty
+    }
+  }
+
+  def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+    if (HoodieSparkUtils.isSpark3_2) {

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HudiSpark3SqlUtils.scala
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, IdentityTransform, Transform}
+
+import scala.collection.mutable
+
+object HudiSpark3SqlUtils {

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, 
String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, 
String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, 
String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+    val enableHive = isEnableHive(sparkSession)
+
+    withSparkConf(sparkSession, catalogProperties) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> preCombineColumn,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        HIVE_STYLE_PARTITIONING.key -> 
tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> 
classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName,
+        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+        HIVE_TABLE.key -> tableId.table,
+        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> 
classOf[MultiPartKeysValueExtractor].getCanonicalName,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+        SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL
+      )
+    }
+  }
+
+  def cast(exp: Expression, field: StructField, sqlConf: SQLConf): Expression 
= {
+    castIfNeeded(exp, field.dataType, sqlConf)
+  }

Review comment:
       removed

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieConfigHelper.scala
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.sql.InsertMode
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
isEnableHive, withSparkConf}
+import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters.propertiesAsScalaMapConverter
+
+trait HoodieConfigHelper extends Logging {
+
+  def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, 
String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableId = hoodieCatalogTable.table.identifier
+
+    val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+    val enableHive = isEnableHive(sparkSession)
+
+    withSparkConf(sparkSession, catalogProperties) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> preCombineColumn,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        HIVE_STYLE_PARTITIONING.key -> 
tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> 
classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> 
tableConfig.getKeyGeneratorClassName,
+        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
+        HIVE_TABLE.key -> tableId.table,
+        HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> 
classOf[MultiPartKeysValueExtractor].getCanonicalName,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+        SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL
+      )
+    }
+  }
+
+  def cast(exp: Expression, field: StructField, sqlConf: SQLConf): Expression 
= {
+    castIfNeeded(exp, field.dataType, sqlConf)
+  }
+
+  /**
+   * Build the default config for insert.
+   * @return
+   */
+    def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
+                                sparkSession: SparkSession,
+                                isOverwrite: Boolean,
+                                insertPartitions: Map[String, Option[String]] 
= Map.empty,
+                                extraOptions: Map[String, String]): 
Map[String, String] = {
+
+    if (insertPartitions.nonEmpty &&
+      (insertPartitions.keys.toSet != 
hoodieCatalogTable.partitionFields.toSet)) {
+      throw new IllegalArgumentException(s"Insert partition fields" +
+        s"[${insertPartitions.keys.mkString(" " )}]" +
+        s" not equal to the defined partition in 
table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
+    }

Review comment:
       here if replace it with require, the condition will be very complicated, 
so i just left it as it is.

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, 
TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.sources.{Filter, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, 
setAsJavaSetConverter}
+
+case class HoodieInternalV2Table(spark: SparkSession,
+                                 path: String,
+                                 catalogTable: Option[CatalogTable] = None,
+                                 tableIdentifier: Option[String] = None,
+                                 options: CaseInsensitiveStringMap = 
CaseInsensitiveStringMap.empty())
+  extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+  lazy val hoodieCatalogTable: HoodieCatalogTable = if 
(catalogTable.isDefined) {
+    HoodieCatalogTable(spark, catalogTable.get)
+  } else {
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(SparkSession.active.sessionState.newHadoopConf)
+      .build()
+
+    val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+    val tableName: String = tableConfig.getTableName
+
+    HoodieCatalogTable(spark, TableIdentifier(tableName))
+  }
+
+  private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def name(): String = 
hoodieCatalogTable.table.identifier.unquotedString
+
+  override def schema(): StructType = tableSchema
+
+  override def capabilities(): util.Set[TableCapability] = Set(
+    BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, 
ACCEPT_ANY_SCHEMA
+  ).asJava
+
+  override def properties(): util.Map[String, String] = {
+    val map = new util.HashMap[String, String]()
+    map.put("provider", "hudi")

Review comment:
       here because the catalog not only handle hudi tables, but would also 
handle other types of tables, such as parquet.

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, 
TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.sources.{Filter, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import java.util
+import scala.collection.JavaConverters.{mapAsJavaMapConverter, 
setAsJavaSetConverter}
+
+case class HoodieInternalV2Table(spark: SparkSession,
+                                 path: String,
+                                 catalogTable: Option[CatalogTable] = None,
+                                 tableIdentifier: Option[String] = None,
+                                 options: CaseInsensitiveStringMap = 
CaseInsensitiveStringMap.empty())
+  extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+  lazy val hoodieCatalogTable: HoodieCatalogTable = if 
(catalogTable.isDefined) {
+    HoodieCatalogTable(spark, catalogTable.get)
+  } else {
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(SparkSession.active.sessionState.newHadoopConf)
+      .build()
+
+    val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+    val tableName: String = tableConfig.getTableName
+
+    HoodieCatalogTable(spark, TableIdentifier(tableName))
+  }
+
+  private lazy val tableSchema: StructType = hoodieCatalogTable.tableSchema
+
+  override def name(): String = 
hoodieCatalogTable.table.identifier.unquotedString
+
+  override def schema(): StructType = tableSchema
+
+  override def capabilities(): util.Set[TableCapability] = Set(
+    BATCH_READ, V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, 
ACCEPT_ANY_SCHEMA
+  ).asJava
+
+  override def properties(): util.Map[String, String] = {
+    val map = new util.HashMap[String, String]()
+    map.put("provider", "hudi")
+    map.putAll(hoodieCatalogTable.catalogProperties.asJava)
+    map
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    new WriteIntoHoodieBuilder(info.options, hoodieCatalogTable, spark)
+  }
+
+  override def v1Table: CatalogTable = hoodieCatalogTable.table
+
+  override def partitioning(): Array[Transform] = {
+    hoodieCatalogTable.partitionFields.map { col =>
+      new IdentityTransform(new FieldReference(Seq(col)))
+    }.toArray
+  }
+
+}
+
+private class WriteIntoHoodieBuilder(writeOptions: CaseInsensitiveStringMap,

Review comment:
       done

##########
File path: 
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/BaseStagedTable.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.catalog
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+case class BaseStagedTable(ident: Identifier,

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to