This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d15bdfcebd4 [HUDI-7937] Handle legacy writer requirements in 
StreamSync and Clustering (#11534)
d15bdfcebd4 is described below

commit d15bdfcebd4e15308d5b79a2b90925004229aa4a
Author: Tim Brown <[email protected]>
AuthorDate: Tue Sep 17 09:29:16 2024 -0500

    [HUDI-7937] Handle legacy writer requirements in StreamSync and Clustering 
(#11534)
    
    - Update StreamSync to infer whether row writer is allowed based on the 
configs and target schema
    - Update clustering to also infer whether row writer is allowed based on 
the target schema and configs
    - Fix clustering to add legacy writer parameter when required
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 -
 .../MultipleSparkJobExecutionStrategy.java         |   8 +-
 .../storage/row/HoodieRowParquetWriteSupport.java  |   2 +-
 .../org/apache/spark/sql/HoodieDataTypeUtils.scala |  85 ++++++++++++---
 .../apache/spark/sql/TestHoodieDataTypeUtils.java  | 115 +++++++++++++++++++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  65 ++++++++++++
 .../hudi/common/config/HoodieStorageConfig.java    |   2 +-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  28 +++++
 .../testsuite/HoodieDeltaStreamerWrapper.java      |   2 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  38 -------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  54 ----------
 .../TestHoodieSparkMergeOnReadTableClustering.java |   4 +-
 .../functional/TestSparkSortAndSizeClustering.java |  27 ++++-
 .../org/apache/hudi/internal/DefaultSource.java    |   5 +-
 .../apache/hudi/spark3/internal/DefaultSource.java |   4 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  80 ++++++++------
 .../streamer/TestStreamSyncUnitTests.java          |  10 +-
 18 files changed, 373 insertions(+), 162 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 964e84a04bf..15c16078b0f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2149,10 +2149,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED);
   }
 
