xushiyan commented on a change in pull request #3936:
URL: https://github.com/apache/hudi/pull/3936#discussion_r747299001
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
##########
@@ -242,8 +244,13 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Runnab
// Append the table schema to the parameters. In the case of merge into,
the schema of sourceDF
// may be different from the target table, because the are transform
logical in the update or
// insert actions.
+ val opertion = if
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
Review comment:
```suggestion
val operation = if
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
```
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -38,92 +42,125 @@ import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import
org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed,
isEmptyPath}
import
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
-import java.util.{Locale, Properties}
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.util.control.NonFatal
/**
* Command for create hoodie table.
*/
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists:
Boolean)
extends RunnableCommand with SparkAdapterSupport {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = table.identifier.unquotedString
+ val tableName = formatName(table.identifier.table)
+
+ val tblProperties = table.storage.properties ++ table.properties
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists =
sparkSession.sessionState.catalog.tableExists(table.identifier)
if (tableIsExists) {
if (ignoreIfExists) {
// scalastyle:off
return Seq.empty[Row]
// scalastyle:on
} else {
- throw new IllegalArgumentException(s"Table $tableName already exists.")
+ throw new IllegalArgumentException(s"Table
${table.identifier.unquotedString} already exists.")
}
}
- // Create table in the catalog
- val createTable = createTableInCatalog(sparkSession)
+
+ // get schema with meta fields, table config if hudi table exists, options
including
+ // table configs and properties of the catalog tabe
Review comment:
```suggestion
// table configs and properties of the catalog table
```
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
##########
@@ -171,7 +180,48 @@ object HoodieOptionConfig {
def getPreCombineField(options: Map[String, String]): Option[String] = {
val params = mappingSqlOptionToHoodieParam(options)
- params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
+ params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
+ }
+
+ def deleteHooideOptions(options: Map[String, String]): Map[String, String] =
{
Review comment:
I think now is a good time to add a UT for HoodieOptionConfig, at least to
cover the new and changed methods.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
##########
@@ -242,8 +244,13 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Runnab
// Append the table schema to the parameters. In the case of merge into,
the schema of sourceDF
// may be different from the target table, because the are transform
logical in the update or
// insert actions.
+ val opertion = if
(StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
Review comment:
`PRECOMBINE_FIELD` is meant to determine ordering. If no ordering
specified, then we perform insert. This looks like a new behavior, which I'm ok
with. Just want to make sure this will be documented somewhere for users to
look up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]