This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dc8b078af68 [HUDI-5080] Unpersist only relevant RDDs instead of all (#7914)
dc8b078af68 is described below

commit dc8b078af68c328e6a1d20b67b1e1fdd2cd4d78b
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Feb 27 20:27:28 2023 -0600

    [HUDI-5080] Unpersist only relevant RDDs instead of all (#7914)
    
    Track persisted RDDs in `HoodieEngineContext` so that they can be used to determine which RDDs to unpersist.
    
    Added `HoodieDataCacheKey` (consisting of base path and instant time) to identify the RDDs being tracked in multi-writer scenarios.
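    
    A minimal sketch of the resulting flow (illustrative only: the base path,
    instant time, and surrounding variables such as `jsc`, `engineContext`,
    and `writeStatuses` are assumptions, not values from this commit; the
    `HoodieDataCacheKey` and cached-id APIs are the ones added here):
    
        // Persisting registers the RDD id under a key scoped to this table and write.
        HoodieDataCacheKey key = HoodieDataCacheKey.of("/tmp/hudi_table", "20230227202728");
        writeStatuses.persist("MEMORY_AND_DISK", engineContext, key);
    
        // Releasing resources unpersists only the RDDs registered under that key,
        // leaving RDDs cached by concurrent writers on other instants untouched.
        Map<Integer, JavaRDD<?>> persisted = jsc.getPersistentRDDs();
        for (int id : engineContext.removeCachedDataIds(key)) {
          if (persisted.containsKey(id)) {
            persisted.get(id).unpersist();
          }
        }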
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   4 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   3 +-
 .../hudi/table/action/compact/HoodieCompactor.java |   2 +-
 .../compact/RunCompactionActionExecutor.java       |   4 +-
 .../client/common/HoodieFlinkEngineContext.java    |  16 +++
 .../HoodieFlinkMergeOnReadTableCompactor.java      |   3 +-
 .../client/common/HoodieJavaEngineContext.java     |  21 +++-
 .../HoodieJavaMergeOnReadTableCompactor.java       |   3 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  13 ++-
 .../client/common/HoodieSparkEngineContext.java    |  36 +++++-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  16 ++-
 .../commit/BaseSparkCommitActionExecutor.java      |   7 +-
 .../HoodieSparkMergeOnReadTableCompactor.java      |   6 +-
 .../hudi/client/TestSparkRDDWriteClient.java       | 124 +++++++++++++++++++++
 .../common/TestHoodieSparkEngineContext.java       |  51 +++++++++
 .../functional/TestHoodieBackedMetadata.java       |  15 +--
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  52 +--------
 .../TestHoodieSparkMergeOnReadTableCompaction.java |   2 +-
 .../org/apache/hudi/common/data/HoodieData.java    |  56 +++++++++-
 .../apache/hudi/common/data/HoodieListData.java    |  11 ++
 .../hudi/common/engine/HoodieEngineContext.java    |   6 +
 .../common/engine/HoodieLocalEngineContext.java    |  16 +++
 .../common/testutils/HoodieTestDataGenerator.java  |  20 ++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   4 -
 24 files changed, 400 insertions(+), 91 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9fac7a38b18..177e650c44c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -235,7 +235,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       commit(table, commitActionType, instantTime, metadata, stats);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
-      releaseResources();
+      releaseResources(instantTime);
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
     } finally {
@@ -1238,7 +1238,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   /**
    * Called after each write, to release any resources used.
    */
-  protected void releaseResources() {
+  protected void releaseResources(String instantTime) {
     // do nothing here
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 308a898ebda..987fe4d9dfc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -299,7 +299,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .withFinalizeWriteParallelism(parallelism)
         .withAllowMultiWriteOnSameInstant(true)
         .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
-        .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue());
+        .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue())
+        .withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled());
 
     // RecordKey properties are needed for the metadata table records
     final Properties properties = new Properties();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 0d18a68cbad..a22d68eb888 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -83,7 +83,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
    *
    * @param writeStatus {@link HoodieData} of {@link WriteStatus}.
    */
-  public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config);
+  public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
 
   /**
    * Execute compaction operations and report back status.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index c26e2a9ec51..3f86df7e3ad 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
@@ -96,7 +96,7 @@ public class RunCompactionActionExecutor<T> extends
       HoodieData<WriteStatus> statuses = compactor.compact(
           context, compactionPlan, table, configCopy, instantTime, compactionHandler);
 
-      compactor.maybePersist(statuses, config);
+      compactor.maybePersist(statuses, context, config, instantTime);
       context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
       List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
       HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index c9136da6bb4..db5c6ebd296 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -165,6 +166,21 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
     // no operation for now
   }
 
+  @Override
+  public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+    // no operation for now
+  }
+
+  @Override
+  public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
   /**
    * Override the flink context supplier to return constant write token.
    */
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index 8b99ed815bd..3df829a7c9e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -55,7 +56,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
   }
 
   @Override
-  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     // No OP
   }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 2211c8a1030..6ab8e5ab029 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -18,12 +18,11 @@
 
 package org.apache.hudi.client.common;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -34,10 +33,11 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
-
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -146,4 +146,19 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
   public void setJobStatus(String activeModule, String activityDescription) {
     // no operation for now
   }
+
+  @Override
+  public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+    // no operation for now
+  }
+
+  @Override
+  public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
index 97e26e7c41d..d8994ce02c3 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -53,7 +54,7 @@ public class HoodieJavaMergeOnReadTableCompactor<T>
   }
 
   @Override
-  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     // No OP
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 86f559260a7..ac3355d06fc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.metrics.Registry;
@@ -381,12 +382,18 @@ public class SparkRDDWriteClient<T> extends
   }
 
   @Override
-  protected void releaseResources() {
+  protected void releaseResources(String instantTime) {
     // If we do not explicitly release the resource, spark will automatically manage the resource and clean it up automatically
     // see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
     if (config.areReleaseResourceEnabled()) {
-      ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values()
-          .forEach(JavaRDD::unpersist);
+      HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+      Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
+      List<Integer> dataIds = sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime));
+      for (int id : dataIds) {
+        if (allCachedRdds.containsKey(id)) {
+          allCachedRdds.get(id).unpersist();
+        }
+      }
     }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index c97cb78d8c9..33509b66f11 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableBiFunction;
@@ -41,8 +42,12 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.sql.SQLContext;
-import scala.Tuple2;
 
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -50,14 +55,18 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import scala.Tuple2;
+
 /**
  * A Spark engine implementation of HoodieEngineContext.
  */
+@ThreadSafe
 public class HoodieSparkEngineContext extends HoodieEngineContext {
 
   private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
   private final JavaSparkContext javaSparkContext;
   private final SQLContext sqlContext;
+  private final Map<HoodieDataCacheKey, List<Integer>> cachedRddIds = new HashMap<>();
 
   public HoodieSparkEngineContext(JavaSparkContext jsc) {
     this(jsc, SQLContext.getOrCreate(jsc.sc()));
@@ -180,4 +189,29 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   public void setJobStatus(String activeModule, String activityDescription) {
     javaSparkContext.setJobGroup(activeModule, activityDescription);
   }
+
+  @Override
+  public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+    synchronized (cachedRddIds) {
+      cachedRddIds.putIfAbsent(cacheKey, new ArrayList<>());
+      for (int id : ids) {
+        cachedRddIds.get(cacheKey).add(id);
+      }
+    }
+  }
+
+  @Override
+  public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+    synchronized (cachedRddIds) {
+      return cachedRddIds.getOrDefault(cacheKey, Collections.emptyList());
+    }
+  }
+
+  @Override
+  public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+    synchronized (cachedRddIds) {
+      List<Integer> removed = cachedRddIds.remove(cacheKey);
+      return removed == null ? Collections.emptyList() : removed;
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 6ed3a854962..a712ee0640e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -22,18 +22,21 @@ package org.apache.hudi.data;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
 
 import java.util.Iterator;
 import java.util.List;
 
+import scala.Tuple2;
+
 /**
  * Holds a {@link JavaRDD} of objects.
  *
@@ -81,11 +84,22 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
     return ((HoodieJavaPairRDD<K, V>) hoodieData).get();
   }
 
+  @Override
+  public int getId() {
+    return rddData.id();
+  }
+
   @Override
   public void persist(String level) {
     rddData.persist(StorageLevel.fromString(level));
   }
 
+  @Override
+  public void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey) {
+    engineContext.putCachedDataIds(cacheKey, this.getId());
+    rddData.persist(StorageLevel.fromString(level));
+  }
+
   @Override
   public void unpersist() {
     rddData.unpersist();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 936ed78aa6a..78d6df51aae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -150,8 +151,8 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     // Cache the tagged records, so we don't end up computing both
     JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
     if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
-      String writeStorageLevel = config.getTaggedRecordStorageLevel();
-      inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
+      HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
+          context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
     } else {
       LOG.info("RDD PreppedRecords was persisted at: " + 
inputRDD.getStorageLevel());
     }
@@ -258,7 +259,7 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
   protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
-    writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
+    writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
     Instant indexStartTime = Instant.now();
     // Update the index back
     HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 8a3daaf4aac..cd9bf334f63 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -20,6 +20,8 @@ package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -52,7 +54,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T>
   }
 
   @Override
-  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
-    writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+    writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
new file mode 100644
index 00000000000..0d8eda4912d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
+
+  static Stream<Arguments> testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds() {
+    return Stream.of(
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(HoodieTableType tableType, boolean shouldReleaseResource) throws IOException {
+    final HoodieTableMetaClient metaClient = getHoodieMetaClient(hadoopConf(), URI.create(basePath()).getPath(), tableType, new Properties());
+    final HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withPath(metaClient.getBasePathV2().toString())
+        .withAutoCommit(false)
+        .withReleaseResourceEnabled(shouldReleaseResource)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+
+    String instant0 = getCommitTimeAtUTC(0);
+    List<GenericRecord> extraRecords0 = dataGen.generateGenericRecords(10);
+    HoodieJavaRDD<GenericRecord> persistedRdd0 = HoodieJavaRDD.of(jsc().parallelize(extraRecords0, 2));
+    persistedRdd0.persist("MEMORY_AND_DISK", context(), HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0));
+
+    String instant1 = getCommitTimeAtUTC(1);
+    List<GenericRecord> extraRecords1 = dataGen.generateGenericRecords(10);
+    HoodieJavaRDD<GenericRecord> persistedRdd1 = HoodieJavaRDD.of(jsc().parallelize(extraRecords1, 2));
+    persistedRdd1.persist("MEMORY_AND_DISK", context(), HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1));
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<HoodieRecord> records = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+    writeClient.startCommitWithTime(instant1);
+    List<WriteStatus> writeStatuses = writeClient.insert(writeRecords, instant1).collect();
+    assertNoWriteErrors(writeStatuses);
+    writeClient.commitStats(instant1, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+    writeClient.close();
+
+    if (shouldReleaseResource) {
+      assertEquals(Collections.singletonList(persistedRdd0.getId()),
+          context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0)),
+          "RDDs cached for " + instant0 + " should be retained.");
+      assertEquals(Collections.emptyList(),
+          context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1)),
+          "RDDs cached for " + instant1 + " should be cleared.");
+      assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()),
+          "RDDs cached for " + instant0 + " should be retained.");
+      assertFalse(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()),
+          "RDDs cached for " + instant1 + " should be cleared.");
+      assertFalse(jsc().getPersistentRDDs().containsKey(writeRecords.id()),
+          "RDDs cached for " + instant1 + " should be cleared.");
+    } else {
+      assertEquals(Collections.singletonList(persistedRdd0.getId()),
+          context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0)),
+          "RDDs cached for " + instant0 + " should be retained.");
+      assertEquals(3,
+          context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1)).size(),
+          "RDDs cached for " + instant1 + " should be retained.");
+      assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()),
+          "RDDs cached for " + instant0 + " should be retained.");
+      assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()),
+          "RDDs cached for " + instant1 + " should be retained.");
+      assertTrue(jsc().getPersistentRDDs().containsKey(writeRecords.id()),
+          "RDDs cached for " + instant1 + " should be retained.");
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java
new file mode 100644
index 00000000000..e3d163f077f
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieSparkEngineContext extends SparkClientFunctionalTestHarness {
+
+  private HoodieSparkEngineContext context;
+
+  @BeforeEach
+  void setUp() {
+    context = new HoodieSparkEngineContext(jsc());
+  }
+
+  @Test
+  void testAddRemoveCachedDataIds() {
+    String basePath = "/tmp/foo";
+    String instantTime = "000";
+    context.putCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime), 1, 2, 3);
+    assertEquals(Arrays.asList(1, 2, 3), context.getCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)));
+    assertEquals(Arrays.asList(1, 2, 3), context.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)));
+    assertTrue(context.getCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)).isEmpty());
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e491d96cffe..6fe543488b8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -153,6 +153,7 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1395,20 +1396,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, emptyList(), true);
   }
 
-  /**
-   * Fetches next commit time in seconds from current one.
-   *
-   * @param curCommitTime current commit time.
-   * @return the next valid commit time.
-   */
-  private Long getNextCommitTime(long curCommitTime) {
-    if ((curCommitTime + 1) % 1000000000000L >= 60) { // max seconds is 60 and hence
-      return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
-    } else {
-      return curCommitTime + 1;
-    }
-  }
-
   @ParameterizedTest
   @MethodSource("tableTypeAndEnableOperationArgs")
   public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, boolean nonPartitionedDataset) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 32d9a6488b0..37cd23451ee 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.storage.StorageLevel;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -304,7 +303,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       Dataset<Row> actual = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), fullPartitionPaths);
       List<Row> rows = actual.collectAsList();
       assertEquals(updatedRecords.size(), rows.size());
-      for (Row row: rows) {
+      for (Row row : rows) {
         assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
       }
     }
@@ -458,7 +457,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       inserts = 0;
       int upserts = 0;
       List<WriteStatus> writeStatusList = statuses.collect();
-      for (WriteStatus ws: writeStatusList) {
+      for (WriteStatus ws : writeStatusList) {
         inserts += ws.getStat().getNumInserts();
         upserts += ws.getStat().getNumUpdateWrites();
       }
@@ -691,51 +690,4 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords());
     }
   }
-
-  @Test
-  public void testReleaseResource() throws Exception {
-    HoodieWriteConfig.Builder builder = getConfigBuilder(true);
-    builder.withReleaseResourceEnabled(true);
-    builder.withAutoCommit(false);
-
-    setUp(builder.build().getProps());
-
-    /**
-     * Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
-     */
-    try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
-
-      String newCommitTime = "001";
-      client.startCommitWithTime(newCommitTime);
-
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
-      writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
-      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
-      client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
-      assertEquals(spark().sparkContext().persistentRdds().size(), 0);
-    }
-
-    builder.withReleaseResourceEnabled(false);
-
-    /**
-     * Write 2 (test when RELEASE_RESOURCE_ENABLE is false)
-     */
-    try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
-      String newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
-
-      writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
-      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
-      client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
-      assertTrue(spark().sparkContext().persistentRdds().size() > 0);
-    }
-
-  }
 }
-
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index b50b0f90cca..d5657c155f1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -197,4 +197,4 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
     metaClient = HoodieTableMetaClient.reload(metaClient);
     return writeStatuses;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 9ce0947883f..60820d5a0ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.data;
 
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.collection.Pair;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * An interface abstracting a container holding a collection of objects of type {@code T}
@@ -51,10 +53,24 @@ import java.util.List;
 public interface HoodieData<T> extends Serializable {
 
   /**
-   * Persists the data w/ provided {@code level} (if applicable)
+   * Get the {@link HoodieData}'s unique non-negative identifier. -1 indicates invalid id.
+   */
+  int getId();
+
+  /**
+   * Persists the data w/ provided {@code level} (if applicable).
+   *
+   * Use this method only when you call {@link #unpersist()} at some later point for the same {@link HoodieData}.
+   * Otherwise, use {@link #persist(String, HoodieEngineContext, HoodieDataCacheKey)} instead for auto-unpersist
+   * at the end of a client write operation.
    */
   void persist(String level);
 
+  /**
+   * Persists the data w/ provided {@code level} (if applicable), and caches the data's ids within the {@code engineContext}.
+   */
+  void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey);
+
   /**
    * Un-persists the data (if previously persisted)
    */
@@ -196,4 +212,42 @@ public interface HoodieData<T> extends Serializable {
         .reduceByKey((value1, value2) -> value1, parallelism)
         .values();
   }
+
+  /**
+   * The key used in a caching map to identify a {@link HoodieData}.
+   *
+   * At the end of a write operation, we manually unpersist the {@link HoodieData} associated with that writer.
+   * Therefore, in multi-writer scenarios, we need to use both {@code basePath} and {@code instantTime} to identify {@link HoodieData}s.
+   */
+  class HoodieDataCacheKey implements Serializable {
+
+    public static HoodieDataCacheKey of(String basePath, String instantTime) {
+      return new HoodieDataCacheKey(basePath, instantTime);
+    }
+
+    private final String basePath;
+    private final String instantTime;
+
+    private HoodieDataCacheKey(String basePath, String instantTime) {
+      this.basePath = basePath;
+      this.instantTime = instantTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      HoodieDataCacheKey that = (HoodieDataCacheKey) o;
+      return basePath.equals(that.basePath) && instantTime.equals(that.instantTime);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(basePath, instantTime);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index c6287a744e0..4d9980a3575 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.data;
 
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -90,11 +91,21 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
     return new HoodieListData<>(listData, true);
   }
 
+  @Override
+  public int getId() {
+    return -1;
+  }
+
   @Override
   public void persist(String level) {
     // No OP
   }
 
+  @Override
+  public void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey) {
+    // No OP
+  }
+
   @Override
   public void unpersist() {
     // No OP
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index d400a10f68a..c123c279644 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.engine;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
@@ -95,4 +96,9 @@ public abstract class HoodieEngineContext {
 
   public abstract void setJobStatus(String activeModule, String activityDescription);
 
+  public abstract void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids);
+
+  public abstract List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey);
+
+  public abstract List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey);
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 9781c694dc2..26190b790ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
@@ -144,4 +145,19 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   public void setJobStatus(String activeModule, String activityDescription) {
     // no operation for now
   }
+
+  @Override
+  public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+    // no operation for now
+  }
+
+  @Override
+  public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 28fb77c57ba..267f5c88428 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -29,7 +29,9 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.AvroOrcUtils;
@@ -207,6 +209,24 @@ public class HoodieTestDataGenerator implements AutoCloseable {
     this(System.nanoTime(), partitionPaths, keyPartitionMap);
   }
 
+  /**
+   * Fetches next commit time in seconds from current one.
+   *
+   * @param curCommitTime current commit time.
+   * @return the next valid commit time.
+   */
+  public static Long getNextCommitTime(long curCommitTime) {
+    if ((curCommitTime + 1) % 1000000000000L >= 60) { // max seconds is 60 and hence
+      return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
+    } else {
+      return curCommitTime + 1;
+    }
+  }
+
+  public static String getCommitTimeAtUTC(long epochSecond) {
+    return HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(Instant.ofEpochSecond(epochSecond).atZone(ZoneOffset.UTC));
+  }
+
   /**
    * @deprecated please use non-static version
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4d91e5076e7..eb562117df9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -397,10 +397,6 @@ public class DeltaSync implements Serializable, Closeable {
     }
 
     metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
-
-    // TODO revisit (too early to unpersist)
-    // Clear persistent RDDs
-    jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
     return result;
   }
 
