This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd425d79c8 [HUDI-6470] Add spark sql conf in AlterTableCommand (#9116)
4fd425d79c8 is described below

commit 4fd425d79c8dfb75f7ab4926c2e7a85966ca6790
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jul 6 09:57:22 2023 +0800

    [HUDI-6470] Add spark sql conf in AlterTableCommand (#9116)
---
 .../AlterHoodieTableAddColumnsCommand.scala        |  3 +-
 .../org/apache/spark/sql/hudi/TestAlterTable.scala | 48 ++++++++++++++++++++++
 .../sql/hudi/TestAlterTableDropPartition.scala     |  3 +-
 .../hudi/command/Spark30AlterTableCommand.scala    | 38 ++++++-----------
 .../hudi/command/Spark31AlterTableCommand.scala    | 22 +++++-----
 .../spark/sql/hudi/command/AlterTableCommand.scala | 22 +++++-----
 6 files changed, 86 insertions(+), 50 deletions(-)

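The patch threads the Spark session's SQL conf into the write client built by each
ALTER TABLE command, so session-level settings (e.g. "hoodie.clean.automatic") now
reach schema-evolution commits. A minimal sketch of the resulting precedence, using
plain Scala maps with hypothetical values in place of the real config plumbing:

    // Table options are mapped to datasource write configs first, then the session
    // confs are appended; the right-hand side of ++ wins on duplicate keys, so a
    // session conf overrides the same key declared in tblproperties.
    val tableProps   = Map("hoodie.cleaner.commits.retained" -> "3",
                           "hoodie.clean.automatic" -> "true")
    val sessionConfs = Map("hoodie.clean.automatic" -> "false")
    val merged       = tableProps ++ sessionConfs
    assert(merged("hoodie.clean.automatic") == "false")
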
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index e333ba8a7b1..a9876ae9d78 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -108,7 +108,8 @@ object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport {
       writeSchema.toString,
       hoodieCatalogTable.tableLocation,
       hoodieCatalogTable.tableName,
-      HoodieWriterUtils.parametersWithWriteDefaults(HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(hoodieCatalogTable.catalogProperties)).asJava
+      HoodieWriterUtils.parametersWithWriteDefaults(HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(
+        hoodieCatalogTable.catalogProperties) ++ sparkSession.sqlContext.conf.getAllConfs).asJava
     )
 
     val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.tableType)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index de8ac76a99b..b3cd9e497f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -379,4 +379,52 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Alter Table With Spark Sql Conf") {
+    withTempDir { tmp =>
+      Seq(true, false).foreach { cleanEnable =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '$tablePath'
+             | tblproperties (
+             |  type = 'cow',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.clean.trigger.strategy = 'NUM_COMMITS',
+             |  hoodie.cleaner.commits.retained = '3'
+             | )
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"update $tableName set name = 'a2' where id = 1")
+        spark.sql(s"update $tableName set name = 'a3' where id = 1")
+        spark.sql(s"update $tableName set name = 'a4' where id = 1")
+
+        withSQLConf("hoodie.clean.automatic" -> cleanEnable.toString) {
+          spark.sql(s"alter table $tableName add columns(ext0 string)")
+        }
+
+        val metaClient = HoodieTableMetaClient.builder
+          .setConf(spark.sqlContext.sessionState.newHadoopConf())
+          .setBasePath(tablePath)
+          .build
+
+        val cnt = metaClient.getActiveTimeline.countInstants()
+        if (cleanEnable) {
+          assertResult(6)(cnt)
+        } else {
+          assertResult(5)(cnt)
+        }
+      }
+    }
+  }
 }
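
The test above inserts once and updates three times (4 commits), then issues the ALTER
with "hoodie.clean.automatic" toggled through the session conf: the alter-schema commit
is the 5th instant, and with automatic cleaning enabled the retained-commits threshold
of 3 triggers an inline clean as the 6th, which is what the two assertResult calls
count. The same override works from plain SQL instead of withSQLConf; a sketch assuming
an active SparkSession named `spark` and a hypothetical table name:

    // Session-level override, equivalent to the test's withSQLConf block.
    spark.sql("set hoodie.clean.automatic = false")
    spark.sql("alter table hudi_tbl add columns(ext0 string)") // hudi_tbl is an example
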
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index d1ffa66edf8..ea489252c14 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.model.{HoodieCleanMetadata, HoodieCleanPartitionMetadata}
-import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.hudi.common.model.{HoodieCleaningPolicy, HoodieCommitMetadata}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
@@ -28,7 +27,7 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.{getLastCleanMetadata, getLastCommitMetadata}
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCleanMetadata
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertTrue
 
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
index 7bd1fda77ef..22aea4c53e2 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
@@ -17,38 +17,37 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
 import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
 import org.apache.hudi.internal.schema.action.TableChanges
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
 
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -218,7 +217,9 @@ object Spark30AlterTableCommand extends Logging {
 
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
-      path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
+      path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
+        HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+          sparkSession.sqlContext.conf.getAllConfs).asJava)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -298,18 +299,6 @@ object Spark30AlterTableCommand extends Logging {
     } else ""
   }
 
-  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
-    Map(OPERATION.key -> OPERATION.defaultValue,
-      TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
-      PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
-      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key ->  HoodieWriteConfig.DEFAULT_WRITE_PAYLOAD_CLASS,
-      INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
-      ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
-      INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
-      ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
-    ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
-  }
-
-
   def checkSchemaChange(colNames: Seq[String], catalogTable: CatalogTable): Unit = {
     val primaryKeys = catalogTable.storage.properties.getOrElse("primaryKey", catalogTable.properties.getOrElse("primaryKey", "keyid")).split(",").map(_.trim)
     val preCombineKey = Seq(catalogTable.storage.properties.getOrElse("preCombineField", catalogTable.properties.getOrElse("preCombineField", "ts"))).map(_.trim)
@@ -322,4 +311,3 @@ object Spark30AlterTableCommand extends Logging {
     }
   }
 }
-
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 7216a7acce4..a24a5d6b189 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -17,37 +17,37 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
 import org.apache.hudi.internal.schema.action.TableChanges
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.table.HoodieSparkTable
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
 
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -218,7 +218,8 @@ object Spark31AlterTableCommand extends Logging {
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
       path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
-        HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties)).asJava)
+        HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+          sparkSession.sqlContext.conf.getAllConfs).asJava)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -310,4 +311,3 @@ object Spark31AlterTableCommand extends Logging {
     }
   }
 }
-
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index b0a3a46c8b2..78972cf239d 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -17,36 +17,36 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
 import org.apache.hudi.internal.schema.action.TableChanges
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
 import org.apache.hudi.table.HoodieSparkTable
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
 
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -253,7 +253,8 @@ object AlterTableCommand extends Logging {
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
       path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
-        HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties)).asJava)
+        HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+          sparkSession.sqlContext.conf.getAllConfs).asJava)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -333,4 +334,3 @@ object AlterTableCommand extends Logging {
     } else ""
   }
 }
-
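
Taken together, the shared add-columns command and the Spark 3.0 / 3.1 / 3.2+ alter
commands now all route the mapped table options plus the session confs through
HoodieWriterUtils.parametersWithWriteDefaults. A sketch of checking the effect with
the same meta client API the new test uses (the base path is an example; imports
match TestAlterTable.scala):

    // Count timeline instants after the ALTER to see whether the session conf
    // scheduled an inline clean alongside the alter-schema commit.
    val metaClient = HoodieTableMetaClient.builder
      .setConf(spark.sqlContext.sessionState.newHadoopConf())
      .setBasePath("/tmp/hudi/hudi_tbl")
      .build
    println(metaClient.getActiveTimeline.countInstants())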
