kwondw commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r605341196



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, SubqueryAlias, 
UpdateTable}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters._
+
+case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends 
RunnableCommand {
+
+  private val table = updateTable.table
+  private val tableAlias = table match {
+    case SubqueryAlias(name, _) => name
+    case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    logInfo(s"start execute update command for $tableAlias")
+    def cast(exp:Expression, field: StructField): Expression = {
+      castIfNeeded(exp, field.dataType, sparkSession.sqlContext.conf)
+    }
+    val name2UpdateValue = updateTable.assignments.map {
+      case Assignment(attr: AttributeReference, value) =>
+        attr.name -> value
+    }.toMap
+
+    val updateExpressions = table.output
+      .map(attr => name2UpdateValue.getOrElse(attr.name, attr))
+      .filter { // filter the meta columns
+        case attr: AttributeReference =>
+          !HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet.contains(attr.name)
+        case _=> true
+      }
+
+    val projects = 
updateExpressions.zip(removeMetaFields(table.schema).fields).map {
+      case (attr: AttributeReference, field) =>
+        Column(cast(attr, field))
+      case (exp, field) =>
+        Column(Alias(cast(exp, field), field.name)())
+    }
+
+    var df = Dataset.ofRows(sparkSession, table)
+    if (updateTable.condition.isDefined) {
+      df = df.filter(Column(updateTable.condition.get))
+    }
+    df = df.select(projects: _*)
+    val config = buildHoodieConfig(sparkSession)
+    df.write
+      .format("hudi")
+      .mode(SaveMode.Append)
+      .options(config)
+      .save()
+    table.refresh()
+    logInfo(s"finish execute update command for $tableAlias")
+    Seq.empty[Row]
+  }
+
+  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, 
String] = {
+    val targetTable = sparkSession.sessionState.catalog
+      .getTableMetadata(TableIdentifier(tableAlias.identifier, 
tableAlias.database))
+    val path = getTableLocation(targetTable, sparkSession)
+      .getOrElse(s"missing location for $tableAlias")
+
+    val primaryColumns = 
HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
+
+    assert(primaryColumns.nonEmpty,
+      s"There are no primary key in table $tableAlias, cannot execute update 
operator")
+    withSparkConf(sparkSession, targetTable.storage.properties) {
+      Map(
+        "path" -> removeStarFromPath(path.toString),
+        RECORDKEY_FIELD_OPT_KEY -> primaryColumns.mkString(","),
+        KEYGENERATOR_CLASS_OPT_KEY -> 
classOf[ComplexKeyGenerator].getCanonicalName,

Review comment:
       Sometimes a user creates a table with a different key generator. Wouldn't
it be better to make this configurable and store it in TBLPROPERTIES when the
table is created?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -56,13 +68,46 @@ case class CreateHoodieTableCommand(table: CatalogTable, 
ignoreIfExists: Boolean
         return Seq.empty[Row]
         // scalastyle:on
       } else {
-        throw new IllegalArgumentException(s"Table 
${table.identifier.unquotedString} already exists.")
+        throw new IllegalArgumentException(s"Table $tableName already exists.")
       }
     }
-    // Add the meta fields to the schema,
-    val newSchema = addMetaFields(table.schema)
+
     var path = getTableLocation(table, sparkSession)
       .getOrElse(s"Missing path for table ${table.identifier}")
+    val conf = sparkSession.sessionState.newHadoopConf()
+    val isTableExists = tableExists(path, conf)
+    // Get the schema & table options
+    val (newSchema, tableOptions) = if (table.tableType == 
CatalogTableType.EXTERNAL &&
+      isTableExists) {
+      // If this is an external table & the table has already exists in the 
location,
+      // load the schema from the table meta.
+      assert(table.schema.isEmpty,
+        s"Should not specified table schema for an exists hoodie external " +
+          s"table: ${table.identifier.unquotedString}")
+      // Get Schema from the external table
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(path)
+        .setConf(conf)
+        .build()
+      val schemaResolver = new TableSchemaResolver(metaClient)
+      val avroSchema = schemaResolver.getTableAvroSchema(true)
+      val tableSchema = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+      // Get options from the external table
+      val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
+        metaClient.getTableConfig.getProps.asScala.toMap)
+      (tableSchema, options)
+    } else {

Review comment:
       Does this mean Hudi supports managed tables too?
   As far as I understand, the data of a managed table is deleted by "drop
table". Is this expected?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey ='id',
+           |  versionColumn = 'ts'
+           | )
+       """.stripMargin)
+
+      // First merge (insert a new record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      val queryResult1 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+      assertResult(Array(Row(1, "a1", 10.0, 1000)))(queryResult1)
+
+      // second merge (update the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      val queryResult2 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+      assertResult(Array(Row(1, "a1", 20.0, 1001)))(queryResult2)
+
+      // the third time merge (update & insert the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select * from (
+           |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+           |  union all
+           |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+           |  )
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+           | when not matched and id % 2 = 0 then insert *
+       """.stripMargin)
+      val queryResult3 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+      assertResult(Array(Row(1, "a1", 30.0, 1002), Row(2, "a2", 12.0, 
1001)))(queryResult3)
+
+      // the fourth merge (delete the record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched and id != 1 then update set
+           |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when matched and id = 1 then delete

Review comment:
       Does the matched-delete condition also support only the record key? For
example, would `when matched and ts = 1002` not work?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey ='id',
+           |  versionColumn = 'ts'
+           | )
+       """.stripMargin)
+
+      // First merge (insert a new record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           | ) s0
+           | on s0.id = $tableName.id

Review comment:
       Must the column names of the two tables be the same in the ON condition?
   When the ON condition compares two differently named columns, like `s0.eid =
$tableName.id` or `s0.id = $tableName.eid`, it doesn't seem to work even if I
make `id` and `eid` a record key. Is this a limitation of the MERGE INTO
query?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/CreateTableTest.scala
##########
@@ -19,17 +19,44 @@ package org.apache.spark.sql.hudi
 
 import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.exception.HoodieDuplicateKeyException
-import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
 
 class CreateTableTest extends HoodieBaseSqlTest {
 
-  test("Test Create Hoodie Table") {
+  test("Test Create Managed Hoodie Table") {
+    val tableName = generateTableName
+    // Create a managed table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | options (
+         |   primaryKey = 'id',

Review comment:
       It looks like TBLPROPERTIES would make more sense than OPTIONS. Is there
any particular reason you chose OPTIONS over TBLPROPERTIES?
   
   Also, I noticed there isn't any validation that the primaryKey option
actually exists among the table columns. For example, in this test, changing it
to `primaryKey=nonexistid` still allows the table to be created without any
error, but it then fails on other operations such as insert.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey ='id',
+           |  versionColumn = 'ts'
+           | )
+       """.stripMargin)
+
+      // First merge (insert a new record)
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched then update set
+           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+           | when not matched then insert *
+       """.stripMargin)
+      val queryResult1 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+      assertResult(Array(Row(1, "a1", 10.0, 1000)))(queryResult1)
+
+      // second merge (update the record)
+      spark.sql(
+        s"""
+           | merge into $tableName

