This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 07de2f0dc6bf878a1631a7e7f0c7628e9984afb6
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Mar 5 04:31:55 2024 -0500

    [HUDI-7479] SQL confs don't propagate to spark row writer properly (#10786)
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 15 +++++++++-----
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  2 +-
 .../common/testutils/HoodieTestDataGenerator.java  | 15 +++++++++++++-
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  9 ++++++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 24 +++++++++++++++++-----
 5 files changed, 51 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 0214b0a1030..d64f2c34ded 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.HoodieSparkUtils.injectSQLConf
 import org.apache.hudi.client.WriteStatus
 import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.config.TypedProperties
@@ -40,11 +41,14 @@ import 
org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.execution.SQLConfInjectingRDD
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters.{asScalaBufferConverter, 
seqAsJavaListConverter}
+import scala.reflect.ClassTag
 
 object HoodieDatasetBulkInsertHelper
   extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df 
=> getNumPartitions(df))) with Logging {
@@ -83,8 +87,8 @@ object HoodieDatasetBulkInsertHelper
       val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
         "Key-generator class name is required")
 
-      val prependedRdd: RDD[InternalRow] =
-        df.queryExecution.toRdd.mapPartitions { iter =>
+      val prependedRdd: RDD[InternalRow] = {
+        injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
           val typedProps = new TypedProperties(config.getProps)
           if (autoGenerateRecordKeys) {
             
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()))
@@ -110,7 +114,8 @@ object HoodieDatasetBulkInsertHelper
             // TODO use mutable row, avoid re-allocating
             new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, 
partitionPath, filename, row, false)
           }
-        }
+        }, SQLConf.get)
+      }
 
       val dedupedRdd = if (config.shouldCombineBeforeInsert) {
         dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, 
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
@@ -144,7 +149,7 @@ object HoodieDatasetBulkInsertHelper
                  arePartitionRecordsSorted: Boolean,
                  shouldPreserveHoodieMetadata: Boolean): 
HoodieData[WriteStatus] = {
     val schema = dataset.schema
-    val writeStatuses = dataset.queryExecution.toRdd.mapPartitions(iter => {
+    val writeStatuses = 
injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
       val taskContextSupplier: TaskContextSupplier = 
table.getTaskContextSupplier
       val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
       val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
@@ -189,7 +194,7 @@ object HoodieDatasetBulkInsertHelper
       }
 
       writer.getWriteStatuses.asScala.iterator
-    }).collect()
+    }), SQLConf.get).collect()
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 975135c13d5..03d977f6fc9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -128,7 +128,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     }, SQLConf.get)
   }
 
-  private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+  def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
 
   def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, reconcileToLatestSchema: Boolean,
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 5e467e84bfb..2adaa74e648 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -91,6 +91,13 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  */
 public class HoodieTestDataGenerator implements AutoCloseable {
 
+  /**
+   * You may get a different result due to the upgrading of Spark 3.0: reading 
dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet 
INT96 files can be ambiguous,
+   * as the files may be written by Spark 2.x or legacy versions of Hive, 
which uses a legacy hybrid calendar that is different from Spark 3.0+'s 
Proleptic Gregorian calendar.
+   * See more details in SPARK-31404.
+   */
+  private boolean makeDatesAmbiguous = false;
+
   // based on examination of sample file, the schema produces the following 
per record size
   public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
   // with default bloom filter with 60,000 entries and 0.000000001 FPRate
@@ -208,6 +215,11 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
+    this();
+    this.makeDatesAmbiguous = makeDatesAmbiguous;
+  }
+
   @Deprecated
   public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, 
KeyPartition> keyPartitionMap) {
     // NOTE: This used as a workaround to make sure that new instantiations of 
the generator
@@ -392,7 +404,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     rec.put("nation", ByteBuffer.wrap(bytes));
     long randomMillis = genRandomTimeMillis(rand);
     Instant instant = Instant.ofEpochMilli(randomMillis);
-    rec.put("current_date", (int) LocalDateTime.ofInstant(instant, 
ZoneOffset.UTC).toLocalDate().toEpochDay());
+    rec.put("current_date", makeDatesAmbiguous ? -1000000 :
+        (int) LocalDateTime.ofInstant(instant, 
ZoneOffset.UTC).toLocalDate().toEpochDay());
     rec.put("current_ts", randomMillis);
 
     BigDecimal bigDecimal = new BigDecimal(String.format(Locale.ENGLISH, 
"%5f", rand.nextFloat()));
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 58b5d79883e..9af764e3d85 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -316,9 +316,14 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
   }
 
   protected static HoodieTestDataGenerator prepareParquetDFSFiles(int 
numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
-                                                                        String 
schemaStr, Schema schema) throws IOException {
+                                                                  String 
schemaStr, Schema schema) throws IOException {
+    return prepareParquetDFSFiles(numRecords, baseParquetPath, fileName, 
useCustomSchema, schemaStr, schema, false);
+  }
+
+  protected static HoodieTestDataGenerator prepareParquetDFSFiles(int 
numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
+                                                                        String 
schemaStr, Schema schema, boolean makeDatesAmbiguous) throws IOException {
     String path = baseParquetPath + "/" + fileName;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    HoodieTestDataGenerator dataGenerator = new 
HoodieTestDataGenerator(makeDatesAmbiguous);
     if (useCustomSchema) {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
           dataGenerator.generateInsertsAsPerSchema("000", numRecords, 
schemaStr),
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 263389af698..516e323766d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1403,20 +1403,34 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @Test
   public void testBulkInsertRowWriterContinuousModeWithAsyncClustering() 
throws Exception {
     testBulkInsertRowWriterContinuousMode(false, null, false,
-        getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+        getTableServicesConfigs(2000, "false", "", "", "true", "3"), false);
   }
 
   @Test
   public void testBulkInsertRowWriterContinuousModeWithInlineClustering() 
throws Exception {
     testBulkInsertRowWriterContinuousMode(false, null, false,
-        getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+        getTableServicesConfigs(2000, "false", "true", "3", "false", ""), 
false);
   }
 
-  private void testBulkInsertRowWriterContinuousMode(Boolean 
useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, 
List<String> customConfigs) throws Exception {
+  @Test
+  public void 
testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDates() 
throws Exception {
+    
sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInWrite",
 "LEGACY");
+    
sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInWrite", 
"LEGACY");
+    
sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInWrite", 
"LEGACY");
+    
sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInRead", 
"LEGACY");
+    
sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInRead", 
"LEGACY");
+    
sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInRead", 
"LEGACY");
+    testBulkInsertRowWriterContinuousMode(false, null, false,
+        getTableServicesConfigs(2000, "false", "true", "3",
+            "false", ""), true);
+  }
+
+  private void testBulkInsertRowWriterContinuousMode(Boolean 
useSchemaProvider, List<String> transformerClassNames,
+                                                     boolean testEmptyBatch, 
List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 100;
     boolean hasTransformer = transformerClassNames != null && 
!transformerClassNames.isEmpty();
-    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null, makeDatesAmbiguous);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : 
"");
 
@@ -1426,7 +1440,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         int counter = 2;
         while (counter < 100) { // lets keep going. if the test times out, we 
will cancel the future within finally. So, safe to generate 100 batches.
           LOG.info("Generating data for batch " + counter);
-          prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, 
Integer.toString(counter) + ".parquet", false, null, null);
+          prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, 
Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
           counter++;
           Thread.sleep(2000);
         }

Reply via email to