This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ee11b9c951c [HUDI-7489] Avoid collecting WriteStatus to driver in row writer code path (#10836)
ee11b9c951c is described below
commit ee11b9c951c11c367629245cfc2f45d57bb289c4
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Mar 11 17:25:41 2024 -0700
[HUDI-7489] Avoid collecting WriteStatus to driver in row writer code path (#10836)
* get rid of collect in row writer clustering
* fix race condition
* add logging
---------
Co-authored-by: Jonathan Vexler <=>
---
.../index/bucket/ConsistentBucketIndexUtils.java | 11 ++-
.../hudi/HoodieDatasetBulkInsertHelper.scala | 89 +++++++++++-----------
2 files changed, 55 insertions(+), 45 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index d22e4b21a5e..0e47d0a688a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -210,7 +210,16 @@ public class ConsistentBucketIndexUtils {
if (fs.exists(fullPath)) {
return;
}
- FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+ // Prevent an exception caused by a race condition: it is fine if the file is created by another
+ // thread, so we check for the marker after catching the exception and do not fail if the file already exists.
+ try {
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+ } catch (HoodieIOException e) {
+ if (!fs.exists(fullPath)) {
+ throw e;
+ }
+ LOG.warn("Failed to create marker but " + fullPath + " exists", e);
+ }
}
/***
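The hunk above makes marker creation idempotent under concurrent writers: attempt the create, and if it fails but the marker now exists, another task won the race and the failure can be ignored. A minimal sketch of the same pattern in Scala against the plain Hadoop FileSystem API (the helper name createMarkerIdempotently is hypothetical; the actual code goes through Hudi's FileIOUtils):

    import java.io.IOException
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Create an empty marker file, tolerating a concurrent creator.
    def createMarkerIdempotently(fs: FileSystem, fullPath: Path): Unit = {
      if (fs.exists(fullPath)) {
        return // marker already present, nothing to do
      }
      try {
        // overwrite = false, so a concurrent create can fail here
        fs.create(fullPath, false).close()
      } catch {
        case e: IOException =>
          if (!fs.exists(fullPath)) {
            throw e // genuine I/O failure, not a lost race
          }
          // another writer created the marker first; safe to continue
      }
    }

Note that the decisive re-check happens after the exception, not before the create: the up-front exists() check is only an optimization, since two tasks can both pass it before either creates the file.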
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index d64f2c34ded..6df92860582 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
@@ -149,53 +150,53 @@ object HoodieDatasetBulkInsertHelper
arePartitionRecordsSorted: Boolean,
shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
val schema = dataset.schema
- val writeStatuses = injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
- val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
- val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
- val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
- val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
+ HoodieJavaRDD.of(
+ injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
+ val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
+ val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
+ val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
+ val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
- val writer = writeConfig.getIndexType match {
- case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
- == BucketIndexEngineType.CONSISTENT_HASHING =>
- new ConsistentBucketBulkInsertDataInternalWriterHelper(
- table,
- writeConfig,
- instantTime,
- taskPartitionId,
- taskId,
- taskEpochId,
- schema,
- writeConfig.populateMetaFields,
- arePartitionRecordsSorted,
- shouldPreserveHoodieMetadata)
- case _ =>
- new BulkInsertDataInternalWriterHelper(
- table,
- writeConfig,
- instantTime,
- taskPartitionId,
- taskId,
- taskEpochId,
- schema,
- writeConfig.populateMetaFields,
- arePartitionRecordsSorted,
- shouldPreserveHoodieMetadata)
- }
+ val writer = writeConfig.getIndexType match {
+ case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
+ == BucketIndexEngineType.CONSISTENT_HASHING =>
+ new ConsistentBucketBulkInsertDataInternalWriterHelper(
+ table,
+ writeConfig,
+ instantTime,
+ taskPartitionId,
+ taskId,
+ taskEpochId,
+ schema,
+ writeConfig.populateMetaFields,
+ arePartitionRecordsSorted,
+ shouldPreserveHoodieMetadata)
+ case _ =>
+ new BulkInsertDataInternalWriterHelper(
+ table,
+ writeConfig,
+ instantTime,
+ taskPartitionId,
+ taskId,
+ taskEpochId,
+ schema,
+ writeConfig.populateMetaFields,
+ arePartitionRecordsSorted,
+ shouldPreserveHoodieMetadata)
+ }
- try {
- iter.foreach(writer.write)
- } catch {
- case t: Throwable =>
- writer.abort()
- throw t
- } finally {
- writer.close()
- }
+ try {
+ iter.foreach(writer.write)
+ } catch {
+ case t: Throwable =>
+ writer.abort()
+ throw t
+ } finally {
+ writer.close()
+ }
- writer.getWriteStatuses.asScala.iterator
- }), SQLConf.get).collect()
- table.getContext.parallelize(writeStatuses.toList.asJava)
+ writer.getWriteStatuses.asScala.iterator
+ }), SQLConf.get).toJavaRDD())
}
private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean, targetParallelism: Int): RDD[InternalRow] = {
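Taken together, the Scala hunk replaces a collect()-plus-parallelize round trip with a direct wrap of the distributed result, so the WriteStatus objects never leave the executors. A condensed sketch of the before/after shape (assuming an RDD[WriteStatus] named statuses; the helper name toHoodieData is hypothetical, while HoodieJavaRDD.of is the wrapper used in the diff):

    import org.apache.hudi.client.WriteStatus
    import org.apache.hudi.common.data.HoodieData
    import org.apache.hudi.data.HoodieJavaRDD
    import org.apache.spark.rdd.RDD

    def toHoodieData(statuses: RDD[WriteStatus]): HoodieData[WriteStatus] = {
      // Before: statuses.collect() pulled every WriteStatus into driver
      // memory, and context.parallelize(...) shipped them back out.
      // After: wrap the still-distributed JavaRDD directly.
      HoodieJavaRDD.of(statuses.toJavaRDD())
    }

Besides saving driver memory, this avoids forcing an eager Spark job at this point: collect() materializes the pipeline immediately, whereas the wrapped RDD stays lazy until the caller acts on it.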