This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7a4f86af9b9562cef05475694201231ab2d24e52
Author: Geser Dugarov, PhD <[email protected]>
AuthorDate: Fri Mar 1 07:48:04 2024 +0700

    Revert "[HUDI-6438] Config parameter 'MAKE_NEW_COLUMNS_NULLABLE' to allow for marking a newly created column as nullable." (#10782)
---
 .../hudi/common/config/HoodieCommonConfig.java     |  9 -----
 .../schema/utils/AvroSchemaEvolutionUtils.java     |  9 +----
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 -
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  |  2 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  1 -
 .../apache/hudi/functional/TestCOWDataSource.scala | 47 +---------------------
 6 files changed, 4 insertions(+), 66 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 65fded08e52..afb22a4a27e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -64,15 +64,6 @@ public class HoodieCommonConfig extends HoodieConfig {
           + "This enables us, to always extend the table's schema during 
evolution and never lose the data (when, for "
           + "ex, existing column is being dropped in a new batch)");
 
-  public static final ConfigProperty<Boolean> MAKE_NEW_COLUMNS_NULLABLE = 
ConfigProperty
-      .key("hoodie.datasource.write.new.columns.nullable")
-      .defaultValue(false)
-      .markAdvanced()
-      .sinceVersion("0.14.0")
-      .withDocumentation("When a non-nullable column is added to datasource 
during a write operation, the write "
-          + " operation will fail schema compatibility check. Set this option 
to true will make the newly added "
-          + " column nullable to successfully complete the write operation.");
-
   public static final ConfigProperty<String> SET_NULL_FOR_MISSING_COLUMNS = 
ConfigProperty
       .key("hoodie.write.set.null.for.missing.columns")
       .defaultValue("false")
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index 809cd2837c7..7ca0cb7f81e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -25,11 +25,9 @@ import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE;
 import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static 
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
@@ -136,10 +134,9 @@ public class AvroSchemaEvolutionUtils {
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled 
against
-   * @param opts         config options
    * @return schema (based off {@code source} one) that has nullability 
constraints and datatypes reconciled
    */
-  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema 
targetSchema, Map<String, String> opts) {
+  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema 
targetSchema) {
     if (targetSchema.getType() == Schema.Type.NULL || 
targetSchema.getFields().isEmpty()) {
       return sourceSchema;
     }
@@ -153,14 +150,12 @@ public class AvroSchemaEvolutionUtils {
 
     List<String> colNamesSourceSchema = 
sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = 
targetInternalSchema.getAllColsFullName();
-    boolean makeNewColsNullable = 
"true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key()));
 
     List<String> nullableUpdateColsInSource = new ArrayList<>();
     List<String> typeUpdateColsInSource = new ArrayList<>();
     colNamesSourceSchema.forEach(field -> {
       // handle columns that needs to be made nullable
-      if ((makeNewColsNullable && !colNamesTargetSchema.contains(field))
-          || colNamesTargetSchema.contains(field) && 
sourceInternalSchema.findField(field).isOptional() != 
targetInternalSchema.findField(field).isOptional()) {
+      if (colNamesTargetSchema.contains(field) && 
sourceInternalSchema.findField(field).isOptional() != 
targetInternalSchema.findField(field).isOptional()) {
         nullableUpdateColsInSource.add(field);
       }
       // handle columns that needs type to be updated
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 85faccdc4d7..578f7aebaf2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -541,8 +541,6 @@ object DataSourceWriteOptions {
 
   val SET_NULL_FOR_MISSING_COLUMNS: ConfigProperty[String] = 
HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS
 
-  val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
-
   val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.spark.sql.insert.into.operation")
     .defaultValue(WriteOperationType.INSERT.value())
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index ed073ce4b17..0b42dc75b54 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -206,7 +206,7 @@ object HoodieSchemaUtils {
    * TODO support casing reconciliation
    */
   private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: 
Schema, opts : Map[String, String]): Schema = {
-    reconcileSchemaRequirements(sourceSchema, latestTableSchema, opts)
+    reconcileSchemaRequirements(sourceSchema, latestTableSchema)
   }
 
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 6e541973b91..0a4ef7a3d63 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -82,7 +82,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
     hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
-    hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ 
DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cb0209de979..a28a228fd46 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, 
TimelineUtils}
@@ -38,7 +38,6 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
-import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
@@ -1749,50 +1748,6 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(0, result.filter(result("id") === 1).count())
   }
 
-  /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
-  @Test
-  def testSchemaEvolutionWithNewColumn(): Unit = {
-    val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 
'foo' as event_date")
-    var hudiOptions = Map[String, String](
-      HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
-      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
-      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
-      DataSourceWriteOptions.OPERATION.key() -> "insert",
-      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
-      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> 
"org.apache.hudi.keygen.ComplexKeyGenerator",
-      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
-      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
-      HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> 
"org.apache.hudi.HoodieSparkRecordMerger"
-    )
-    
df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
-
-    // Try adding a string column. This operation is expected to throw 'schema 
not compatible' exception since
-    // 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
-    val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 
'foo' as event_date, 'bar' as add_col")
-    try {
-      
(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-      fail("Option succeeded, but was expected to fail.")
-    } catch {
-      case ex: org.apache.hudi.exception.HoodieInsertException => {
-        assertTrue(ex.getMessage.equals("Failed insert schema compatibility 
check"))
-      }
-      case ex: Exception => {
-        fail(ex)
-      }
-    }
-
-    // Try adding the string column again. This operation is expected to 
succeed since 'MAKE_NEW_COLUMNS_NULLABLE'
-    // parameter has been set to 'true'.
-    hudiOptions = hudiOptions + 
(HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE.key() -> "true")
-    try {
-      
(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-    } catch {
-      case ex: Exception => {
-        fail(ex)
-      }
-    }
-  }
-
   def assertLastCommitIsUpsert(): Boolean = {
     val metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)

Reply via email to