-  public String parquetWriteLegacyFormatEnabled() {
-    return getString(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED);
-  }
-
   public String parquetOutputTimestampType() {
     return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 47ccd8700a8..57fc02960c2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieDataTypeUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.sources.BaseRelation;
@@ -115,11 +116,16 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         Math.min(clusteringPlan.getInputGroups().size(), 
writeConfig.getClusteringMaxParallelism()),
         new CustomizedThreadFactory("clustering-job-group", true));
     try {
+      boolean canUseRowWriter = 
getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 true)
+          && HoodieDataTypeUtils.canUseRowWriter(schema, 
engineContext.hadoopConfiguration());
+      if (canUseRowWriter) {
+        
HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(writeConfig.getProps(),
 schema);
+      }
       // execute clustering for each group async and collect WriteStatus
       Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
               clusteringPlan.getInputGroups().stream()
                   .map(inputGroup -> {
-                    if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 true)) {
+                    if (canUseRowWriter) {
                       return runClusteringForGroupAsyncAsRow(inputGroup,
                           clusteringPlan.getStrategy().getStrategyParams(),
                           shouldPreserveMetadata,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index d05abb5a5a0..5e45be76d46 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -46,7 +46,7 @@ public class HoodieRowParquetWriteSupport extends 
ParquetWriteSupport {
 
   public HoodieRowParquetWriteSupport(Configuration conf, StructType 
structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
     Configuration hadoopConf = new Configuration(conf);
-    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED,
 "false"));
     hadoopConf.set("spark.sql.parquet.outputTimestampType", 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
     hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", 
config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
     setSchema(structType, hadoopConf);
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
index 6017eca6739..00f4e8d7996 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
@@ -18,9 +18,17 @@
 
 package org.apache.spark.sql
 
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieStorageConfig, TypedProperties}
+import org.apache.parquet.avro.AvroWriteSupport
 import org.apache.spark.sql.types._
+import org.slf4j.LoggerFactory
 
 object HoodieDataTypeUtils {
+  private val log = LoggerFactory.getLogger(getClass)
 
   /**
    * Parses provided [[jsonSchema]] into [[StructType]].
@@ -30,26 +38,71 @@ object HoodieDataTypeUtils {
   def parseStructTypeFromJson(jsonSchema: String): StructType =
     StructType.fromString(jsonSchema)
 
+  def canUseRowWriter(schema: Schema, conf: Configuration): Boolean = {
+    if (conf.getBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, true)) {
+      // if we can write lists with the old list structure, we can use row 
writer regardless of decimal precision
+      true
+    } else if (!HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
+      true
+    } else {
+      // small precision decimals require the legacy write mode but lists and 
maps require the new write mode when
+      // WRITE_OLD_LIST_STRUCTURE is false so we can only use row writer if 
one is present and the other is not
+      if (HoodieAvroUtils.hasListOrMapField(schema)) {
+        log.warn("Cannot use row writer due to presence of list or map with a 
small precision decimal field")
+        false
+      } else {
+        true
+      }
+    }
+  }
+
   /**
-   * Checks whether provided {@link DataType} contains {@link DecimalType} 
whose scale is less than
-   * {@link Decimal# MAX_LONG_DIGITS ( )}
+   * Checks whether default value (false) of 
"hoodie.parquet.writelegacyformat.enabled" should be
+   * overridden in case:
+   *
+   * <ul>
+   * <li>Property has not been explicitly set by the writer</li>
+   * <li>Data schema contains {@code DecimalType} that would be affected by 
it</li>
+   * </ul>
+   *
+   * If both of the aforementioned conditions are true, will override the 
default value of the config
+   * (by essentially setting the value) to make sure that the produced Parquet 
data files could be
+   * read by {@code AvroParquetReader}
+   *
+   * @param properties properties specified by the writer
+   * @param schema     schema of the dataset being written
    */
-  def hasSmallPrecisionDecimalType(sparkType: DataType): Boolean = {
-    sparkType match {
-      case st: StructType =>
-        st.exists(f => hasSmallPrecisionDecimalType(f.dataType))
-
-      case map: MapType =>
-        hasSmallPrecisionDecimalType(map.keyType) ||
-          hasSmallPrecisionDecimalType(map.valueType)
-
-      case at: ArrayType =>
-        hasSmallPrecisionDecimalType(at.elementType)
+  def tryOverrideParquetWriteLegacyFormatProperty(properties: 
java.util.Map[String, String], schema: StructType): Unit = {
+    tryOverrideParquetWriteLegacyFormatProperty(Left(properties),
+      AvroConversionUtils.convertStructTypeToAvroSchema(schema, "struct", 
"hoodie.source"))
+  }
 
-      case dt: DecimalType =>
-        dt.precision < Decimal.MAX_LONG_DIGITS
+  def tryOverrideParquetWriteLegacyFormatProperty(properties: TypedProperties, 
schema: Schema): Unit = {
+    tryOverrideParquetWriteLegacyFormatProperty(Right(properties), schema)
+  }
 
-      case _ => false
+  private def tryOverrideParquetWriteLegacyFormatProperty(properties: 
Either[java.util.Map[String, String], TypedProperties], schema: Schema): Unit = 
{
+    val legacyFormatEnabledProp = properties match {
+      case Left(map) => 
map.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key)
+      case Right(typedProps) => 
typedProps.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key)
+    }
+    if (legacyFormatEnabledProp == null && 
HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
+      // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when 
the scale of decimalType
+      // is less than {@code Decimal.MAX_LONG_DIGITS}, but {@code 
AvroParquetReader} which is used by
+      // {@code HoodieParquetReader} does not support DecimalType encoded as 
INT32/INT64.
+      //
+      // To work around this problem, we're checking whether
+      //    - Schema contains any decimals that could be encoded as INT32/INT64
+      //    - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} 
has not been explicitly
+      //    set by the writer
+      //
+      // If both of these conditions are true, then we override the default 
value of {@code
+      // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it 
to "true"
+      log.warn("Small Decimal Type found in the persisted schema, reverting 
default value of 'hoodie.parquet.writelegacyformat.enabled' to true")
+      properties match {
+        case Left(map) => 
map.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key, "true")
+        case Right(typedProps) => 
typedProps.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key, 
"true")
+      }
     }
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
new file mode 100644
index 00000000000..285ccd78e11
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieDataTypeUtils {
+  private static final Schema SMALL_DECIMAL_SCHEMA = LogicalTypes.decimal(18, 
10).addToSchema(Schema.createFixed("smallDec", null, "org.apache.hudi.test", 
9));
+  private static final Schema LARGE_DECIMAL_SCHEMA = LogicalTypes.decimal(20, 
10).addToSchema(Schema.createFixed("largeDec", null, "org.apache.hudi.test", 
9));
+
+  private static Stream<Arguments> canUseRowWriterCases() {
+
+    Schema listSchema = Schema.createArray(Schema.create(Schema.Type.INT));
+    Schema mapSchema = Schema.createMap(Schema.create(Schema.Type.INT));
+
+    Schema schemaWithSmallDecimal = 
Schema.createRecord("schemaWithSmallDecimal", null, null, false,
+        Collections.singletonList(new Schema.Field("smallDecimal", 
SMALL_DECIMAL_SCHEMA, null, null)));
+    Schema schemaWithSmallDecimalAndList = 
Schema.createRecord("schemaWithSmallDecimalAndList", null, null, false,
+        Arrays.asList(new Schema.Field("smallDecimal", SMALL_DECIMAL_SCHEMA, 
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+            new Schema.Field("listField", listSchema, null, null)));
+    Schema schemaWithSmallDecimalAndMap = 
Schema.createRecord("schemaWithSmallDecimalAndMap", null, null, false,
+        Arrays.asList(new Schema.Field("smallDecimal", SMALL_DECIMAL_SCHEMA, 
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+            new Schema.Field("mapField", mapSchema,null, null)));
+    Schema schemaWithLargeDecimalAndList = 
Schema.createRecord("schemaWithLargeDecimalAndList", null, null, false,
+        Arrays.asList(new Schema.Field("largeDecimal", LARGE_DECIMAL_SCHEMA, 
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+            new Schema.Field("listField", listSchema, null, null)));
+    Schema schemaWithLargeDecimalAndMap = 
Schema.createRecord("schemaWithLargeDecimalAndMap", null, null, false,
+        Arrays.asList(new Schema.Field("largeDecimal", LARGE_DECIMAL_SCHEMA, 
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+            new Schema.Field("mapField", mapSchema, null, null)));
+    Schema schemaWithoutSpecialTypes = Schema.createRecord("schemaWithInt", 
null, null, false,
+        Collections.singletonList(new Schema.Field("intField", 
Schema.create(Schema.Type.INT), null, null)));
+
+    Configuration configurationWithLegacyListFormat = new Configuration(false);
+    
configurationWithLegacyListFormat.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
 "true");
+
+    Configuration configurationWithoutLegacyListFormat = new 
Configuration(false);
+    
configurationWithoutLegacyListFormat.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
 "false");
+
+    return Stream.of(
+        Arguments.of(schemaWithoutSpecialTypes, 
configurationWithoutLegacyListFormat, true),
+        Arguments.of(schemaWithSmallDecimal, 
configurationWithoutLegacyListFormat, true),
+        Arguments.of(schemaWithLargeDecimalAndMap, 
configurationWithoutLegacyListFormat, true),
+        Arguments.of(schemaWithSmallDecimalAndMap, 
configurationWithoutLegacyListFormat, false),
+        Arguments.of(schemaWithLargeDecimalAndMap, 
configurationWithoutLegacyListFormat, true),
+        Arguments.of(schemaWithSmallDecimalAndList, 
configurationWithLegacyListFormat, true),
+        Arguments.of(schemaWithSmallDecimalAndList, 
configurationWithoutLegacyListFormat, false),
+        Arguments.of(schemaWithLargeDecimalAndList, 
configurationWithoutLegacyListFormat, true),
+        Arguments.of(schemaWithLargeDecimalAndList, 
configurationWithLegacyListFormat, true));
+  }
+
+  @ParameterizedTest
+  @MethodSource("canUseRowWriterCases")
+  void testCanUseRowWriter(Schema schema, Configuration conf, boolean 
expected) {
+    assertEquals(expected, HoodieDataTypeUtils.canUseRowWriter(schema, conf));
+  }
+
+  private static Stream<Arguments> 
testAutoModifyParquetWriteLegacyFormatParameterParams() {
+    return Arrays.stream(new Object[][] {
+            {true, null, true},   {false, null, null},
+            {true, false, false}, {true, true, true},
+            {false, true, true},  {false, false, false}})
+        // add useMap option to test both entry-points
+        .map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
+  void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, 
Boolean propValue, Boolean expectedPropValue) {
+    Schema decimalType = smallDecimal ? SMALL_DECIMAL_SCHEMA : 
LARGE_DECIMAL_SCHEMA;
+    Schema schema = Schema.createRecord("test", null, null, false,
+        Collections.singletonList(new Schema.Field("decimalField", 
decimalType, null, null)));
+    TypedProperties options = propValue != null
+        ? 
TypedProperties.fromMap(Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(),
 String.valueOf(propValue)))
+        : new TypedProperties();
+
+    HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(options, 
schema);
+    Boolean finalPropValue =
+        
Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
+            .map(value -> Boolean.parseBoolean(value.toString()))
+            .orElse(null);
+    assertEquals(expectedPropValue, finalPropValue);
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 078f72d6a9f..1b005091695 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -93,6 +93,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -1144,6 +1145,70 @@ public class HoodieAvroUtils {
     throw new HoodieAvroSchemaException(String.format("cannot support rewrite 
value for schema type: %s since the old schema type is: %s", newSchema, 
oldSchema));
   }
 
+  /**
+   * Checks whether the provided schema contains a decimal with a precision 
less than or equal to 18,
+   * which allows the decimal to be stored as int/long instead of a fixed size 
byte array in
+   * <a 
href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md";>parquet
 logical types</a>
+   * @param schema the input schema to search
+   * @return true if the schema contains a small precision decimal field and 
false otherwise
+   */
+  public static boolean hasSmallPrecisionDecimalField(Schema schema) {
+    return hasDecimalWithCondition(schema, 
HoodieAvroUtils::isSmallPrecisionDecimalField);
+  }
+
+  private static boolean hasDecimalWithCondition(Schema schema, 
Function<Decimal, Boolean> condition) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (Field field : schema.getFields()) {
+          if (hasDecimalWithCondition(field.schema(), condition)) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+        return hasDecimalWithCondition(schema.getElementType(), condition);
+      case MAP:
+        return hasDecimalWithCondition(schema.getValueType(), condition);
+      case UNION:
+        return hasDecimalWithCondition(getActualSchemaFromUnion(schema, null), 
condition);
+      default:
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          Decimal decimal = (Decimal) schema.getLogicalType();
+          return condition.apply(decimal);
+        } else {
+          return false;
+        }
+    }
+  }
+
+  private static boolean isSmallPrecisionDecimalField(Decimal decimal) {
+    return decimal.getPrecision() <= 18;
+  }
+
+  /**
+   * Checks whether the provided schema contains a list or map field.
+   * @param schema input
+   * @return true if a list or map is present, false otherwise
+   */
+  public static boolean hasListOrMapField(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (Field field : schema.getFields()) {
+          if (hasListOrMapField(field.schema())) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+      case MAP:
+        return true;
+      case UNION:
+        return hasListOrMapField(getActualSchemaFromUnion(schema, null));
+      default:
+        return false;
+    }
+  }
+
   /**
    * Avro does not support type promotion from numbers to string. This 
function returns true if
    * it will be necessary to rewrite the record to support this promotion.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 235754e624b..fcf3bb033d8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -129,7 +129,7 @@ public class HoodieStorageConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> 
PARQUET_WRITE_LEGACY_FORMAT_ENABLED = ConfigProperty
       .key("hoodie.parquet.writelegacyformat.enabled")
-      .defaultValue("false")
+      .noDefaultValue()
       .markAdvanced()
       .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, 
data will be written in a way of Spark 1.4 and earlier. "
           + "For example, decimal values will be written in Parquet's 
fixed-length byte array format which other systems such as Apache Hive and 
Apache Impala use. "
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index f883870440f..776fd6bbf0b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -82,6 +82,7 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -731,4 +732,31 @@ public class TestHoodieAvroUtils {
       assertArrayEquals(new Object[] {"partition1", "val1", 3.5}, 
sortColumnValues);
     }
   }
+
+  @Test
+  void testHasListOrMapField() {
+    Schema nestedList = Schema.createRecord("nestedList", null, null, false, 
Arrays.asList(
+        new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+        new Schema.Field("nested", Schema.createRecord("nestedSchema", null, 
null, false, Collections.singletonList(
+            new Schema.Field("listField", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+        )), null, null)
+    ));
+    Schema nestedMap = Schema.createRecord("nestedMap", null, null, false, 
Arrays.asList(
+        new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
+        new Schema.Field("nested", 
Schema.createUnion(Schema.create(Schema.Type.NULL),
+            Schema.createRecord("nestedSchema", null, null, false,
+                Collections.singletonList(new Schema.Field("mapField", 
Schema.createMap(Schema.create(Schema.Type.INT)), null, null)
+                ))), null, null)
+    ));
+    assertTrue(HoodieAvroUtils.hasListOrMapField(nestedList));
+    assertTrue(HoodieAvroUtils.hasListOrMapField(nestedMap));
+    assertFalse(HoodieAvroUtils.hasListOrMapField(new 
Schema.Parser().parse(EXAMPLE_SCHEMA)));
+  }
+
+  @Test
+  void testHasSmallPrecisionDecimalField() {
+    assertTrue(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
+    assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES)));
+    assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(EXAMPLE_SCHEMA)));
+  }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 184f73402a3..81c6a3dc8d3 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -86,7 +86,7 @@ public class HoodieDeltaStreamerWrapper extends 
HoodieDeltaStreamer {
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
+    InputBatch inputBatch = service.readFromSource(instantTime, 
metaClient).getLeft();
     return Pair.of(inputBatch.getSchemaProvider(), 
Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) 
inputBatch.getBatch().get()));
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index c86e2c908c3..e794c723185 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -48,9 +47,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.HoodieDataTypeUtils;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -303,39 +300,4 @@ public class DataSourceUtils {
         
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }
-
-  /**
-   * Checks whether default value (false) of 
"hoodie.parquet.writelegacyformat.enabled" should be
-   * overridden in case:
-   *
-   * <ul>
-   *   <li>Property has not been explicitly set by the writer</li>
-   *   <li>Data schema contains {@code DecimalType} that would be affected by 
it</li>
-   * </ul>
-   *
-   * If both of the aforementioned conditions are true, will override the 
default value of the config
-   * (by essentially setting the value) to make sure that the produced Parquet 
data files could be
-   * read by {@code AvroParquetReader}
-   *
-   * @param properties properties specified by the writer
-   * @param schema schema of the dataset being written
-   */
-  public static void tryOverrideParquetWriteLegacyFormatProperty(Map<String, 
String> properties, StructType schema) {
-    if (HoodieDataTypeUtils.hasSmallPrecisionDecimalType(schema)
-        && 
properties.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()) 
== null) {
-      // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when 
the scale of decimalType
-      // is less than {@code Decimal.MAX_LONG_DIGITS}, but {@code 
AvroParquetReader} which is used by
-      // {@code HoodieParquetReader} does not support DecimalType encoded as 
INT32/INT64 as.
-      //
-      // To work this problem around we're checking whether
-      //    - Schema contains any decimals that could be encoded as INT32/INT64
-      //    - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} 
has not been explicitly
-      //    set by the writer
-      //
-      // If both of these conditions are true, then we override the default 
value of {@code
-      // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it 
to "true"
-      LOG.warn("Small Decimal Type found in the persisted schema, reverting 
default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
-      
properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), 
"true");
-    }
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 351f68709ff..48d8dec474a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
-import 
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
@@ -68,6 +67,7 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.spark.api.java.JavaSparkContext
+import 
org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 6026a47c697..365386a9c8b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -20,7 +20,6 @@ package org.apache.hudi;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -46,18 +45,9 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.DecimalType$;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.StructType$;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
@@ -67,13 +57,9 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Stream;
 
-import static 
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -90,9 +76,6 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class TestDataSourceUtils {
 
-  private static final String HIVE_DATABASE = "testdb1";
-  private static final String HIVE_TABLE = "hive_trips";
-
   @Mock
   private SparkRDDWriteClient hoodieWriteClient;
 
@@ -294,43 +277,6 @@ public class TestDataSourceUtils {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
-  public void testAutoModifyParquetWriteLegacyFormatParameter(boolean 
smallDecimal, Boolean propValue, Boolean expectedPropValue) {
-    DecimalType decimalType;
-    if (smallDecimal) {
-      decimalType = DecimalType$.MODULE$.apply(10, 2);
-    } else {
-      decimalType = DecimalType$.MODULE$.apply(38, 10);
-    }
-
-    StructType structType = StructType$.MODULE$.apply(
-        Arrays.asList(
-            StructField.apply("d1", decimalType, false, Metadata.empty())
-        )
-    );
-
-    Map<String, String> options = propValue != null
-        ? 
Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(),
 String.valueOf(propValue))
-        : new HashMap<>();
-
-    tryOverrideParquetWriteLegacyFormatProperty(options, structType);
-
-    Boolean finalPropValue =
-        
Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
-            .map(Boolean::parseBoolean)
-            .orElse(null);
-    assertEquals(expectedPropValue, finalPropValue);
-  }
-
-  private static Stream<Arguments> 
testAutoModifyParquetWriteLegacyFormatParameterParams() {
-    return Arrays.stream(new Object[][] {
-        {true, null, true},   {false, null, null},
-        {true, false, false}, {true, true, true},
-        {false, true, true},  {false, false, false}
-    }).map(Arguments::of);
-  }
-
   @Test
   public void testSerHoodieMetadataPayload() throws IOException {
     String partitionPath = "2022/10/01";
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index f56e0238f62..67b0b929d6c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -241,9 +241,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends 
SparkClientFunctionalTes
                                        HoodieWriteConfig cfg,
                                        HoodieTestDataGenerator dataGen,
                                        boolean clusteringAsRow) {
-    if (clusteringAsRow) {
-      client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), 
"true");
-    }
+    client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), 
Boolean.toString(clusteringAsRow));
 
     client.cluster(clusteringCommitTime, true);
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index e748c7ace53..47a496d1a78 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -31,15 +31,19 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class TestSparkSortAndSizeClustering extends 
HoodieSparkClientTestHarness {
 
 
@@ -115,17 +121,30 @@ public class TestSparkSortAndSizeClustering extends 
HoodieSparkClientTestHarness
         metaClient, 
HoodieTimeline.getClusteringCommitRequestedInstant(clusteringTime)).map(Pair::getRight).get();
 
     List<HoodieClusteringGroup> inputGroups = plan.getInputGroups();
-    Assertions.assertEquals(1, inputGroups.size(), "Clustering plan will 
contain 1 input group");
+    assertEquals(1, inputGroups.size(), "Clustering plan will contain 1 input 
group");
 
     Integer outputFileGroups = 
plan.getInputGroups().get(0).getNumOutputFileGroups();
-    Assertions.assertEquals(2, outputFileGroups, "Clustering plan will 
generate 2 output groups");
+    assertEquals(2, outputFileGroups, "Clustering plan will generate 2 output 
groups");
 
     HoodieWriteMetadata writeMetadata = writeClient.cluster(clusteringTime, 
true);
     List<HoodieWriteStat> writeStats = 
(List<HoodieWriteStat>)writeMetadata.getWriteStats().get();
-    Assertions.assertEquals(2, writeStats.size(), "Clustering should write 2 
files");
+    assertEquals(2, writeStats.size(), "Clustering should write 2 files");
 
     List<Row> rows = readRecords();
-    Assertions.assertEquals(numRecords, rows.size());
+    assertEquals(numRecords, rows.size());
+    validateDecimalTypeAfterClustering(writeStats);
+  }
+
+  // Validate that clustering produces decimals in legacy format
+  private void validateDecimalTypeAfterClustering(List<HoodieWriteStat> 
writeStats) {
+    writeStats.stream().map(writeStat -> new 
StoragePath(metaClient.getBasePath(), writeStat.getPath())).forEach(writtenPath 
-> {
+      MessageType schema = ParquetUtils.readMetadata(storage, writtenPath)
+          .getFileMetaData().getSchema();
+      int index = schema.getFieldIndex("height");
+      Type decimalType = schema.getFields().get(index);
+      assertEquals("DECIMAL", decimalType.getOriginalType().toString());
+      assertEquals("FIXED_LEN_BYTE_ARRAY", 
decimalType.asPrimitiveType().getPrimitiveTypeName().toString());
+    });
   }
 
   private List<WriteStatus> writeData(String commitTime, int totalRecords, 
boolean doCommit) {
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 3b3b8eafb86..f46139c09c8 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import org.apache.spark.sql.HoodieDataTypeUtils;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -36,8 +37,6 @@ import org.apache.spark.sql.types.StructType;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
-
 /**
  * DataSource V2 implementation for managing internal write logic. Only called 
internally.
  */
@@ -69,7 +68,7 @@ public class DefaultSource extends BaseDefaultSource 
implements DataSourceV2,
         HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    tryOverrideParquetWriteLegacyFormatProperty(properties, schema);
+    
HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(properties, 
schema);
     // 1st arg to createHoodieConfig is not really required to be set. but 
passing it anyways.
     HoodieWriteConfig config = 
DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(),
 path, tblName, properties);
     boolean arePartitionRecordsSorted = 
HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index 233fb20354d..01da1490cf4 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -35,8 +35,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
-
 /**
  * DataSource V2 implementation for managing internal write logic. Only called 
internally.
  * This class is only compatible with datasource V2 API in Spark 3.
@@ -61,7 +59,7 @@ public class DefaultSource extends BaseDefaultSource 
implements TableProvider {
     // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
     Map<String, String> newProps = new HashMap<>(properties);
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
+    HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(newProps, 
schema);
     // 1st arg to createHoodieConfig is not really required to be set. but 
passing it anyways.
     HoodieWriteConfig config = 
DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()),
 path, tblName, newProps);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, 
getSparkSession(),
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 43f227ac78a..a501e9605b8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -112,6 +112,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieDataTypeUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
@@ -259,14 +260,12 @@ public class StreamSync implements Serializable, 
Closeable {
 
   private final boolean autoGenerateRecordKeys;
 
-  private final boolean useRowWriter;
-
   @VisibleForTesting
   StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
              TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, HoodieStorage storage, Configuration conf,
              Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
              Option<BaseErrorTableWriter> errorTableWriter, 
SourceFormatAdapter formatAdapter, Option<Transformer> transformer,
-             boolean useRowWriter, boolean autoGenerateRecordKeys) {
+             boolean autoGenerateRecordKeys) {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
@@ -282,7 +281,6 @@ public class StreamSync implements Serializable, Closeable {
     this.errorTableWriter = errorTableWriter;
     this.formatAdapter = formatAdapter;
     this.transformer = transformer;
-    this.useRowWriter = useRowWriter;
   }
 
   @Deprecated
@@ -327,9 +325,6 @@ public class StreamSync implements Serializable, Closeable {
 
     Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? 
Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
     this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames), 
schemaSupplier, this.errorTableWriter.isPresent());
-    // enable row writer only when operation is BULK_INSERT, and source is ROW 
type and if row writer is not explicitly disabled.
-    this.useRowWriter = this.cfg.operation == WriteOperationType.BULK_INSERT 
&& source.getSourceType() == Source.SourceType.ROW
-        && 
this.props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false);
   }
 
   /**
@@ -443,9 +438,11 @@ public class StreamSync implements Serializable, Closeable 
{
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    InputBatch inputBatch = readFromSource(instantTime, metaClient);
+    Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = 
readFromSource(instantTime, metaClient);
 
-    if (inputBatch != null) {
+    if (inputBatchAndUseRowWriter != null) {
+      InputBatch inputBatch = inputBatchAndUseRowWriter.getLeft();
+      boolean useRowWriter = inputBatchAndUseRowWriter.getRight();
 
       // this is the first input batch. If schemaProvider not set, use it and 
register Avro Schema and start
       // compactor
@@ -489,7 +486,7 @@ public class StreamSync implements Serializable, Closeable {
         }
       }
 
-      result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, 
overallTimerContext);
+      result = writeToSinkAndDoMetaSync(instantTime, inputBatch, useRowWriter, 
metrics, overallTimerContext);
     }
     // refresh schemas if need be before next batch
     if (schemaProvider != null) {
@@ -518,10 +515,10 @@ public class StreamSync implements Serializable, 
Closeable {
   /**
    * Read from Upstream Source and apply transformation if needed.
    *
-   * @return Pair<InputBatch and Boolean> Input data read from upstream 
source, and boolean is true if empty.
+   * @return Pair<InputBatch and Boolean> Input data read from upstream 
source, and boolean is true if the result should use the row writer path.
    * @throws Exception in case of any Exception
    */
-  public InputBatch readFromSource(String instantTime, HoodieTableMetaClient 
metaClient) throws IOException {
+  public Pair<InputBatch, Boolean> readFromSource(String instantTime, 
HoodieTableMetaClient metaClient) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitsTimelineOpt.isPresent()) {
@@ -536,7 +533,7 @@ public class StreamSync implements Serializable, Closeable {
 
     int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
     int curRetryCount = 0;
-    InputBatch sourceDataToSync = null;
+    Pair<InputBatch, Boolean> sourceDataToSync = null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
         sourceDataToSync = 
fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -556,7 +553,7 @@ public class StreamSync implements Serializable, Closeable {
     return sourceDataToSync;
   }
 
-  private InputBatch fetchFromSourceAndPrepareRecords(Option<String> 
resumeCheckpointStr, String instantTime,
+  private Pair<InputBatch, Boolean> 
fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String 
instantTime,
         HoodieTableMetaClient metaClient) {
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching 
next batch: " + cfg.targetTableName);
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
@@ -567,7 +564,9 @@ public class StreamSync implements Serializable, Closeable {
       throw new UnsupportedOperationException("Spark record only support 
parquet log.");
     }
 
-    InputBatch inputBatch = fetchNextBatchFromSource(resumeCheckpointStr, 
metaClient);
+    Pair<InputBatch, Boolean> inputBatchAndRowWriterEnabled = 
fetchNextBatchFromSource(resumeCheckpointStr, metaClient);
+    InputBatch inputBatch = inputBatchAndRowWriterEnabled.getLeft();
+    boolean useRowWriter = inputBatchAndRowWriterEnabled.getRight();
     final String checkpointStr = inputBatch.getCheckpointForNextBatch();
     final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
 
@@ -584,21 +583,35 @@ public class StreamSync implements Serializable, 
Closeable {
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking 
if input is empty: " + cfg.targetTableName);
 
     if (useRowWriter) { // no additional processing required for row writer.
-      return inputBatch;
+      return Pair.of(inputBatch, true);
     } else {
       Option<JavaRDD<HoodieRecord>> recordsOpt = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), 
schemaProvider,
           recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
-      return new InputBatch(recordsOpt, checkpointStr, schemaProvider);
+      return Pair.of(new InputBatch(recordsOpt, checkpointStr, 
schemaProvider), false);
     }
   }
 
+  @VisibleForTesting
+  boolean canUseRowWriter(Schema targetSchema) {
+    // enable row writer only when operation is BULK_INSERT, and source is ROW 
type and if row writer is not explicitly disabled.
+    boolean rowWriterEnabled = isRowWriterEnabled();
+    return rowWriterEnabled && targetSchema != null && 
HoodieDataTypeUtils.canUseRowWriter(targetSchema, conf);
+  }
+
+  @VisibleForTesting
+  boolean isRowWriterEnabled() {
+    return cfg.operation == WriteOperationType.BULK_INSERT && 
formatAdapter.getSource().getSourceType() == Source.SourceType.ROW
+        && this.props.getBoolean("hoodie.streamer.write.row.writer.enable", 
false);
+  }
+
   /**
    * Fetch data from source, apply transformations if any, align with schema 
from schema provider if need be and return the input batch.
    * @param resumeCheckpointStr checkpoint to resume from source.
-   * @return {@link InputBatch} containing the new batch of data from source 
along with new checkpoint and schema provider instance to use.
+   * @return Pair with {@link InputBatch} containing the new batch of data 
from source along with new checkpoint and schema provider instance to use,
+   *         and a boolean set to `true` if row writer can be used.
    */
   @VisibleForTesting
-  InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr, 
HoodieTableMetaClient metaClient) {
+  Pair<InputBatch, Boolean> fetchNextBatchFromSource(Option<String> 
resumeCheckpointStr, HoodieTableMetaClient metaClient) {
     Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
     String checkpointStr = null;
     SchemaProvider schemaProvider = null;
@@ -621,6 +634,7 @@ public class StreamSync implements Serializable, Closeable {
           && this.userProvidedSchemaProvider.getTargetSchema() != 
InputBatch.NULL_SCHEMA) {
         // Let's deduce the schema provider for writer side first!
         schemaProvider = 
getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), 
this.userProvidedSchemaProvider, metaClient);
+        boolean useRowWriter = 
canUseRowWriter(schemaProvider.getTargetSchema());
         if (useRowWriter) {
           inputBatchForWriter = new InputBatch(transformed, checkpointStr, 
schemaProvider);
         } else {
@@ -657,7 +671,7 @@ public class StreamSync implements Serializable, Closeable {
             .orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
         schemaProvider = getDeducedSchemaProvider(incomingSchema, 
dataAndCheckpoint.getSchemaProvider(), metaClient);
 
-        if (useRowWriter) {
+        if (canUseRowWriter(schemaProvider.getTargetSchema())) {
           inputBatchForWriter = new InputBatch(transformed, checkpointStr, 
schemaProvider);
         } else {
           // Rewrite transformed records into the expected target schema
@@ -666,11 +680,17 @@ public class StreamSync implements Serializable, 
Closeable {
         }
       }
     } else {
-      if (useRowWriter) {
+      if (isRowWriterEnabled()) {
         InputBatch inputBatchNeedsDeduceSchema = 
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
-        inputBatchForWriter = new 
InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), 
inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
-            
getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(),
 inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
-      } else {
+        if 
(canUseRowWriter(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema()))
 {
+          inputBatchForWriter = new 
InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), 
inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
+              
getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(),
 inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
+        } else {
+          LOG.warn("Row-writer is enabled but cannot be used due to the target 
schema");
+        }
+      }
+      // if row writer was enabled but the target schema prevents us from 
using it, do not use the row writer
+      if (inputBatchForWriter == null) {
         // Pull the data from the source & prepare the write
         InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = 
formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
         checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
@@ -694,10 +714,10 @@ public class StreamSync implements Serializable, 
Closeable {
         }
       }
     }
-    if (useRowWriter) {
-      return inputBatchForWriter;
+    if (inputBatchForWriter != null) {
+      return Pair.of(inputBatchForWriter, true);
     } else {
-      return new InputBatch(avroRDDOptional, checkpointStr, schemaProvider);
+      return Pair.of(new InputBatch(avroRDDOptional, checkpointStr, 
schemaProvider), false);
     }
   }
 
@@ -829,16 +849,18 @@ public class StreamSync implements Serializable, 
Closeable {
    *
    * @param instantTime         instant time to use for ingest.
    * @param inputBatch          input batch that contains the records, 
checkpoint, and schema provider
+   * @param useRowWriter        whether to use row writer
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
   private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
+                                                                              
boolean useRowWriter,
                                                                               
HoodieIngestionMetrics metrics,
                                                                               
Timer.Context overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
     // write to hudi and fetch result
-    WriteClientWriteResult  writeClientWriteResult = writeToSink(inputBatch, 
instantTime);
+    WriteClientWriteResult  writeClientWriteResult = writeToSink(inputBatch, 
instantTime, useRowWriter);
     JavaRDD<WriteStatus> writeStatusRDD = 
writeClientWriteResult.getWriteStatusRDD();
     Map<String, List<String>> partitionToReplacedFileIds = 
writeClientWriteResult.getPartitionToReplacedFileIds();
 
@@ -961,7 +983,7 @@ public class StreamSync implements Serializable, Closeable {
     throw lastException;
   }
 
-  private WriteClientWriteResult writeToSink(InputBatch inputBatch, String 
instantTime) {
+  private WriteClientWriteResult writeToSink(InputBatch inputBatch, String 
instantTime, boolean useRowWriter) {
     WriteClientWriteResult writeClientWriteResult = null;
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
index f429943532f..6eafe46baef 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -116,14 +117,17 @@ public class TestStreamSyncUnitTests {
 
     //Actually create the deltastreamer
     StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy, 
hoodieSparkEngineContext,
-        storage, configuration, client -> true, schemaProvider, 
errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter, 
false);
+        storage, configuration, client -> true, schemaProvider, 
errorTableWriterOption, sourceFormatAdapter, transformerOption, false);
     StreamSync spy = spy(streamSync);
+    doReturn(useRowWriter).when(spy).canUseRowWriter(any());
+    doReturn(useRowWriter).when(spy).isRowWriterEnabled();
     SchemaProvider deducedSchemaProvider;
     deducedSchemaProvider = getSchemaProvider("deduced", false);
     doReturn(deducedSchemaProvider).when(spy).getDeducedSchemaProvider(any(), 
any(), any());
 
     //run the method we are unit testing:
-    InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(), 
mock(HoodieTableMetaClient.class));
+    Pair<InputBatch, Boolean> batchAndUseRowWriter = 
spy.fetchNextBatchFromSource(Option.empty(), mock(HoodieTableMetaClient.class));
+    InputBatch batch = batchAndUseRowWriter.getLeft();
 
     //make sure getDeducedSchemaProvider is always called once
     verify(spy, times(1)).getDeducedSchemaProvider(any(), any(), any());
@@ -152,7 +156,7 @@ public class TestStreamSyncUnitTests {
     when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant));
 
     StreamSync streamSync = new StreamSync(cfg, sparkSession, props, 
hoodieSparkEngineContext,
-        storage, configuration, client -> true, 
null,Option.empty(),null,Option.empty(),true,true);
+        storage, configuration, client -> true, null, Option.empty(), null, 
Option.empty(), true);
     StreamSync spy = spy(streamSync);
     
doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any());
 

Reply via email to