This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e2777a96f1b44c76d55cbde5f84e30e704081b88
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Sun Apr 14 14:38:55 2024 -0700

    [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000)
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 12 ++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../hudi/client/SparkRDDTableServiceClient.java    |  6 ++
 .../apache/hudi/client/SparkRDDWriteClient.java    | 21 +------
 .../hudi/client/utils/SparkReleaseResources.java   | 64 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 20 deletions(-)
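
For context on the change itself: the Spark-specific unpersist logic that used to
live in SparkRDDWriteClient.releaseResources is extracted into a new
SparkReleaseResources helper, and the base clients now call releaseResources(...)
from their finally blocks so cached data is freed after commit, compaction,
clustering, and clean, even on failure. The underlying Spark pattern is to look up
the currently persisted RDDs by id and unpersist only the tracked ones. A minimal,
self-contained sketch of that pattern (the trackedIds list stands in for Hudi's
HoodieDataCacheKey bookkeeping; names are illustrative, not part of this commit):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.List;
    import java.util.Map;

    class UnpersistSketch {
      // Unpersist only the RDDs whose ids were tracked for one table's writes.
      static void unpersistTracked(JavaSparkContext jsc, List<Integer> trackedIds) {
        // All RDDs currently persisted in this SparkContext, keyed by RDD id.
        Map<Integer, JavaRDD<?>> persisted = jsc.getPersistentRDDs();
        for (int id : trackedIds) {
          if (persisted.containsKey(id)) {
            persisted.get(id).unpersist(); // frees the cached blocks on executors
          }
        }
      }
    }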

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e408dc7a779..d6ec07b89d0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -331,6 +331,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
     } finally {
       this.txnManager.endTransaction(Option.of(compactionInstant));
+      releaseResources(compactionCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -391,6 +392,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
     } finally {
       this.txnManager.endTransaction(Option.of(logCompactionInstant));
+      releaseResources(logCompactionCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -520,6 +522,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
     } finally {
       this.txnManager.endTransaction(Option.of(clusteringInstant));
+      releaseResources(clusteringCommitTime);
     }
     WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -759,6 +762,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
           + " cleanerElapsedMs" + durationMs);
     }
+    releaseResources(cleanInstantTime);
     return metadata;
   }
 
@@ -1133,4 +1137,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
       }
     }
   }
+
+  /**
+   * Called after each table service commit (e.g. compaction, clustering, clean)
+   * to release any resources used.
+   */
+  protected void releaseResources(String instantTime) {
+    // do nothing here
+  }
 }
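
The new protected releaseResources(String) above is a template-method hook: the
engine-agnostic client calls it inside each finally block so resources are released
even when completing the commit throws, and engine-specific subclasses override the
no-op. A generic sketch of that shape (class names and bodies illustrative, not
Hudi code):

    abstract class BaseClient {
      void completeAction(String instantTime) {
        try {
          // ... transition the inflight instant to completed ...
        } finally {
          releaseResources(instantTime); // runs even if the commit throws
        }
      }

      // No-op by default; engine-specific clients override this to free caches.
      protected void releaseResources(String instantTime) {}
    }
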
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d5d74e94673..fdc9eeca90d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -237,11 +237,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
      commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
-      releaseResources(instantTime);
     } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
     } finally {
       this.txnManager.endTransaction(Option.of(inflightInstant));
+      releaseResources(instantTime);
     }
 
     // trigger clean and archival.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index 54d91fae3cf..98914be7496 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -73,4 +74,9 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
  protected HoodieTable<?, HoodieData<HoodieRecord<T>>, ?, HoodieData<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieSparkTable.create(config, context);
   }
+
+  @Override
+  protected void releaseResources(String instantTime) {
+    SparkReleaseResources.releaseCachedData(context, config, basePath, instantTime);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 4ec886e1edb..0302c573db6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -21,8 +21,8 @@ package org.apache.hudi.client;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.CommitMetadataUtils;
+import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -40,7 +40,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metrics.DistributedRegistry;
@@ -58,7 +57,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -334,21 +332,6 @@ public class SparkRDDWriteClient<T> extends
 
   @Override
   protected void releaseResources(String instantTime) {
-    // If we do not explicitly release the resource, spark will automatically manage the resource and clean it up automatically
-    // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
-    if (config.areReleaseResourceEnabled()) {
-      HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
-      Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
-      List<Integer> allDataIds = new ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)));
-      if (config.isMetadataTableEnabled()) {
-        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
-        allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, instantTime)));
-      }
-      for (int id : allDataIds) {
-        if (allCachedRdds.containsKey(id)) {
-          allCachedRdds.get(id).unpersist();
-        }
-      }
-    }
+    SparkReleaseResources.releaseCachedData(context, config, basePath, instantTime);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java
new file mode 100644
index 00000000000..a151a33cee9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SparkReleaseResources {
+
+  /**
+   * Called after each write commit, compaction commit, and clustering commit
+   * to unpersist all RDDs persisted or cached for the table.
+   * @param context the relevant {@link HoodieEngineContext}
+   * @param config the writer's {@link HoodieWriteConfig}
+   * @param basePath the table base path
+   * @param instantTime the instant time for which cached RDDs should be unpersisted
+   */
+  public static void releaseCachedData(HoodieEngineContext context,
+                                      HoodieWriteConfig config,
+                                      String basePath,
+                                      String instantTime) {
+    // If we do not explicitly release the resources, Spark will manage and clean them up automatically;
+    // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
+    if (config.areReleaseResourceEnabled()) {
+      HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+      Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
+      List<Integer> allDataIds = new ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(basePath, instantTime)));
+      if (config.isMetadataTableEnabled()) {
+        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+        allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(metadataTableBasePath, instantTime)));
+      }
+      for (int id : allDataIds) {
+        if (allCachedRdds.containsKey(id)) {
+          allCachedRdds.get(id).unpersist();
+        }
+      }
+    }
+  }
+}
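
A note on behavior: releaseCachedData is gated on config.areReleaseResourceEnabled(),
which, if memory serves (worth verifying against HoodieWriteConfig), is backed by the
hoodie.release.resource.on.completion.enable write config. One way to sanity-check the
cleanup after a commit or table service completes (a sketch; jsc is the caller's
JavaSparkContext, not part of this commit):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Map;

    class CacheCheck {
      // After release, the table's RDDs for the completed instant should be absent.
      static void dumpPersisted(JavaSparkContext jsc) {
        Map<Integer, JavaRDD<?>> persisted = jsc.getPersistentRDDs();
        persisted.forEach((id, rdd) -> System.out.println(id + " -> " + rdd.name()));
      }
    }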
