[
https://issues.apache.org/jira/browse/HUDI-1842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392439#comment-17392439
]
ASF GitHub Bot commented on HUDI-1842:
--------------------------------------
nsivabalan commented on a change in pull request #3393:
URL: https://github.com/apache/hudi/pull/3393#discussion_r681950625
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -268,7 +269,25 @@ case class HoodieResolveReferences(sparkSession:
SparkSession) extends Rule[Logi
} else {
l
}
-
+ // Fill schema for Create Table without specify schema info
+ case c @ CreateTable(tableDesc, _, _)
+ if isHoodieTable(tableDesc) =>
+ val tablePath = getTableLocation(c.tableDesc, sparkSession)
+ .getOrElse(s"Missing location defined in table
${c.tableDesc.identifier}")
Review comment:
is it that for a new table thats getting created, tablePath will be set
to "Missing location defined in table ..." ? In other words, if table already
existed, tablePath will be set to right value, if not, it will be set to this
string.
Is my understanding right? If so, why can't we return right away if there
table does not exist to maintain the same flow as before for tables that are
just getting created.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -268,7 +269,25 @@ case class HoodieResolveReferences(sparkSession:
SparkSession) extends Rule[Logi
} else {
l
}
-
+ // Fill schema for Create Table without specify schema info
+ case c @ CreateTable(tableDesc, _, _)
+ if isHoodieTable(tableDesc) =>
+ val tablePath = getTableLocation(c.tableDesc, sparkSession)
+ .getOrElse(s"Missing location defined in table
${c.tableDesc.identifier}")
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(sparkSession.sessionState.newHadoopConf())
+ .build()
+ val tableSchema =
HoodieSqlUtils.getTableSqlSchema(metaClient).map(HoodieSqlUtils.addMetaFields)
+ if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
+ // Fill the schema with the schema from the table
+ c.copy(tableDesc.copy(schema = tableSchema.get))
+ } else if (tableSchema.isDefined && tableDesc.schema !=
tableSchema.get) {
Review comment:
should we do "!=" or ".equals()" for schema comparison?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -306,34 +299,49 @@ case class CreateHoodieTableCommand(table: CatalogTable,
ignoreIfExists: Boolean
object CreateHoodieTableCommand extends Logging {
/**
- * Init the table if it is not exists.
- * @param sparkSession
- * @param table
- * @return
+ * Init the hoodie.properties.
*/
def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit =
{
val location = getTableLocation(table, sparkSession).getOrElse(
throw new IllegalArgumentException(s"Missing location for
${table.identifier}"))
val conf = sparkSession.sessionState.newHadoopConf()
// Init the hoodie table
- if (!tableExistsInPath(location, conf)) {
- val tableName = table.identifier.table
- logInfo(s"Table $tableName is not exists, start to create the hudi
table")
+ val originTableConfig = if (tableExistsInPath(location, conf)) {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(location)
+ .setConf(conf)
+ .build()
+ metaClient.getTableConfig.getProps.asScala.toMap
+ } else {
+ Map.empty[String, String]
+ }
- // Save all the table config to the hoodie.properties.
- val parameters =
HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
- val properties = new Properties()
+ val tableName = table.identifier.table
+ logInfo(s"Init hoodie.properties for $tableName")
+ val tableOptions =
HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
+ checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)
+ checkTableConfigEqual(originTableConfig, tableOptions,
HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)
Review comment:
may I know why record key field is not validated?
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##########
@@ -272,4 +278,48 @@ class TestCreateTable extends TestHoodieSqlBase {
)
}
}
+
+ test("Test Create Table From Exist Hoodie Table") {
+ withTempDir{tmp =>
+ // Write a table by spark dataframe.
+ val tableName = generateTableName
+ import spark.implicits._
+ val df = Seq((1, "a1", 10, 1000, "2021-08-02")).toDF("id", "name",
"value", "ts", "dt")
+ df.write.format("hudi")
+ .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+ .option(TABLE_TYPE_OPT_KEY.key, COW_TABLE_TYPE_OPT_VAL)
+ .option(RECORDKEY_FIELD_OPT_KEY.key, "id")
+ .option(PRECOMBINE_FIELD_OPT_KEY.key, "ts")
+ .option(PARTITIONPATH_FIELD_OPT_KEY.key, "dt")
+ .option(KEYGENERATOR_CLASS_OPT_KEY.key,
classOf[ComplexKeyGenerator].getName)
+ .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+ .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+ .mode(SaveMode.Overwrite)
+ .save(tmp.getCanonicalPath)
+
+ // Create a table over the exist old table.
+ spark.sql(
+ s"""
+ |create table $tableName using hudi
+ | options (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |partitioned by (dt)
+ |location '${tmp.getCanonicalPath}'
+ |""".stripMargin)
+ checkAnswer(s"select id, name, value, ts, dt from $tableName")(
+ Seq(1, "a1", 10, 1000, "2021-08-02")
+ )
+ // Check the missing properties for spark sql
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tmp.getCanonicalPath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ val properties = metaClient.getTableConfig.getProps.asScala.toMap
+
assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+
assertResult("dt")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key))
+
assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
Review comment:
Can we do some basic table operations as well.
1. create hudi table w/ spark data source with few records(lets call this as
old record).
2. create the same table in spark-sql.
a. ensure table props match
b. INSERT into works
c. UPDATE -> updates both old records and new records
d. DELETE -> again, ensure both old records and new records that matched
are deleted..
e. MERGE INTO -> ensure there is a match for both old record and a new
record as well. both should get updated.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -268,7 +269,25 @@ case class HoodieResolveReferences(sparkSession:
SparkSession) extends Rule[Logi
} else {
l
}
-
+ // Fill schema for Create Table without specify schema info
+ case c @ CreateTable(tableDesc, _, _)
+ if isHoodieTable(tableDesc) =>
+ val tablePath = getTableLocation(c.tableDesc, sparkSession)
+ .getOrElse(s"Missing location defined in table
${c.tableDesc.identifier}")
Review comment:
Or will this case c @ CreateTable(tableDesc, _, _) will be invoked only
if table already exists?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> [SQL] Spark Sql Support For The Exists Hoodie Table
> ---------------------------------------------------
>
> Key: HUDI-1842
> URL: https://issues.apache.org/jira/browse/HUDI-1842
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Blocker
> Labels: pull-request-available, release-blocker
> Fix For: 0.9.0
>
>
> In order to support spark sql for hoodie, we persist some table properties to
> the hoodie.properties. e.g. primaryKey, preCombineField, partition columns.
> For the exists hoodie tables, these properties are missing. We need do some
> code in UpgradeDowngrade to support spark sql for the exists tables.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)