This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 046454d1169 [HUDI-7479] SQL confs don't propagate to spark row writer
properly (#10786)
046454d1169 is described below
commit 046454d11692c420e715bd920bbf24c348cb918c
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Mar 5 04:31:55 2024 -0500
[HUDI-7479] SQL confs don't propagate to spark row writer properly (#10786)
---
.../hudi/HoodieDatasetBulkInsertHelper.scala | 15 +++++++++-----
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 2 +-
.../common/testutils/HoodieTestDataGenerator.java | 15 +++++++++++++-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 9 ++++++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 24 +++++++++++++++++-----
5 files changed, 51 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 0214b0a1030..d64f2c34ded 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -17,6 +17,7 @@
package org.apache.hudi
+import org.apache.hudi.HoodieSparkUtils.injectSQLConf
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.config.TypedProperties
@@ -40,11 +41,14 @@ import
org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.execution.SQLConfInjectingRDD
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.{asScalaBufferConverter,
seqAsJavaListConverter}
+import scala.reflect.ClassTag
object HoodieDatasetBulkInsertHelper
extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df
=> getNumPartitions(df))) with Logging {
@@ -83,8 +87,8 @@ object HoodieDatasetBulkInsertHelper
val keyGeneratorClassName =
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")
- val prependedRdd: RDD[InternalRow] =
- df.queryExecution.toRdd.mapPartitions { iter =>
+ val prependedRdd: RDD[InternalRow] = {
+ injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
val typedProps = new TypedProperties(config.getProps)
if (autoGenerateRecordKeys) {
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()))
@@ -110,7 +114,8 @@ object HoodieDatasetBulkInsertHelper
// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey,
partitionPath, filename, row, false)
}
- }
+ }, SQLConf.get)
+ }
val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField,
SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
@@ -144,7 +149,7 @@ object HoodieDatasetBulkInsertHelper
arePartitionRecordsSorted: Boolean,
shouldPreserveHoodieMetadata: Boolean):
HoodieData[WriteStatus] = {
val schema = dataset.schema
- val writeStatuses = dataset.queryExecution.toRdd.mapPartitions(iter => {
+ val writeStatuses =
injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
val taskContextSupplier: TaskContextSupplier =
table.getTaskContextSupplier
val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
@@ -189,7 +194,7 @@ object HoodieDatasetBulkInsertHelper
}
writer.getWriteStatuses.asScala.iterator
- }).collect()
+ }), SQLConf.get).collect()
table.getContext.parallelize(writeStatuses.toList.asJava)
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 975135c13d5..03d977f6fc9 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -128,7 +128,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
}, SQLConf.get)
}
- private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+ def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)
def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, reconcileToLatestSchema: Boolean,
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index fe2583b87af..5a66901d473 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -91,6 +91,13 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
*/
public class HoodieTestDataGenerator implements AutoCloseable {
+ /**
+ * You may get a different result due to the upgrading of Spark 3.0: reading
dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet
INT96 files can be ambiguous,
+ * as the files may be written by Spark 2.x or legacy versions of Hive,
which uses a legacy hybrid calendar that is different from Spark 3.0+'s
Proleptic Gregorian calendar.
+ * See more details in SPARK-31404.
+ */
+ private boolean makeDatesAmbiguous = false;
+
// based on examination of sample file, the schema produces the following
per record size
public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
// with default bloom filter with 60,000 entries and 0.000000001 FPRate
@@ -208,6 +215,11 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
this(DEFAULT_PARTITION_PATHS);
}
+ public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
+ this();
+ this.makeDatesAmbiguous = makeDatesAmbiguous;
+ }
+
@Deprecated
public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer,
KeyPartition> keyPartitionMap) {
// NOTE: This used as a workaround to make sure that new instantiations of
the generator
@@ -392,7 +404,8 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
rec.put("nation", ByteBuffer.wrap(bytes));
long randomMillis = genRandomTimeMillis(rand);
Instant instant = Instant.ofEpochMilli(randomMillis);
- rec.put("current_date", (int) LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).toLocalDate().toEpochDay());
+ rec.put("current_date", makeDatesAmbiguous ? -1000000 :
+ (int) LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_ts", randomMillis);
BigDecimal bigDecimal = new BigDecimal(String.format(Locale.ENGLISH,
"%5f", rand.nextFloat()));
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 864c3502825..8f9fc6f1c50 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -312,9 +312,14 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
}
protected static HoodieTestDataGenerator prepareParquetDFSFiles(int
numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
- String
schemaStr, Schema schema) throws IOException {
+ String
schemaStr, Schema schema) throws IOException {
+ return prepareParquetDFSFiles(numRecords, baseParquetPath, fileName,
useCustomSchema, schemaStr, schema, false);
+ }
+
+ protected static HoodieTestDataGenerator prepareParquetDFSFiles(int
numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
+ String
schemaStr, Schema schema, boolean makeDatesAmbiguous) throws IOException {
String path = baseParquetPath + "/" + fileName;
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ HoodieTestDataGenerator dataGenerator = new
HoodieTestDataGenerator(makeDatesAmbiguous);
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords,
schemaStr),
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 67f6d2736f5..108d1cc8a87 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1403,20 +1403,34 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testBulkInsertRowWriterContinuousModeWithAsyncClustering()
throws Exception {
testBulkInsertRowWriterContinuousMode(false, null, false,
- getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+ getTableServicesConfigs(2000, "false", "", "", "true", "3"), false);
}
@Test
public void testBulkInsertRowWriterContinuousModeWithInlineClustering()
throws Exception {
testBulkInsertRowWriterContinuousMode(false, null, false,
- getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+ getTableServicesConfigs(2000, "false", "true", "3", "false", ""),
false);
}
- private void testBulkInsertRowWriterContinuousMode(Boolean
useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch,
List<String> customConfigs) throws Exception {
+ @Test
+ public void
testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDates()
throws Exception {
+
sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInWrite",
"LEGACY");
+
sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInWrite",
"LEGACY");
+
sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInWrite",
"LEGACY");
+
sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInRead",
"LEGACY");
+
sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInRead",
"LEGACY");
+
sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInRead",
"LEGACY");
+ testBulkInsertRowWriterContinuousMode(false, null, false,
+ getTableServicesConfigs(2000, "false", "true", "3",
+ "false", ""), true);
+ }
+
+ private void testBulkInsertRowWriterContinuousMode(Boolean
useSchemaProvider, List<String> transformerClassNames,
+ boolean testEmptyBatch,
List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 100;
boolean hasTransformer = transformerClassNames != null &&
!transformerClassNames.isEmpty();
- prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null, makeDatesAmbiguous);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" :
"");
@@ -1426,7 +1440,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we
will cancel the future within finally. So, safe to generate 100 batches.
LOG.info("Generating data for batch " + counter);
- prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null);
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
counter++;
Thread.sleep(2000);
}