This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0ccd621b258 [HUDI-6980] Fixing closing of write client on failure scenarios (#10224)
0ccd621b258 is described below
commit 0ccd621b2582e3d40811dd8b803f072747ffa5c9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Dec 4 20:20:34 2023 -0800
[HUDI-6980] Fixing closing of write client on failure scenarios (#10224)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 33 ++++++++++++++--------
.../timeline/service/handlers/MarkerHandler.java | 4 +--
2 files changed, 24 insertions(+), 13 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7c4ec8a71e7..bab0448642c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -365,7 +365,7 @@ class HoodieSparkSqlWriterInternal {
}
}
- val (writeResult, writeClient: SparkRDDWriteClient[_]) =
+ val (writeResult: HoodieWriteResult, writeClient: SparkRDDWriteClient[_]) =
operation match {
case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig)
@@ -509,9 +509,16 @@ class HoodieSparkSqlWriterInternal {
hoodieRecords
}
client.startCommitWithTime(instantTime, commitActionType)
- val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
- preppedSparkSqlWrites || preppedWriteOperation)
- (writeResult, client)
+ try {
+ val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
+ preppedSparkSqlWrites || preppedWriteOperation)
+ (writeResult, client)
+ } catch {
+ case e: HoodieException =>
+ // close the write client in all cases
+ handleWriteClientClosure(client, tableConfig, parameters, jsc.hadoopConfiguration())
+ throw e
+ }
}
// Check for errors and commit the write.
@@ -524,17 +531,21 @@ class HoodieSparkSqlWriterInternal {
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
} finally {
- // close the write client in all cases
- val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
- val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
- if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
- log.info("Closing write client")
- writeClient.close()
- }
+ handleWriteClientClosure(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
}
}
}
+ private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], tableConfig : HoodieTableConfig, parameters: Map[String, String], configuration: Configuration): Unit = {
+ // close the write client in all cases
+ val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, configuration)
+ val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
+ if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+ log.warn("Closing write client")
+ writeClient.close()
+ }
+ }
+
def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String], df: Dataset[Row]): WriteOperationType = {
var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// TODO clean up
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
index 390a4e2184f..42e2f40e629 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
@@ -126,8 +126,8 @@ public class MarkerHandler extends Handler {
if (dispatchingThreadFuture != null) {
dispatchingThreadFuture.cancel(true);
}
- dispatchingExecutorService.shutdown();
- batchingExecutorService.shutdown();
+ dispatchingExecutorService.shutdownNow();
+ batchingExecutorService.shutdownNow();
}
/**