This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e4e5bf02320 [HUDI-8097] Fix getting Schema evolution setting from hudi-defaults.conf on altering columns (#11796)
e4e5bf02320 is described below
commit e4e5bf0232093bfba25f1769fb63258e9f816a8f
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Aug 20 07:36:12 2024 +0700
[HUDI-8097] Fix getting Schema evolution setting from hudi-defaults.conf on altering columns (#11796)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 6 ++----
.../scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala | 9 +++++++--
.../src/test/resources/external-config/hudi-defaults.conf | 2 ++
.../scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala | 5 ++++-
.../scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala | 6 ++----
.../spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala | 8 +-------
.../spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala | 7 +------
.../spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala | 7 +------
8 files changed, 20 insertions(+), 30 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 1920b56ecab..00a20b9607e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -48,7 +48,6 @@ import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
@@ -65,7 +64,7 @@ import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat, ParquetFileFormat}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
@@ -877,7 +876,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
// t/h Spark Session configuration (for ex, for Spark SQL)
     optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
       DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
-      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+      ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 440980dff2b..976f8496415 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -48,7 +48,6 @@ import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import java.util.Locale
-
import scala.collection.JavaConverters._
trait ProvidesHoodieConfig extends Logging {
@@ -573,6 +572,12 @@ object ProvidesHoodieConfig {
}
}
+  def isSchemaEvolutionEnabled(sparkSession: SparkSession): Boolean =
+    sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
+      DFSPropertiesConfiguration.getGlobalProps.getString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
+        HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString)
+    ).toBoolean
+
   private def filterNullValues(opts: Map[String, String]): Map[String, String] =
     opts.filter { case (_, v) => v != null }
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf
b/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf
index c883b5bbe83..3ade0cc3cf7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf
@@ -21,3 +21,5 @@
# Example:
hoodie.datasource.write.table.type MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning false
+hoodie.schema.on.read.enable true
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index b0d57aceab6..f30ec036f58 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
import org.scalatest.BeforeAndAfter
import java.io.File
@@ -99,6 +98,10 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
     spark.sql(s"delete from $tableName where year = $partitionVal")
     val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()
assertResult(0)(cnt)
+
+ // check that schema evolution is enabled (from hudi-defaults.conf),
+ // so no exception is thrown on alter table change column type
+ spark.sql(s"alter table $tableName change column price price string")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index bdde33b5845..65adee23009 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport}
-
+import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.HoodieSpark3CatalogUtils.MatchBucketTransform
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -135,8 +134,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
catalogTable = Some(catalogTable),
tableIdentifier = Some(ident.toString))
-    val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+    val schemaEvolutionEnabled = ProvidesHoodieConfig.isSchemaEvolutionEnabled(spark)
// NOTE: PLEASE READ CAREFULLY
//
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala
index c8078147838..4188f19ebae 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala
@@ -17,9 +17,7 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
-
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.plans.logical._
@@ -34,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
 class Spark33ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (schemaEvolutionEnabled) {
+    if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
       plan.resolveOperatorsUp {
         case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
           HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -56,10 +54,6 @@ class Spark33ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
     }
   }
-  private def schemaEvolutionEnabled: Boolean =
-    sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
-      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
-
object ResolvedHoodieV2TablePlan {
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
plan match {
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala
index cce06395157..9389e6095dc 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
 class Spark34ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (schemaEvolutionEnabled) {
+    if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
       plan.resolveOperatorsUp {
         case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
           HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -55,10 +54,6 @@ class Spark34ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
     }
   }
-  private def schemaEvolutionEnabled: Boolean =
-    sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
-      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
-
object ResolvedHoodieV2TablePlan {
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
plan match {
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala
index 160804f62b3..0cbd506150d 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
 class Spark35ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (schemaEvolutionEnabled) {
+    if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
      plan.resolveOperatorsUp {
         case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
           HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -55,10 +54,6 @@ class Spark35ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
     }
   }
-  private def schemaEvolutionEnabled: Boolean =
-    sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
-      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
-
object ResolvedHoodieV2TablePlan {
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
plan match {