Review comment:
       Table aliases in merge don't seem to work, e.g. `merge into targetTable
as t using sourceTable as s on t.id = s.id`. Will this be supported later?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import java.util.Properties
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.keygen.{ComplexKeyGenerator, UuidKeyGenerator}
+import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, 
SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import 
org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, 
LogicalRelation}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StringType, StructType}
+
+/**
+  * Command for insert into hoodie table.
+  */
+class InsertIntoHoodieTableCommand(
+    logicalRelation: LogicalRelation,
+    query: LogicalPlan,
+    partition: Map[String, Option[String]],
+    overwrite: Boolean)
+  extends InsertIntoDataSourceCommand(logicalRelation, query, overwrite) {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
+
+    val table = logicalRelation.catalogTable.get
+    InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, 
overwrite)
+    Seq.empty[Row]
+  }
+}
+
+object InsertIntoHoodieTableCommand {
+  /**
+    * Run the insert query. We support both dynamic partition insert and 
static partition insert.
+    * @param sparkSession The spark session.
+    * @param table The insert table.
+    * @param query The insert query.
+    * @param insertPartitions The specified insert partition map.
+    *                         e.g. "insert into h(dt = '2021') select id, name 
from src"
+    *                         "dt" is the key in the map and "2021" is the 
partition value. If the
+    *                         partition value has not specified(in the case of 
dynamic partition)
+    *                         , it is None in the map.
+    * @param overwrite Whether to overwrite the table.
+    */
+  def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
+          insertPartitions: Map[String, Option[String]],
+          overwrite: Boolean): Unit = {
+
+    val config = if (table.schema.fields.nonEmpty) { // for insert into
+      buildHoodieInsertConfig(table, sparkSession, insertPartitions)
+    } else { // It is CTAS if the table schema is empty, we use the schema 
from the query.
+      buildHoodieInsertConfig(table, sparkSession, insertPartitions, 
Some(query.schema))
+    }
+
+    val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
+    val queryData = Dataset.ofRows(sparkSession, query)
+    val conf = sparkSession.sessionState.conf
+    val alignedQuery = alignOutputFields(queryData, table, insertPartitions, 
conf)
+    HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, 
alignedQuery)
+    sparkSession.catalog.refreshTable(table.identifier.unquotedString)
+  }
+
+  /**
+    * Aligned the type and name of query's output fields with the result 
table's fields.
+    * @param query The insert query which to aligned.
+    * @param table The result table.
+    * @param insertPartitions The insert partition map.
+    * @param conf The SQLConf.
+    * @return
+    */
+  private def alignOutputFields(
+    query: DataFrame,
+    table: CatalogTable,
+    insertPartitions: Map[String, Option[String]],
+    conf: SQLConf): DataFrame = {
+
+    val targetPartitionSchema = table.partitionSchema
+
+    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
+    assert(staticPartitionValues.isEmpty ||
+      staticPartitionValues.size == targetPartitionSchema.size,
+      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
static partitions " +
+        s"is: ${staticPartitionValues.mkString("," + "")}")
+
+    val queryDataFields = if (staticPartitionValues.isEmpty) { // insert 
dynamic partition
+      query.logicalPlan.output.dropRight(targetPartitionSchema.fields.length)
+    } else { // insert static partition
+      query.logicalPlan.output
+    }
+    val targetDataSchema = if (table.schema.fields.nonEmpty) {
+      table.dataSchema
+    } else { // for CTAS
+      query.schema
+    }
+    // Align for the data fields of the query
+    val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
+      case (dataAttr, targetField) =>
+        val castAttr = castIfNeeded(dataAttr,
+          targetField.dataType, conf)
+        new Column(Alias(castAttr, targetField.name)())
+    }
+
+    val partitionProjects = if (staticPartitionValues.isEmpty) { // insert 
dynamic partitions
+      // The partition attributes is followed the data attributes in the query
+      // So we init the partitionAttrPosition with the data schema size.
+      var partitionAttrPosition = targetDataSchema.size
+      targetPartitionSchema.fields.map(f => {
+        val partitionAttr = query.logicalPlan.output(partitionAttrPosition)
+        partitionAttrPosition = partitionAttrPosition + 1
+        // Cast the partition attribute to the target table field's type.
+        val castAttr = castIfNeeded(partitionAttr, f.dataType, conf)
+        new Column(Alias(castAttr, f.name)())
+      })
+    } else { // insert static partitions
+      targetPartitionSchema.fields.map(f => {
+        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
+        s"Missing static partition value for: ${f.name}")
+        // Cast the static partition value to the target table field's type.
+        val castAttr = castIfNeeded(
+          Literal.create(staticPartitionValue, StringType), f.dataType, conf)
+        new Column(Alias(castAttr, f.name)())
+      })
+    }
+    val alignedProjects = dataProjects ++ partitionProjects
+    query.select(alignedProjects: _*)
+  }
+
+  /**
+    * Build the default config for insert.
+    * @return
+    */
+  private def buildHoodieInsertConfig(table: CatalogTable,
+                              sparkSession: SparkSession,
+                              insertPartitions: Map[String, Option[String]] = 
Map.empty,
+                              schema: Option[StructType] = None): Map[String, 
String] = {
+
+    if (insertPartitions.nonEmpty &&
+      (insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
+      throw new IllegalArgumentException(s"Insert partition fields" +
+        s"[${insertPartitions.keys.mkString(" " )}]" +
+        s" not equal to the defined partition in 
table[${table.partitionColumnNames.mkString(",")}]")
+    }
+    val parameters = 
HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
+
+    val tableType = parameters.getOrElse(TABLE_TYPE_OPT_KEY, 
DEFAULT_TABLE_TYPE_OPT_VAL)
+
+    val partitionFields = table.partitionColumnNames.mkString(",")
+    val path = getTableLocation(table, sparkSession)
+      .getOrElse(s"Missing location for table ${table.identifier}")
+
+    val tableSchema = schema.getOrElse(table.schema)
+    val options = table.storage.properties
+    val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
+
+    val keyGenClass = if (primaryColumns.nonEmpty) {
+      classOf[ComplexKeyGenerator].getCanonicalName
+    } else {
+      classOf[UuidKeyGenerator].getName

Review comment:
       I noticed the current implementation allows creating a table without a
primary key, and allows inserts into it but not updates or deletes. What is the
use case for this?
   I couldn't find a way to alter a table's storage.properties. I wanted to try
changing a non-primary-key table into a primary-key table. Is this possible?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to