This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new df4cca8aa56 [HUDI-7154] Fix NPE from empty batch with row writer enabled in Hudi Streamer (#10198)
df4cca8aa56 is described below
commit df4cca8aa560d21bde1bf4c1a4079d3d2f760c6f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Dec 4 08:06:59 2023 -0800
    [HUDI-7154] Fix NPE from empty batch with row writer enabled in Hudi Streamer (#10198)
---------
Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 26 +++++++----
 .../apache/hudi/utilities/streamer/StreamSync.java |  5 ++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 51 ++++++++++++++++++----
 3 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b8dbb18287e..e925e2a5423 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -155,19 +155,27 @@ object HoodieSparkSqlWriter {
     Metrics.shutdownAllMetrics()
   }
 
-  def getBulkInsertRowConfig(writerSchema: Schema, hoodieConfig: HoodieConfig,
+  def getBulkInsertRowConfig(writerSchema: org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig,
                              basePath: String, tblName: String): HoodieWriteConfig = {
-    val writerSchemaStr = writerSchema.toString
-
+    var writerSchemaStr: String = null
+    if (writerSchema.isPresent) {
+      writerSchemaStr = writerSchema.get().toString
+    }
     // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty
-    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
-      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+    val optsWithoutSchema = mutable.Map() ++ hoodieConfig.getProps.toMap
+    val opts = if (writerSchema.isPresent) {
+      optsWithoutSchema ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+    } else {
+      optsWithoutSchema
+    }
+
+    if (writerSchema.isPresent) {
+      // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+      tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema.get))
+    }
 
-    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema))
     DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath, tblName, opts)
   }
-
 }
 
 class HoodieSparkSqlWriterInternal {
@@ -779,7 +787,7 @@ class HoodieSparkSqlWriterInternal {
     val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
     val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
-    val writeConfig = HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig, basePath.toString, tblName)
+    val writeConfig = HoodieSparkSqlWriter.getBulkInsertRowConfig(org.apache.hudi.common.util.Option.of(writerSchema), hoodieConfig, basePath.toString, tblName)
     val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
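
Aside for readers skimming the Scala change above: org.apache.hudi.common.util.Option is Hudi's own Java Option type, hence the isPresent/get() calls. A minimal, self-contained sketch of the null-safe schema-string pattern the new signature enables; the inputs below are hypothetical, not code from this commit:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.Option;

    public class OptionSchemaSketch {
      // Mirrors the branch added to getBulkInsertRowConfig: with an absent
      // schema the string stays null and schema-dependent options are skipped,
      // instead of NPE'ing on writerSchema.toString.
      static String toSchemaString(Option<Schema> writerSchema) {
        return writerSchema.isPresent() ? writerSchema.get().toString() : null;
      }

      public static void main(String[] args) {
        System.out.println(toSchemaString(Option.empty()));                               // null
        System.out.println(toSchemaString(Option.of(Schema.create(Schema.Type.STRING)))); // "string"
      }
    }
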
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 19289e650c4..ff2debc8dcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -757,7 +757,8 @@ public class StreamSync implements Serializable, Closeable {
     hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
     hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
     hoodieConfig.setValue("path", cfg.targetBasePath);
-    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
+    return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
+        hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
   }
 
   /**
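
For context on the ternary above: the streamer marks a schema-less round with the InputBatch.NULL_SCHEMA sentinel (the test below asserts against Schema.create(Schema.Type.NULL)), and the fix maps that sentinel to Option.empty() before the write config is built. A small sketch of that mapping, with a hypothetical local sentinel standing in for Hudi's constant:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.Option;

    public class SchemaGuardSketch {
      // Hypothetical stand-in for InputBatch.NULL_SCHEMA (a shared sentinel
      // constant, so reference comparison with != suffices, as in the diff).
      static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

      static Option<Schema> toOption(Schema writerSchema) {
        return writerSchema != NULL_SCHEMA ? Option.of(writerSchema) : Option.empty();
      }

      public static void main(String[] args) {
        System.out.println(toOption(NULL_SCHEMA).isPresent());                       // false
        System.out.println(toOption(Schema.create(Schema.Type.STRING)).isPresent()); // true
      }
    }
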
@@ -899,7 +900,7 @@ public class StreamSync implements Serializable, Closeable {
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
 
     if (useRowWriter) {
-      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
       BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
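
The one-line swap above is the row-writer half of the fix: on an empty round getBatch() is absent, and the old fallback supplied an empty RDD to a spot that casts to Dataset<Row>. A self-contained sketch of the corrected fallback; maybeBatch is hypothetical and java.util.Optional stands in for Hudi's Option:

    import java.util.Optional;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class EmptyBatchSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]").appName("empty-batch-sketch").getOrCreate();
        // Hypothetical stand-in for inputBatch.getBatch() on an empty round.
        Optional<Dataset<Row>> maybeBatch = Optional.empty();
        // Fall back to an empty DataFrame: already a Dataset<Row> with zero
        // rows, so the downstream cast and bulk-insert path stay valid.
        Dataset<Row> df = maybeBatch.orElseGet(() -> spark.sqlContext().emptyDataFrame());
        System.out.println("rows in empty batch: " + df.count()); // 0
        spark.stop();
      }
    }
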
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 62aa7328fbb..03208a0c0e5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -114,6 +115,7 @@ import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -232,6 +234,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
+  @AfterEach
+  public void perTestAfterEach() {
+    testNum++;
+  }
+
   @Test
   public void testProps() {
     TypedProperties props =
@@ -1340,7 +1347,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1351,27 +1358,34 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
     try {
       if (testEmptyBatch) {
+        prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+            PARQUET_SOURCE_ROOT, false, "partition_path", "0");
         prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
         deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
         deltaStreamer.sync();
         // since we mimic'ed empty batch, total records should be same as first sync().
-        assertRecordCount(200, tableBasePath, sqlContext);
+        assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
         HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
         // validate table schema fetches valid schema from last but one commit.
         TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
         assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+        // schema from latest commit and last but one commit should match
+        compareLatestTwoSchemas(metaClient);
+        prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+            PARQUET_SOURCE_ROOT, false, "partition_path", "");
+        deltaStreamer.shutdownGracefully();
       }
 
-      int recordsSoFar = testEmptyBatch ? 200 : 100;
-
+      int recordsSoFar = 100;
+      deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
       // add 3 more batches and ensure all commits succeed.
       for (int i = 2; i < 5; i++) {
         prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(i) + ".parquet", false, null, null);
-        deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
         deltaStreamer.sync();
         assertRecordCount(recordsSoFar + (i - 1) * 100, tableBasePath, sqlContext);
         if (i == 2 || i == 4) { // this validation reloads the timeline. So, we are validating only for first and last batch.
@@ -1717,20 +1731,25 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
-    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+    HoodieDeltaStreamer.Config cfg =
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
         : ParquetDFSSource.class.getName(),
         transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-        useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
     if (testEmptyBatch) {
       prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
-      deltaStreamer.sync();
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+      HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
+      deltaStreamer1.sync();
       // since we mimic'ed empty batch, total records should be same as first sync().
       assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
@@ -1738,6 +1757,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       // validate table schema fetches valid schema from last but one commit.
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
       assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+      // schema from latest commit and last but one commit should match
+      compareLatestTwoSchemas(metaClient);
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "");
+      deltaStreamer1.shutdownGracefully();
     }
 
     // proceed w/ non empty batch.
@@ -1751,6 +1775,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(entry, metaClient, WriteOperationType.INSERT));
     testNum++;
+    deltaStreamer.shutdownGracefully();
   }
 
   private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant,
@@ -1766,6 +1791,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
 
+  private void compareLatestTwoSchemas(HoodieTableMetaClient metaClient) throws IOException {
+    // schema from latest commit and last but one commit should match
+    List<HoodieInstant> completedInstants = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants();
+    HoodieCommitMetadata commitMetadata1 = TimelineUtils.getCommitMetadata(completedInstants.get(0), metaClient.getActiveTimeline());
+    HoodieCommitMetadata commitMetadata2 = TimelineUtils.getCommitMetadata(completedInstants.get(1), metaClient.getActiveTimeline());
+    assertEquals(commitMetadata1.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), commitMetadata2.getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
+  }
+
   private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
     // prepare ORCDFSSource
     TypedProperties orcProps = new TypedProperties();
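
The new compareLatestTwoSchemas helper relies on each completed commit recording its writer schema under HoodieCommitMetadata.SCHEMA_KEY. The same check can be run ad hoc against any table; a usage sketch with a hypothetical base path and Hadoop conf wiring:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.TimelineUtils;

    public class CommitSchemaSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical base path; point at a real Hudi table to run.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath("/tmp/hudi_table").setConf(new Configuration()).build();
        List<HoodieInstant> instants = metaClient.getActiveTimeline()
            .getWriteTimeline().filterCompletedInstants().getInstants();
        // Print the schema each completed commit recorded; after an empty
        // batch the latest commit should repeat the previous schema rather
        // than record a NULL schema.
        for (HoodieInstant instant : instants) {
          HoodieCommitMetadata meta =
              TimelineUtils.getCommitMetadata(instant, metaClient.getActiveTimeline());
          System.out.println(instant.getTimestamp() + " -> "
              + meta.getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
        }
      }
    }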