[
https://issues.apache.org/jira/browse/HUDI-1842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394865#comment-17394865
]
ASF GitHub Bot commented on HUDI-1842:
--------------------------------------
nsivabalan commented on a change in pull request #3393:
URL: https://github.com/apache/hudi/pull/3393#discussion_r684350411
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##########
@@ -301,47 +318,102 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
           s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
     }
   }
+
+  private def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
+    val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+    val metadataConfig = {
+      val properties = new Properties()
+      properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
+      HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+    }
+    FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
+  }
+
+  /**
+   * This method is used for compatibility with old non-hive-styled partitioned tables.
+   * By default we enable "hoodie.datasource.write.hive_style_partitioning"
+   * when writing data to a hudi table by spark sql.
+   * If the existing table is a non-hive-styled partitioned table, we should
+   * disable "hoodie.datasource.write.hive_style_partitioning" when
+   * merging or updating the table. Otherwise we will get an incorrect merge result
+   * because of the partition path mismatch.
+   */
+  private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val isHiveStylePartitionPath = (path: String) => {
+        val fragments = path.split("/")
+        if (fragments.size != table.partitionColumnNames.size) {
+          false
+        } else {
+          fragments.zip(table.partitionColumnNames).forall {
+            case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=")
+          }
+        }
+      }
+      !partitionPaths.forall(isHiveStylePartitionPath)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * If this table has disabled URL encoding, spark sql should also disable it when writing to the table.
+   */
+  private def isUrlEncodeDisable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      !partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size)
+    } else {
+      false
+    }
+  }
+
}
object CreateHoodieTableCommand extends Logging {
/**
- * Init the table if it is not exists.
- * @param sparkSession
- * @param table
- * @return
+ * Init the hoodie.properties.
*/
  def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = {
-    val location = getTableLocation(table, sparkSession).getOrElse(
-      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
+    val location = getTableLocation(table, sparkSession)
val conf = sparkSession.sessionState.newHadoopConf()
// Init the hoodie table
-    if (!tableExistsInPath(location, conf)) {
-      val tableName = table.identifier.table
-      logInfo(s"Table $tableName is not exists, start to create the hudi table")
+    val originTableConfig = if (tableExistsInPath(location, conf)) {
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(location)
+        .setConf(conf)
+        .build()
+      metaClient.getTableConfig.getProps.asScala.toMap
+    } else {
+      Map.empty[String, String]
+    }
-      // Save all the table config to the hoodie.properties.
-      val parameters = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
-      val properties = new Properties()
+    val tableName = table.identifier.table
+    logInfo(s"Init hoodie.properties for $tableName")
+    val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key)
+    // Save all the table config to the hoodie.properties.
+    val parameters = originTableConfig ++ tableOptions
+    val properties = new Properties()
     properties.putAll(parameters.asJava)
    HoodieTableMetaClient.withPropertyBuilder()
-      .fromProperties(properties)
-      .setTableName(tableName)
-      .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
-      .setPartitionFields(table.partitionColumnNames.mkString(","))
-      .initTable(conf, location)
-    }
+      .fromProperties(properties)
+      .setTableName(tableName)
+      .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
+      .setPartitionFields(table.partitionColumnNames.mkString(","))
Review comment:
don't we need to set record key fields here?
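For readers following the review, here is a minimal, self-contained sketch of the hive-style check that the new isNotHiveStyledPartitionTable helper performs. The object name, partition columns, and paths below are hypothetical illustrations, not values from the PR:

    // A hive-styled partition path encodes each partition column as
    // "column=value", one path fragment per partition column.
    object HiveStylePartitionCheckExample extends App {
      val partitionColumns = Seq("year", "month") // hypothetical schema

      def isHiveStylePartitionPath(path: String): Boolean = {
        val fragments = path.split("/")
        fragments.size == partitionColumns.size &&
          fragments.zip(partitionColumns).forall {
            case (fragment, column) => fragment.startsWith(s"$column=")
          }
      }

      println(isHiveStylePartitionPath("year=2021/month=08")) // true
      println(isHiveStylePartitionPath("2021/08"))            // false: plain values only
    }

The same fragment-count comparison underlies the isUrlEncodeDisable heuristic in the diff: with URL encoding enabled, slashes inside partition values are encoded, so every partition path has exactly one fragment per partition column.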
> [SQL] Spark Sql Support For The Existing Hoodie Table
> -----------------------------------------------------
>
> Key: HUDI-1842
> URL: https://issues.apache.org/jira/browse/HUDI-1842
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Blocker
> Labels: pull-request-available, release-blocker
> Fix For: 0.9.0
>
>
> In order to support spark sql for hoodie, we persist some table properties to
> the hoodie.properties, e.g. primaryKey, preCombineField, partition columns.
> For existing hoodie tables, these properties are missing. We need to add some
> code in UpgradeDowngrade to support spark sql for existing tables.
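As a rough illustration of the persistence the issue describes, the sketch below writes that kind of key/value pairs into a hoodie.properties file via java.util.Properties; the property keys and the output path are assumptions for illustration and may not match the exact keys a given Hudi version uses:

    import java.io.FileOutputStream
    import java.util.Properties

    object PersistTablePropsExample extends App {
      val props = new Properties()
      props.setProperty("hoodie.table.recordkey.fields", "id") // primaryKey
      props.setProperty("hoodie.table.precombine.field", "ts") // preCombineField
      props.setProperty("hoodie.table.partition.fields", "dt") // partition columns

      // In a real table this file lives under the table's .hoodie directory;
      // /tmp is used here purely for the example.
      val out = new FileOutputStream("/tmp/hoodie.properties")
      try props.store(out, "table config persisted for spark sql support")
      finally out.close()
    }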