leesf commented on a change in pull request #3936:
URL: https://github.com/apache/hudi/pull/3936#discussion_r747552833
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -38,92 +42,125 @@ import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import
org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed,
isEmptyPath}
import
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
-import java.util.{Locale, Properties}
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.util.control.NonFatal
/**
* Command for create hoodie table.
*/
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists:
Boolean)
extends RunnableCommand with SparkAdapterSupport {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = table.identifier.unquotedString
+ val tableName = formatName(table.identifier.table)
+
+ val tblProperties = table.storage.properties ++ table.properties
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists =
sparkSession.sessionState.catalog.tableExists(table.identifier)
if (tableIsExists) {
if (ignoreIfExists) {
// scalastyle:off
return Seq.empty[Row]
// scalastyle:on
} else {
- throw new IllegalArgumentException(s"Table $tableName already exists.")
+ throw new IllegalArgumentException(s"Table
${table.identifier.unquotedString} already exists.")
}
}
- // Create table in the catalog
- val createTable = createTableInCatalog(sparkSession)
+
+ // get schema with meta fields, table config if hudi table exists, options
including
+ // table configs and properties of the catalog table
+ val path = getTableLocation(table, sparkSession)
+ val (finalSchema, existingTableConfig, tableSqlOptions) =
parseSchemaAndConfigs(sparkSession, path)
+
// Init the hoodie.properties
- initTableIfNeed(sparkSession, createTable)
+ initTableIfNeed(sparkSession, tableName, path, finalSchema,
+ table.partitionColumnNames, existingTableConfig, tableSqlOptions)
+
+ try {
+ // Create table in the catalog
+ createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to create catalog table in metastore:
${e.getMessage}")
+ e.printStackTrace()
Review comment:
   Should we remove this line? `e.printStackTrace()` is redundant given the `logWarning` above, and it bypasses the logging framework.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##########
@@ -102,4 +106,75 @@ object HoodieWriterUtils {
properties.putAll(mapAsJavaMap(parameters))
new HoodieConfig(properties)
}
+
+ def getRealKeyGenerator(hoodieConfig: HoodieConfig): String = {
+ val kg = hoodieConfig.getString(KEYGENERATOR_CLASS_NAME.key())
+ if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
+ hoodieConfig.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
+ } else {
+ kg
+ }
+ }
+
+ /**
+ * Detects conflicts between new parameters and existing table configurations
+ */
+ def validateTableConfig(spark: SparkSession, params: Map[String, String],
+ tableConfig: HoodieConfig): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+ val diffConfigs = StringBuilder.newBuilder
+ params.foreach { case (key, value) =>
+ val existingValue =
getStringFromTableConfigWithAlternatives(tableConfig, key)
+ if (null != existingValue && !resolver(existingValue, value)) {
+ diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+ }
+ }
+
+ if (null != tableConfig) {
+ val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+ val tableConfigRecordKey =
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+ if (null != datasourceRecordKey && null != tableConfigRecordKey
+ && datasourceRecordKey != tableConfigRecordKey) {
+
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+ }
+
+ val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(),
null)
+ val tableConfigPreCombineKey =
tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+ if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+ && datasourcePreCombineKey != tableConfigPreCombineKey) {
+
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+ }
+
+ val datasourceKeyGen = {
+ val kg = params.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
+ if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
 Review comment:
       This duplicates the logic in `getRealKeyGenerator` above — consider reusing that method instead.
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
##########
@@ -38,7 +38,7 @@ class TestAlterTable extends TestHoodieSqlBase {
| ts long
|) using hudi
| location '$tablePath'
- | options (
+ | tblproperties (
 Review comment:
       Does this change mean users would no longer be able to use `options` here?
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -62,6 +62,53 @@ class TestCreateTable extends TestHoodieSqlBase {
)(table.schema.fields)
}
+ test("Test Create Hoodie Table With Options") {
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | partitioned by (dt)
+ | options (
 Review comment:
       Should we change this to `tblproperties` as well, and rename the test description accordingly?
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
##########
@@ -475,8 +478,8 @@ class TestDataSourceForBootstrap {
}
def runMetadataBootstrapAndVerifyCommit(tableType: String,
- partitionColumns: Option[String] =
None,
- extraOpts: Map[String, String] =
Map.empty): String = {
+ partitionColumns: Option[String] = None,
 Review comment:
       Is this formatting change necessary? It seems unrelated to the PR.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -38,92 +42,125 @@ import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import
org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed,
isEmptyPath}
import
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
-import java.util.{Locale, Properties}
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.util.control.NonFatal
/**
* Command for create hoodie table.
*/
case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists:
Boolean)
extends RunnableCommand with SparkAdapterSupport {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = table.identifier.unquotedString
+ val tableName = formatName(table.identifier.table)
+
+ val tblProperties = table.storage.properties ++ table.properties
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val tableIsExists =
sparkSession.sessionState.catalog.tableExists(table.identifier)
if (tableIsExists) {
if (ignoreIfExists) {
// scalastyle:off
return Seq.empty[Row]
// scalastyle:on
} else {
- throw new IllegalArgumentException(s"Table $tableName already exists.")
+ throw new IllegalArgumentException(s"Table
${table.identifier.unquotedString} already exists.")
}
}
- // Create table in the catalog
- val createTable = createTableInCatalog(sparkSession)
+
+ // get schema with meta fields, table config if hudi table exists, options
including
+ // table configs and properties of the catalog table
+ val path = getTableLocation(table, sparkSession)
+ val (finalSchema, existingTableConfig, tableSqlOptions) =
parseSchemaAndConfigs(sparkSession, path)
+
// Init the hoodie.properties
- initTableIfNeed(sparkSession, createTable)
+ initTableIfNeed(sparkSession, tableName, path, finalSchema,
 Review comment:
       There are too many arguments here — consider grouping them into a parameter object or builder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]