This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ccd621b258 [HUDI-6980] Fixing closing of write client on failure 
scenarios (#10224)
0ccd621b258 is described below

commit 0ccd621b2582e3d40811dd8b803f072747ffa5c9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Dec 4 20:20:34 2023 -0800

    [HUDI-6980] Fixing closing of write client on failure scenarios (#10224)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 33 ++++++++++++++--------
 .../timeline/service/handlers/MarkerHandler.java   |  4 +--
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7c4ec8a71e7..bab0448642c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -365,7 +365,7 @@ class HoodieSparkSqlWriterInternal {
         }
       }
 
-      val (writeResult, writeClient: SparkRDDWriteClient[_]) =
+      val (writeResult: HoodieWriteResult, writeClient: 
SparkRDDWriteClient[_]) =
         operation match {
           case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
             mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, 
hoodieConfig)
@@ -509,9 +509,16 @@ class HoodieSparkSqlWriterInternal {
                 hoodieRecords
               }
             client.startCommitWithTime(instantTime, commitActionType)
-            val writeResult = DataSourceUtils.doWriteOperation(client, 
dedupedHoodieRecords, instantTime, operation,
-              preppedSparkSqlWrites || preppedWriteOperation)
-            (writeResult, client)
+            try {
+              val writeResult = DataSourceUtils.doWriteOperation(client, 
dedupedHoodieRecords, instantTime, operation,
+                preppedSparkSqlWrites || preppedWriteOperation)
+              (writeResult, client)
+            } catch {
+              case e: HoodieException =>
+                // close the write client in all cases
+                handleWriteClientClosure(client, tableConfig, parameters, 
jsc.hadoopConfiguration())
+                throw e
+            }
         }
 
       // Check for errors and commit the write.
@@ -524,17 +531,21 @@ class HoodieSparkSqlWriterInternal {
 
         (writeSuccessful, common.util.Option.ofNullable(instantTime), 
compactionInstant, clusteringInstant, writeClient, tableConfig)
       } finally {
-        // close the write client in all cases
-        val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, 
tableConfig, parameters, jsc.hadoopConfiguration())
-        val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, 
parameters)
-        if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
-          log.info("Closing write client")
-          writeClient.close()
-        }
+        handleWriteClientClosure(writeClient, tableConfig, parameters, 
jsc.hadoopConfiguration())
       }
     }
   }
 
+  private def handleWriteClientClosure(writeClient: SparkRDDWriteClient[_], 
tableConfig : HoodieTableConfig, parameters: Map[String, String], 
configuration: Configuration): Unit =  {
+    // close the write client in all cases
+    val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, 
tableConfig, parameters, configuration)
+    val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, 
parameters)
+    if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+      log.warn("Closing write client")
+      writeClient.close()
+    }
+  }
+
   def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String], df: Dataset[Row]): WriteOperationType = {
     var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
     // TODO clean up
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
index 390a4e2184f..42e2f40e629 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
@@ -126,8 +126,8 @@ public class MarkerHandler extends Handler {
     if (dispatchingThreadFuture != null) {
       dispatchingThreadFuture.cancel(true);
     }
-    dispatchingExecutorService.shutdown();
-    batchingExecutorService.shutdown();
+    dispatchingExecutorService.shutdownNow();
+    batchingExecutorService.shutdownNow();
   }
 
   /**

Reply via email to