This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9906df48e7c [HUDI-5655] Closing write client for spark ds writer in
all cases (including exception) (#7799)
9906df48e7c is described below
commit 9906df48e7c285994d56e1e2d466372eee57c268
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jan 30 20:37:10 2023 -0800
[HUDI-5655] Closing write client for spark ds writer in all cases
(including exception) (#7799)
Looks like we missed closing the writeClient in some of the failure cases
while writing via spark-ds and spark-sql writes.
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 23 ++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7e234775faa..304a1303a3b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -376,12 +376,22 @@ object HoodieSparkSqlWriter {
}
// Check for errors and commit the write.
- val (writeSuccessful, compactionInstant, clusteringInstant) =
- commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
- writeResult, parameters, writeClient, tableConfig, jsc,
- TableInstantInfo(basePath, instantTime, commitActionType,
operation), extraPreCommitFn)
+ try {
+ val (writeSuccessful, compactionInstant, clusteringInstant) =
+ commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
+ writeResult, parameters, writeClient, tableConfig, jsc,
+ TableInstantInfo(basePath, instantTime, commitActionType,
operation), extraPreCommitFn)
- (writeSuccessful, common.util.Option.ofNullable(instantTime),
compactionInstant, clusteringInstant, writeClient, tableConfig)
+ (writeSuccessful, common.util.Option.ofNullable(instantTime),
compactionInstant, clusteringInstant, writeClient, tableConfig)
+ } finally {
+ // close the write client in all cases
+ val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient,
tableConfig, parameters, jsc.hadoopConfiguration())
+ val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient,
parameters)
+ if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+ log.info("Closing write client")
+ writeClient.close()
+ }
+ }
}
}
@@ -959,9 +969,6 @@ object HoodieSparkSqlWriter {
tableInstantInfo.basePath, schema)
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
- if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
- client.close()
- }
(commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
log.error(s"${tableInstantInfo.operation} failed with errors")