This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9906df48e7c [HUDI-5655] Closing write client for spark ds writer in 
all cases (including exception) (#7799)
9906df48e7c is described below

commit 9906df48e7c285994d56e1e2d466372eee57c268
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jan 30 20:37:10 2023 -0800

    [HUDI-5655] Closing write client for spark ds writer in all cases 
(including exception) (#7799)
    
    Looks like we miss to close the writeClient on some of the failure cases 
while writing via spark-ds and spark-sql writes.
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7e234775faa..304a1303a3b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -376,12 +376,22 @@ object HoodieSparkSqlWriter {
         }
 
       // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant, clusteringInstant) =
-        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
-          writeResult, parameters, writeClient, tableConfig, jsc,
-          TableInstantInfo(basePath, instantTime, commitActionType, 
operation), extraPreCommitFn)
+      try {
+        val (writeSuccessful, compactionInstant, clusteringInstant) =
+          commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
+            writeResult, parameters, writeClient, tableConfig, jsc,
+            TableInstantInfo(basePath, instantTime, commitActionType, 
operation), extraPreCommitFn)
 
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), 
compactionInstant, clusteringInstant, writeClient, tableConfig)
+        (writeSuccessful, common.util.Option.ofNullable(instantTime), 
compactionInstant, clusteringInstant, writeClient, tableConfig)
+      } finally {
+        // close the write client in all cases
+        val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, 
tableConfig, parameters, jsc.hadoopConfiguration())
+        val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, 
parameters)
+        if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+          log.info("Closing write client")
+          writeClient.close()
+        }
+      }
     }
   }
 
@@ -959,9 +969,6 @@ object HoodieSparkSqlWriter {
         tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
-        client.close()
-      }
       (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
     } else {
       log.error(s"${tableInstantInfo.operation} failed with errors")

Reply via email to