This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e2777a96f1b44c76d55cbde5f84e30e704081b88 Author: Rajesh Mahindra <[email protected]> AuthorDate: Sun Apr 14 14:38:55 2024 -0700 [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000) --------- Co-authored-by: rmahindra123 <[email protected]> --- .../hudi/client/BaseHoodieTableServiceClient.java | 12 ++++ .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../hudi/client/SparkRDDTableServiceClient.java | 6 ++ .../apache/hudi/client/SparkRDDWriteClient.java | 21 +------ .../hudi/client/utils/SparkReleaseResources.java | 64 ++++++++++++++++++++++ 5 files changed, 85 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index e408dc7a779..d6ec07b89d0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -331,6 +331,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(compactionInstant)); + releaseResources(compactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -391,6 +392,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(logCompactionInstant)); + releaseResources(logCompactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime) 
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -520,6 +522,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); } finally { this.txnManager.endTransaction(Option.of(clusteringInstant)); + releaseResources(clusteringCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -759,6 +762,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElapsedMs" + durationMs); } + releaseResources(cleanInstantTime); return metadata; } @@ -1133,4 +1137,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl } } } + + /** + * Called after each table service commit (compaction, log compaction, clustering, or clean), + * to release any resources used. 
+ */ + protected void releaseResources(String instantTime) { + // do nothing here + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index d5d74e94673..fdc9eeca90d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -237,11 +237,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient commit(table, commitActionType, instantTime, metadata, stats, writeStatuses); postCommit(table, metadata, instantTime, extraMetadata); LOG.info("Committed " + instantTime); - releaseResources(instantTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e); } finally { this.txnManager.endTransaction(Option.of(inflightInstant)); + releaseResources(instantTime); } // trigger clean and archival. 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java index 54d91fae3cf..98914be7496 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java @@ -21,6 +21,7 @@ package org.apache.hudi.client; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.client.utils.SparkReleaseResources; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecord; @@ -73,4 +74,9 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient< protected HoodieTable<?, HoodieData<HoodieRecord<T>>, ?, HoodieData<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) { return HoodieSparkTable.create(config, context); } + + @Override + protected void releaseResources(String instantTime) { + SparkReleaseResources.releaseCachedData(context, config, basePath, instantTime); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 4ec886e1edb..0302c573db6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -21,8 +21,8 @@ package org.apache.hudi.client; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import 
org.apache.hudi.client.utils.CommitMetadataUtils; +import org.apache.hudi.client.utils.SparkReleaseResources; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -40,7 +40,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metrics.DistributedRegistry; @@ -58,7 +57,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -334,21 +332,6 @@ public class SparkRDDWriteClient<T> extends @Override protected void releaseResources(String instantTime) { - // If we do not explicitly release the resource, spark will automatically manage the resource and clean it up automatically - // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data - if (config.areReleaseResourceEnabled()) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context; - Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs(); - List<Integer> allDataIds = new ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime))); - if (config.isMetadataTableEnabled()) { - String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - 
allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, instantTime))); - } - for (int id : allDataIds) { - if (allCachedRdds.containsKey(id)) { - allCachedRdds.get(id).unpersist(); - } - } - } + SparkReleaseResources.releaseCachedData(context, config, basePath, instantTime); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java new file mode 100644 index 00000000000..a151a33cee9 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadata; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SparkReleaseResources { + + /** + * Called after each write commit, compaction commit and clustering commit + * to unpersist all RDDs persisted or cached per table. + * @param context the relevant {@link HoodieEngineContext} + * @param config writer configs {@link HoodieWriteConfig} + * @param basePath table base path + * @param instantTime instant time for which the RDDs need to be unpersisted. + */ + public static void releaseCachedData(HoodieEngineContext context, + HoodieWriteConfig config, + String basePath, + String instantTime) { + // If we do not explicitly release the resource, spark will automatically manage the resource and clean it up automatically + // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data + if (config.areReleaseResourceEnabled()) { + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context; + Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs(); + List<Integer> allDataIds = new ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(basePath, instantTime))); + if (config.isMetadataTableEnabled()) { + String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(metadataTableBasePath, instantTime))); + } + for (int id : allDataIds) { + if (allCachedRdds.containsKey(id)) { + allCachedRdds.get(id).unpersist(); + } + } + } + } +}
