This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dc8b078af68 [HUDI-5080] Unpersist only relevant RDDs instead of all (#7914)
dc8b078af68 is described below
commit dc8b078af68c328e6a1d20b67b1e1fdd2cd4d78b
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Feb 27 20:27:28 2023 -0600
[HUDI-5080] Unpersist only relevant RDDs instead of all (#7914)
Track persisted RDDs in `HoodieEngineContext` so that they can be used to
filter which RDDs to unpersist.
Added `HoodieDataCacheKey` (consisting of base path and instant time) to
identify the tracked RDDs in multi-writer scenarios.
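
To make the mechanism concrete, here is a minimal, self-contained sketch
(illustrative only, not Hudi code) of the tracking scheme: each persisted
RDD id is recorded under a per-writer cache key, and finishing a commit
removes, and can then unpersist, only the ids recorded under its own key.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RddTrackingSketch {
      // Stand-in for HoodieSparkEngineContext#cachedRddIds; the key string
      // plays the role of HoodieDataCacheKey(basePath, instantTime).
      private final Map<String, List<Integer>> cachedRddIds = new HashMap<>();

      // Mirrors putCachedDataIds: records an RDD id when persist() is called.
      synchronized void putCachedDataIds(String cacheKey, int... ids) {
        cachedRddIds.putIfAbsent(cacheKey, new ArrayList<>());
        for (int id : ids) {
          cachedRddIds.get(cacheKey).add(id);
        }
      }

      // Mirrors removeCachedDataIds: drains only this writer's ids at commit end.
      synchronized List<Integer> removeCachedDataIds(String cacheKey) {
        List<Integer> removed = cachedRddIds.remove(cacheKey);
        return removed == null ? Collections.emptyList() : removed;
      }

      public static void main(String[] args) {
        RddTrackingSketch ctx = new RddTrackingSketch();
        ctx.putCachedDataIds("/tbl@001", 7); // writer on instant 001 cached RDD 7
        ctx.putCachedDataIds("/tbl@002", 9); // concurrent writer on instant 002 cached RDD 9
        System.out.println(ctx.removeCachedDataIds("/tbl@001")); // [7] -> unpersist RDD 7 only
        System.out.println(ctx.removeCachedDataIds("/tbl@001")); // []  -> already released
      }
    }
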
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 4 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 3 +-
.../hudi/table/action/compact/HoodieCompactor.java | 2 +-
.../compact/RunCompactionActionExecutor.java | 4 +-
.../client/common/HoodieFlinkEngineContext.java | 16 +++
.../HoodieFlinkMergeOnReadTableCompactor.java | 3 +-
.../client/common/HoodieJavaEngineContext.java | 21 +++-
.../HoodieJavaMergeOnReadTableCompactor.java | 3 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 13 ++-
.../client/common/HoodieSparkEngineContext.java | 36 +++++-
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 16 ++-
.../commit/BaseSparkCommitActionExecutor.java | 7 +-
.../HoodieSparkMergeOnReadTableCompactor.java | 6 +-
.../hudi/client/TestSparkRDDWriteClient.java | 124 +++++++++++++++++++++
.../common/TestHoodieSparkEngineContext.java | 51 +++++++++
.../functional/TestHoodieBackedMetadata.java | 15 +--
.../hudi/table/TestHoodieMergeOnReadTable.java | 52 +--------
.../TestHoodieSparkMergeOnReadTableCompaction.java | 2 +-
.../org/apache/hudi/common/data/HoodieData.java | 56 +++++++++-
.../apache/hudi/common/data/HoodieListData.java | 11 ++
.../hudi/common/engine/HoodieEngineContext.java | 6 +
.../common/engine/HoodieLocalEngineContext.java | 16 +++
.../common/testutils/HoodieTestDataGenerator.java | 20 ++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 -
24 files changed, 400 insertions(+), 91 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9fac7a38b18..177e650c44c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -235,7 +235,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
- releaseResources();
+ releaseResources(instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
} finally {
@@ -1238,7 +1238,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
/**
* Called after each write, to release any resources used.
*/
- protected void releaseResources() {
+ protected void releaseResources(String instantTime) {
// do nothing here
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 308a898ebda..987fe4d9dfc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -299,7 +299,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withFinalizeWriteParallelism(parallelism)
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
- .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue());
+ .withPopulateMetaFields(HoodieMetadataConfig.POPULATE_META_FIELDS.defaultValue())
+ .withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled());
// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 0d18a68cbad..a22d68eb888 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -83,7 +83,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
*
* @param writeStatus {@link HoodieData} of {@link WriteStatus}.
*/
- public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config);
+ public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
/**
* Execute compaction operations and report back status.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index c26e2a9ec51..3f86df7e3ad 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
@@ -96,7 +96,7 @@ public class RunCompactionActionExecutor<T> extends
HoodieData<WriteStatus> statuses = compactor.compact(
context, compactionPlan, table, configCopy, instantTime, compactionHandler);
- compactor.maybePersist(statuses, config);
+ compactor.maybePersist(statuses, context, config, instantTime);
context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index c9136da6bb4..db5c6ebd296 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -165,6 +166,21 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
// no operation for now
}
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ // no operation for now
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
/**
* Override the flink context supplier to return constant write token.
*/
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index 8b99ed815bd..3df829a7c9e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -55,7 +56,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
}
@Override
- public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+ public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
// No OP
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 2211c8a1030..6ab8e5ab029 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -18,12 +18,11 @@
package org.apache.hudi.client.common;
-import org.apache.hadoop.conf.Configuration;
-
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -34,10 +33,11 @@ import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
-
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hadoop.conf.Configuration;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -146,4 +146,19 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
public void setJobStatus(String activeModule, String activityDescription) {
// no operation for now
}
+
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ // no operation for now
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
index 97e26e7c41d..d8994ce02c3 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -53,7 +54,7 @@ public class HoodieJavaMergeOnReadTableCompactor<T>
}
@Override
- public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+ public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
// No OP
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 86f559260a7..ac3355d06fc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.metrics.Registry;
@@ -381,12 +382,18 @@ public class SparkRDDWriteClient<T> extends
}
@Override
- protected void releaseResources() {
+ protected void releaseResources(String instantTime) {
// If we do not explicitly release the resource, spark will automatically manage the resource and clean it up automatically
// see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data
if (config.areReleaseResourceEnabled()) {
- ((HoodieSparkEngineContext) context).getJavaSparkContext().getPersistentRDDs().values()
- .forEach(JavaRDD::unpersist);
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+ Map<Integer, JavaRDD<?>> allCachedRdds = sparkEngineContext.getJavaSparkContext().getPersistentRDDs();
+ List<Integer> dataIds = sparkEngineContext.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime));
+ for (int id : dataIds) {
+ if (allCachedRdds.containsKey(id)) {
+ allCachedRdds.get(id).unpersist();
+ }
+ }
}
}
}
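
For context, a hypothetical two-writer demo (standalone Spark code with
assumed names, not part of this commit) contrasting the old blanket
unpersist with the new targeted lookup against
JavaSparkContext#getPersistentRDDs:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class UnpersistScopeDemo {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[2]").setAppName("unpersist-scope"));

        JavaRDD<Integer> writerA = jsc.parallelize(Arrays.asList(1, 2, 3)).cache();
        JavaRDD<Integer> writerB = jsc.parallelize(Arrays.asList(4, 5, 6)).cache();
        writerA.count();
        writerB.count();

        // Old behavior: unpersist everything, clobbering writer B's still-needed cache.
        jsc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
        System.out.println(jsc.getPersistentRDDs().size()); // 0

        // New behavior: look up and unpersist only the tracked id of the finishing writer.
        writerA = jsc.parallelize(Arrays.asList(1, 2, 3)).cache();
        writerB = jsc.parallelize(Arrays.asList(4, 5, 6)).cache();
        writerB.count();
        int trackedId = writerA.id(); // in Hudi, recorded under a HoodieDataCacheKey
        JavaRDD<?> cached = jsc.getPersistentRDDs().get(trackedId);
        if (cached != null) {
          cached.unpersist();
        }
        System.out.println(jsc.getPersistentRDDs().size()); // 1 (writer B untouched)
        jsc.close();
      }
    }
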
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index c97cb78d8c9..33509b66f11 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
@@ -41,8 +42,12 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
-import scala.Tuple2;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -50,14 +55,18 @@ import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import scala.Tuple2;
+
/**
* A Spark engine implementation of HoodieEngineContext.
*/
+@ThreadSafe
public class HoodieSparkEngineContext extends HoodieEngineContext {
private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
private final JavaSparkContext javaSparkContext;
private final SQLContext sqlContext;
+ private final Map<HoodieDataCacheKey, List<Integer>> cachedRddIds = new HashMap<>();
public HoodieSparkEngineContext(JavaSparkContext jsc) {
this(jsc, SQLContext.getOrCreate(jsc.sc()));
@@ -180,4 +189,29 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
public void setJobStatus(String activeModule, String activityDescription) {
javaSparkContext.setJobGroup(activeModule, activityDescription);
}
+
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ synchronized (cachedRddIds) {
+ cachedRddIds.putIfAbsent(cacheKey, new ArrayList<>());
+ for (int id : ids) {
+ cachedRddIds.get(cacheKey).add(id);
+ }
+ }
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ synchronized (cachedRddIds) {
+ return cachedRddIds.getOrDefault(cacheKey, Collections.emptyList());
+ }
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ synchronized (cachedRddIds) {
+ List<Integer> removed = cachedRddIds.remove(cacheKey);
+ return removed == null ? Collections.emptyList() : removed;
+ }
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 6ed3a854962..a712ee0640e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -22,18 +22,21 @@ package org.apache.hudi.data;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.collection.MappingIterator;
import org.apache.hudi.common.util.collection.Pair;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
+import scala.Tuple2;
+
/**
* Holds a {@link JavaRDD} of objects.
*
@@ -81,11 +84,22 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
return ((HoodieJavaPairRDD<K, V>) hoodieData).get();
}
+ @Override
+ public int getId() {
+ return rddData.id();
+ }
+
@Override
public void persist(String level) {
rddData.persist(StorageLevel.fromString(level));
}
+ @Override
+ public void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey) {
+ engineContext.putCachedDataIds(cacheKey, this.getId());
+ rddData.persist(StorageLevel.fromString(level));
+ }
+
@Override
public void unpersist() {
rddData.unpersist();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 936ed78aa6a..78d6df51aae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -150,8 +151,8 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
// Cache the tagged records, so we don't end up computing both
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
- String writeStorageLevel = config.getTaggedRecordStorageLevel();
- inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
+ HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
+ context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
} else {
LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
}
@@ -258,7 +259,7 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
- writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
+ writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
Instant indexStartTime = Instant.now();
// Update the index back
HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 8a3daaf4aac..cd9bf334f63 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -20,6 +20,8 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -52,7 +54,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T>
}
@Override
- public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
- writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
+ public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+ writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
new file mode 100644
index 00000000000..0d8eda4912d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
+
+ static Stream<Arguments> testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds() {
+ return Stream.of(
+ Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+ Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+ Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+ Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(HoodieTableType tableType, boolean shouldReleaseResource) throws IOException {
+ final HoodieTableMetaClient metaClient = getHoodieMetaClient(hadoopConf(), URI.create(basePath()).getPath(), tableType, new Properties());
+ final HoodieWriteConfig writeConfig = getConfigBuilder(true)
+ .withPath(metaClient.getBasePathV2().toString())
+ .withAutoCommit(false)
+ .withReleaseResourceEnabled(shouldReleaseResource)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+
+ String instant0 = getCommitTimeAtUTC(0);
+ List<GenericRecord> extraRecords0 = dataGen.generateGenericRecords(10);
+ HoodieJavaRDD<GenericRecord> persistedRdd0 = HoodieJavaRDD.of(jsc().parallelize(extraRecords0, 2));
+ persistedRdd0.persist("MEMORY_AND_DISK", context(), HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0));
+
+ String instant1 = getCommitTimeAtUTC(1);
+ List<GenericRecord> extraRecords1 = dataGen.generateGenericRecords(10);
+ HoodieJavaRDD<GenericRecord> persistedRdd1 = HoodieJavaRDD.of(jsc().parallelize(extraRecords1, 2));
+ persistedRdd1.persist("MEMORY_AND_DISK", context(), HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1));
+
+ SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+ List<HoodieRecord> records = dataGen.generateInserts(instant1, 10);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+ writeClient.startCommitWithTime(instant1);
+ List<WriteStatus> writeStatuses = writeClient.insert(writeRecords, instant1).collect();
+ assertNoWriteErrors(writeStatuses);
+ writeClient.commitStats(instant1, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+ writeClient.close();
+
+ if (shouldReleaseResource) {
+ assertEquals(Collections.singletonList(persistedRdd0.getId()),
+ context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0)),
+ "RDDs cached for " + instant0 + " should be retained.");
+ assertEquals(Collections.emptyList(),
+ context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1)),
+ "RDDs cached for " + instant1 + " should be cleared.");
+ assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()),
+ "RDDs cached for " + instant0 + " should be retained.");
+ assertFalse(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()),
+ "RDDs cached for " + instant1 + " should be cleared.");
+ assertFalse(jsc().getPersistentRDDs().containsKey(writeRecords.id()),
+ "RDDs cached for " + instant1 + " should be cleared.");
+ } else {
+ assertEquals(Collections.singletonList(persistedRdd0.getId()),
+ context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant0)),
+ "RDDs cached for " + instant0 + " should be retained.");
+ assertEquals(3,
+ context().getCachedDataIds(HoodieDataCacheKey.of(writeConfig.getBasePath(), instant1)).size(),
+ "RDDs cached for " + instant1 + " should be retained.");
+ assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd0.getId()),
+ "RDDs cached for " + instant0 + " should be retained.");
+ assertTrue(jsc().getPersistentRDDs().containsKey(persistedRdd1.getId()),
+ "RDDs cached for " + instant1 + " should be retained.");
+ assertTrue(jsc().getPersistentRDDs().containsKey(writeRecords.id()),
+ "RDDs cached for " + instant1 + " should be retained.");
+ }
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java
new file mode 100644
index 00000000000..e3d163f077f
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestHoodieSparkEngineContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieSparkEngineContext extends SparkClientFunctionalTestHarness {
+
+ private HoodieSparkEngineContext context;
+
+ @BeforeEach
+ void setUp() {
+ context = new HoodieSparkEngineContext(jsc());
+ }
+
+ @Test
+ void testAddRemoveCachedDataIds() {
+ String basePath = "/tmp/foo";
+ String instantTime = "000";
+ context.putCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime), 1, 2, 3);
+ assertEquals(Arrays.asList(1, 2, 3), context.getCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)));
+ assertEquals(Arrays.asList(1, 2, 3), context.removeCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)));
+ assertTrue(context.getCachedDataIds(HoodieDataCacheKey.of(basePath, instantTime)).isEmpty());
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e491d96cffe..6fe543488b8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -153,6 +153,7 @@ import static org.apache.hudi.common.model.WriteOperationType.DELETE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1395,20 +1396,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(testTable, emptyList(), true);
}
- /**
- * Fetches next commit time in seconds from current one.
- *
- * @param curCommitTime current commit time.
- * @return the next valid commit time.
- */
- private Long getNextCommitTime(long curCommitTime) {
- if ((curCommitTime + 1) % 1000000000000L >= 60) { // max seconds is 60 and hence
- return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
- } else {
- return curCommitTime + 1;
- }
- }
-
@ParameterizedTest
@MethodSource("tableTypeAndEnableOperationArgs")
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, boolean nonPartitionedDataset) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 32d9a6488b0..37cd23451ee 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -304,7 +303,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
Dataset<Row> actual = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), fullPartitionPaths);
List<Row> rows = actual.collectAsList();
assertEquals(updatedRecords.size(), rows.size());
- for (Row row: rows) {
+ for (Row row : rows) {
assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
}
}
@@ -458,7 +457,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
inserts = 0;
int upserts = 0;
List<WriteStatus> writeStatusList = statuses.collect();
- for (WriteStatus ws: writeStatusList) {
+ for (WriteStatus ws : writeStatusList) {
inserts += ws.getStat().getNumInserts();
upserts += ws.getStat().getNumUpdateWrites();
}
@@ -691,51 +690,4 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords());
}
}
-
- @Test
- public void testReleaseResource() throws Exception {
- HoodieWriteConfig.Builder builder = getConfigBuilder(true);
- builder.withReleaseResourceEnabled(true);
- builder.withAutoCommit(false);
-
- setUp(builder.build().getProps());
-
- /**
- * Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
- */
- try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
-
- String newCommitTime = "001";
- client.startCommitWithTime(newCommitTime);
-
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
- JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
- writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
- List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
- assertNoWriteErrors(statuses);
- client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
- assertEquals(spark().sparkContext().persistentRdds().size(), 0);
- }
-
- builder.withReleaseResourceEnabled(false);
-
- /**
- * Write 2 (test when RELEASE_RESOURCE_ENABLE is false)
- */
- try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
- String newCommitTime = "002";
- client.startCommitWithTime(newCommitTime);
-
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
- JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
-
- writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
- List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
- assertNoWriteErrors(statuses);
- client.commitStats(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
- assertTrue(spark().sparkContext().persistentRdds().size() > 0);
- }
-
- }
}
-
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index b50b0f90cca..d5657c155f1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -197,4 +197,4 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
metaClient = HoodieTableMetaClient.reload(metaClient);
return writeStatuses;
}
-}
\ No newline at end of file
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 9ce0947883f..60820d5a0ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.data;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.collection.Pair;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
/**
* An interface abstracting a container holding a collection of objects of type {@code T}
@@ -51,10 +53,24 @@ import java.util.List;
public interface HoodieData<T> extends Serializable {
/**
- * Persists the data w/ provided {@code level} (if applicable)
+ * Get the {@link HoodieData}'s unique non-negative identifier. -1 indicates invalid id.
+ */
+ int getId();
+
+ /**
+ * Persists the data w/ provided {@code level} (if applicable).
+ *
+ * Use this method only when you call {@link #unpersist()} at some later point for the same {@link HoodieData}.
+ * Otherwise, use {@link #persist(String, HoodieEngineContext, HoodieDataCacheKey)} instead for auto-unpersist
+ * at the end of a client write operation.
*/
void persist(String level);
+ /**
+ * Persists the data w/ provided {@code level} (if applicable), and cache the data's ids within the {@code engineContext}.
+ */
+ void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey);
+
/**
* Un-persists the data (if previously persisted)
*/
@@ -196,4 +212,42 @@ public interface HoodieData<T> extends Serializable {
.reduceByKey((value1, value2) -> value1, parallelism)
.values();
}
+
+ /**
+ * The key used in a caching map to identify a {@link HoodieData}.
+ *
+ * At the end of a write operation, we manually unpersist the {@link HoodieData} associated with that writer.
+ * Therefore, in multi-writer scenario, we need to use both {@code basePath} and {@code instantTime} to identify {@link HoodieData}s.
+ */
+ class HoodieDataCacheKey implements Serializable {
+
+ public static HoodieDataCacheKey of(String basePath, String instantTime) {
+ return new HoodieDataCacheKey(basePath, instantTime);
+ }
+
+ private final String basePath;
+ private final String instantTime;
+
+ private HoodieDataCacheKey(String basePath, String instantTime) {
+ this.basePath = basePath;
+ this.instantTime = instantTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieDataCacheKey that = (HoodieDataCacheKey) o;
+ return basePath.equals(that.basePath) && instantTime.equals(that.instantTime);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(basePath, instantTime);
+ }
+ }
}
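
A quick note on the key class above: HoodieDataCacheKey instances are
re-created at persist time and again at release time, so lookups in the
tracking map rely on value equality rather than object identity. A tiny
sketch (illustrative names, not Hudi code) of why the equals/hashCode
overrides matter:

    import java.util.HashMap;
    import java.util.Map;

    public class CacheKeyLookupDemo {
      public static void main(String[] args) {
        Map<Object, Integer> tracked = new HashMap<>();
        // Registered when an RDD is persisted for (basePath, instantTime)...
        tracked.put(key("/tbl", "001"), 42);
        // ...and found again at release time from a *fresh* key instance.
        System.out.println(tracked.get(key("/tbl", "001"))); // 42
        // Without value-based equals/hashCode, this second lookup would return null.
      }

      static Object key(String basePath, String instantTime) {
        // Stand-in for HoodieDataCacheKey.of(basePath, instantTime): a List key
        // gives the same value semantics, purely for illustration.
        return java.util.Arrays.asList(basePath, instantTime);
      }
    }
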
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index c6287a744e0..4d9980a3575 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.data;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.ValidationUtils;
@@ -90,11 +91,21 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
return new HoodieListData<>(listData, true);
}
+ @Override
+ public int getId() {
+ return -1;
+ }
+
@Override
public void persist(String level) {
// No OP
}
+ @Override
+ public void persist(String level, HoodieEngineContext engineContext, HoodieDataCacheKey cacheKey) {
+ // No OP
+ }
+
@Override
public void unpersist() {
// No OP
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index d400a10f68a..c123c279644 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.engine;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
@@ -95,4 +96,9 @@ public abstract class HoodieEngineContext {
public abstract void setJobStatus(String activeModule, String activityDescription);
+ public abstract void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids);
+
+ public abstract List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey);
+
+ public abstract List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 9781c694dc2..26190b790ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
@@ -144,4 +145,19 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
public void setJobStatus(String activeModule, String activityDescription) {
// no operation for now
}
+
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ // no operation for now
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 28fb77c57ba..267f5c88428 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -29,7 +29,9 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.AvroOrcUtils;
@@ -207,6 +209,24 @@ public class HoodieTestDataGenerator implements AutoCloseable {
this(System.nanoTime(), partitionPaths, keyPartitionMap);
}
+ /**
+ * Fetches next commit time in seconds from current one.
+ *
+ * @param curCommitTime current commit time.
+ * @return the next valid commit time.
+ */
+ public static Long getNextCommitTime(long curCommitTime) {
+ if ((curCommitTime + 1) % 1000000000000L >= 60) { // max seconds is 60 and hence
+ return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
+ } else {
+ return curCommitTime + 1;
+ }
+ }
+
+ public static String getCommitTimeAtUTC(long epochSecond) {
+ return HoodieInstantTimeGenerator.getInstantFromTemporalAccessor(Instant.ofEpochSecond(epochSecond).atZone(ZoneOffset.UTC));
+ }
+
/**
* @deprecated please use non-static version
*/
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4d91e5076e7..eb562117df9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -397,10 +397,6 @@ public class DeltaSync implements Serializable, Closeable {
}
metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
-
- // TODO revisit (too early to unpersist)
- // Clear persistent RDDs
- jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
return result;
}