This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd425d79c8 [HUDI-6470] Add spark sql conf in AlterTableCommand (#9116)
4fd425d79c8 is described below
commit 4fd425d79c8dfb75f7ab4926c2e7a85966ca6790
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jul 6 09:57:22 2023 +0800
[HUDI-6470] Add spark sql conf in AlterTableCommand (#9116)
---
.../AlterHoodieTableAddColumnsCommand.scala | 3 +-
.../org/apache/spark/sql/hudi/TestAlterTable.scala | 48 ++++++++++++++++++++++
.../sql/hudi/TestAlterTableDropPartition.scala | 3 +-
.../hudi/command/Spark30AlterTableCommand.scala | 38 ++++++-----------
.../hudi/command/Spark31AlterTableCommand.scala | 22 +++++-----
.../spark/sql/hudi/command/AlterTableCommand.scala | 22 +++++-----
6 files changed, 86 insertions(+), 50 deletions(-)
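In short, the patch folds the active session's Spark SQL confs into the Hudi write parameters before the write client is created, so session-level settings (for example hoodie.clean.automatic set via SET) take effect during ALTER TABLE. A minimal sketch of that pattern follows, reusing the helpers the diff touches; the writerParams wrapper name is assumed for illustration and is not part of the patch.

  import org.apache.hudi.HoodieWriterUtils
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.hudi.HoodieOptionConfig
  import scala.collection.JavaConverters._

  // Sketch only: merge the table's SQL options with the session's SQL confs,
  // apply Hudi's write defaults, and hand the result to the write client.
  def writerParams(spark: SparkSession, tableOptions: Map[String, String]): java.util.Map[String, String] =
    HoodieWriterUtils.parametersWithWriteDefaults(
      HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(tableOptions) ++
        spark.sqlContext.conf.getAllConfs).asJava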
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index e333ba8a7b1..a9876ae9d78 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -108,7 +108,8 @@ object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport {
writeSchema.toString,
hoodieCatalogTable.tableLocation,
hoodieCatalogTable.tableName,
- HoodieWriterUtils.parametersWithWriteDefaults(HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(hoodieCatalogTable.catalogProperties)).asJava
+ HoodieWriterUtils.parametersWithWriteDefaults(HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(
+ hoodieCatalogTable.catalogProperties) ++ sparkSession.sqlContext.conf.getAllConfs).asJava
)
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.tableType)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index de8ac76a99b..b3cd9e497f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -379,4 +379,52 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Alter Table With Spark Sql Conf") {
+ withTempDir { tmp =>
+ Seq(true, false).foreach { cleanEnable =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.clean.trigger.strategy = 'NUM_COMMITS',
+ | hoodie.cleaner.commits.retained = '3'
+ | )
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a2' where id = 1")
+ spark.sql(s"update $tableName set name = 'a3' where id = 1")
+ spark.sql(s"update $tableName set name = 'a4' where id = 1")
+
+ withSQLConf("hoodie.clean.automatic" -> cleanEnable.toString) {
+ spark.sql(s"alter table $tableName add columns(ext0 string)")
+ }
+
+ val metaClient = HoodieTableMetaClient.builder
+ .setConf(spark.sqlContext.sessionState.newHadoopConf())
+ .setBasePath(tablePath)
+ .build
+
+ val cnt = metaClient.getActiveTimeline.countInstants()
+ if (cleanEnable) {
+ assertResult(6)(cnt)
+ } else {
+ assertResult(5)(cnt)
+ }
+ }
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index d1ffa66edf8..ea489252c14 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.model.{HoodieCleanMetadata, HoodieCleanPartitionMetadata}
-import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.{HoodieCleaningPolicy, HoodieCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
@@ -28,7 +27,7 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.{getLastCleanMetadata, getLastCommitMetadata}
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCleanMetadata
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
index 7bd1fda77ef..22aea4c53e2 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark30AlterTableCommand.scala
@@ -17,38 +17,37 @@
package org.apache.spark.sql.hudi.command
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.hudi.internal.schema.action.TableChanges
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.table.HoodieSparkTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -218,7 +217,9 @@ object Spark30AlterTableCommand extends Logging {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
- path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
+ path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
+ HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+ sparkSession.sqlContext.conf.getAllConfs).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -298,18 +299,6 @@ object Spark30AlterTableCommand extends Logging {
} else ""
}
- def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
- Map(OPERATION.key -> OPERATION.defaultValue,
- TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
- PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
- HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> HoodieWriteConfig.DEFAULT_WRITE_PAYLOAD_CLASS,
- INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
- ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
- INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
- ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
- ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
- }
-
def checkSchemaChange(colNames: Seq[String], catalogTable: CatalogTable): Unit = {
val primaryKeys = catalogTable.storage.properties.getOrElse("primaryKey", catalogTable.properties.getOrElse("primaryKey", "keyid")).split(",").map(_.trim)
val preCombineKey = Seq(catalogTable.storage.properties.getOrElse("preCombineField", catalogTable.properties.getOrElse("preCombineField", "ts"))).map(_.trim)
@@ -322,4 +311,3 @@ object Spark30AlterTableCommand extends Logging {
}
}
}
-
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 7216a7acce4..a24a5d6b189 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -17,37 +17,37 @@
package org.apache.spark.sql.hudi.command
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.hudi.internal.schema.action.TableChanges
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.table.HoodieSparkTable
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -218,7 +218,8 @@ object Spark31AlterTableCommand extends Logging {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
- HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties)).asJava)
+ HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+ sparkSession.sqlContext.conf.getAllConfs).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -310,4 +311,3 @@ object Spark31AlterTableCommand extends Logging {
}
}
}
-
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index b0a3a46c8b2..78972cf239d 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -17,36 +17,36 @@
package org.apache.spark.sql.hudi.command
-import java.net.URI
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
-import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.hudi.internal.schema.action.TableChanges
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager
+import org.apache.hudi.internal.schema.utils.{SchemaChangeUtils, SerDeHelper}
import org.apache.hudi.table.HoodieSparkTable
+import org.apache.hudi.{DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -253,7 +253,8 @@ object AlterTableCommand extends Logging {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(
- HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties)).asJava)
+ HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(table.storage.properties ++ table.properties) ++
+ sparkSession.sqlContext.conf.getAllConfs).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
@@ -333,4 +334,3 @@ object AlterTableCommand extends Logging {
} else ""
}
}
-