This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d429169 [HUDI-1688]hudi write should uncache rdd, when the write
operation is finished (#2673)
d429169 is described below
commit d429169ff7ef66dfcd6825b74fcbceddb46369a5
Author: xiarixiaoyao <[email protected]>
AuthorDate: Fri Mar 19 01:19:18 2021 +0800
[HUDI-1688]hudi write should uncache rdd, when the write operation is
finished (#2673)
---
.../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 77a14ac..94d07b9 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -221,6 +221,23 @@ private[hudi] object HoodieSparkSqlWriter {
val (writeSuccessful, compactionInstant) =
commitAndPerformPostOperations(writeResult, parameters, writeClient,
tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
+
+ /**
+  * Best-effort release of cached data for `rdd` and every RDD in its lineage.
+  *
+  * Walks `rdd` and its ancestors (via `dependencies`) and calls `unpersist()` on each one
+  * that the SparkContext reports as persistent. A failure to unpersist one RDD is logged
+  * and does not stop the walk.
+  *
+  * Each RDD is visited at most once, so ancestors shared by several branches (e.g. the
+  * same input feeding a join or union) are not re-traversed. The walk is tail-recursive,
+  * so a long lineage cannot overflow the call stack.
+  *
+  * @param rdd root of the lineage to release
+  */
+ def unpersistRdd(rdd: RDD[_]): Unit = {
+   import scala.annotation.tailrec
+   import scala.util.control.NonFatal
+
+   // Query the persistent-RDD registry once instead of once per visited node; every id is
+   // handled at most once below, so a single snapshot is sufficient.
+   val persisted = sparkContext.getPersistentRDDs
+
+   // Depth-first, pre-order worklist walk: unpersist a node, then queue its parents.
+   @tailrec
+   def release(pending: List[RDD[_]], visited: Set[Int]): Unit = pending match {
+     case Nil =>
+     case current :: rest if visited.contains(current.id) =>
+       release(rest, visited)
+     case current :: rest =>
+       if (persisted.contains(current.id)) {
+         try {
+           current.unpersist()
+         } catch {
+           // NonFatal lets InterruptedException and VM errors propagate.
+           case NonFatal(e) =>
+             log.warn(s"Got exception trying to unpersist rdd ${current.id}", e)
+         }
+       }
+       release(current.dependencies.map(_.rdd).toList ::: rest, visited + current.id)
+   }
+
+   release(List(rdd), Set.empty)
+ }
+ // it's safe to unpersist cached rdds here
+ unpersistRdd(writeResult.getWriteStatuses.rdd)
+
(writeSuccessful, common.util.Option.ofNullable(instantTime),
compactionInstant, writeClient, tableConfig)
}
}