This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 571c0f29c9ec55c23478d801c21a0fa4f19b2463 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Mon Jan 30 20:37:10 2023 -0800 [HUDI-5655] Closing write client for spark ds writer in all cases (including exception) (#7799) Looks like we missed closing the writeClient in some of the failure cases while writing via spark-ds and spark-sql writes. --- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 23 ++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7e234775faa..304a1303a3b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -376,12 +376,22 @@ object HoodieSparkSqlWriter { } // Check for errors and commit the write. 
- val (writeSuccessful, compactionInstant, clusteringInstant) = - commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, - writeResult, parameters, writeClient, tableConfig, jsc, - TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn) + try { + val (writeSuccessful, compactionInstant, clusteringInstant) = + commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, + writeResult, parameters, writeClient, tableConfig, jsc, + TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn) - (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) + (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) + } finally { + // close the write client in all cases + val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration()) + val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters) + if (!asyncCompactionEnabled && !asyncClusteringEnabled) { + log.info("Closing write client") + writeClient.close() + } + } } } @@ -959,9 +969,6 @@ object HoodieSparkSqlWriter { tableInstantInfo.basePath, schema) log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled") - if (!asyncCompactionEnabled && !asyncClusteringEnabled) { - client.close() - } (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant) } else { log.error(s"${tableInstantInfo.operation} failed with errors")
