This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 48952ae5dfd [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000)
48952ae5dfd is described below
commit 48952ae5dfd82f84cc338886e653cf1412e8cda8
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Sun Apr 14 14:38:55 2024 -0700
[HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000)
---------
Co-authored-by: rmahindra123 <[email protected]>
---
.../hudi/client/BaseHoodieTableServiceClient.java | 12 ++++
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../hudi/client/SparkRDDTableServiceClient.java | 6 ++
.../apache/hudi/client/SparkRDDWriteClient.java | 21 +------
.../hudi/client/utils/SparkReleaseResources.java | 64 ++++++++++++++++++++++
5 files changed, 85 insertions(+), 20 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 22f1a9995bd..228eaf4d554 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -333,6 +333,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
this.txnManager.endTransaction(Option.of(compactionInstant));
+ releaseResources(compactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -393,6 +394,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
CompactHelpers.getInstance().completeInflightLogCompaction(table,
logCompactionCommitTime, metadata);
} finally {
this.txnManager.endTransaction(Option.of(logCompactionInstant));
+ releaseResources(logCompactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
logCompactionCommitTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -534,6 +536,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
throw new HoodieClusteringException("unable to transition clustering
inflight to complete: " + clusteringCommitTime, e);
} finally {
this.txnManager.endTransaction(Option.of(clusteringInstant));
+ releaseResources(clusteringCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table,
clusteringCommitTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -779,6 +782,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
+ " Earliest Retained Instant :" +
metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
+ releaseResources(cleanInstantTime);
return metadata;
}
@@ -1171,4 +1175,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
}
}
}
+
+  /**
+   * Releases resources held for {@code instantTime}. Invoked when completing compaction, log compaction and
+   * clustering (from a finally block, so also on failure) and at the end of clean. No-op by default.
+   */
+  protected void releaseResources(String instantTime) {
+    // do nothing here; engine-specific clients override this
+  }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9ade694d340..2b9bc83091b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -236,11 +236,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
commit(table, commitActionType, instantTime, metadata, stats,
writeStatuses);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
- releaseResources(instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime, e);
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
+ releaseResources(instantTime);
}
// trigger clean and archival.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index 54d91fae3cf..98914be7496 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.utils.SparkReleaseResources;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
@@ -73,4 +74,9 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
protected HoodieTable<?, HoodieData<HoodieRecord<T>>, ?,
HoodieData<WriteStatus>> createTable(HoodieWriteConfig config, Configuration
hadoopConf) {
return HoodieSparkTable.create(config, context);
}
+
+  @Override
+  protected void releaseResources(String instantTime) {
+    SparkReleaseResources.releaseCachedData(this.context, this.config, this.basePath, instantTime);
+  }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 0308649dbf6..d5fc1bf411e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -20,8 +20,8 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.utils.SparkReleaseResources;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -38,7 +38,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
@@ -55,7 +54,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -328,21 +326,6 @@ public class SparkRDDWriteClient<T> extends
@Override
protected void releaseResources(String instantTime) {
- // If we do not explicitly release the resource, spark will automatically
manage the resource and clean it up automatically
- // see:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
- if (config.areReleaseResourceEnabled()) {
- HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
context;
- Map<Integer, JavaRDD<?>> allCachedRdds =
sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
- List<Integer> allDataIds = new
ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(basePath,
instantTime)));
- if (config.isMetadataTableEnabled()) {
- String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(basePath);
-
allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath,
instantTime)));
- }
- for (int id : allDataIds) {
- if (allCachedRdds.containsKey(id)) {
- allCachedRdds.get(id).unpersist();
- }
- }
- }
+    SparkReleaseResources.releaseCachedData(this.context, this.config, this.basePath, instantTime);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java
new file mode 100644
index 00000000000..a151a33cee9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkReleaseResources.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public final class SparkReleaseResources {
+
+  private SparkReleaseResources() {}
+
+  /**
+   * Unpersists the RDDs cached for {@code instantTime}; called after write commits and after
+   * compaction, log compaction, clustering and clean commits.
+   * @param context the relevant {@link HoodieEngineContext}; expected to be a {@link HoodieSparkEngineContext}
+   * @param config writer configs {@link HoodieWriteConfig}
+   * @param basePath table base path; the metadata table under it is covered too when enabled
+   * @param instantTime instant time for which the RDDs need to be unpersisted.
+   */
+  public static void releaseCachedData(HoodieEngineContext context, HoodieWriteConfig config, String basePath, String instantTime) {
+    // If cached data is not released explicitly, Spark evicts it on its own in LRU fashion, see:
+    // https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
+    if (config.areReleaseResourceEnabled()) {
+      HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+      Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
+      List<Integer> allDataIds = new ArrayList<>(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(basePath, instantTime)));
+      if (config.isMetadataTableEnabled()) {
+        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+        allDataIds.addAll(sparkEngineContext.removeCachedDataIds(HoodieData.HoodieDataCacheKey.of(metadataTableBasePath, instantTime)));
+      }
+      for (int id : allDataIds) {
+        JavaRDD<?> rdd = allCachedRdds.get(id);
+        if (rdd != null) {
+          rdd.unpersist();
+        }
+      }
+    }
+  }
+}