This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bda6afe0b [HUDI-4249] Fixing in-memory `HoodieData` implementation to 
operate lazily  (#5855)
4bda6afe0b is described below

commit 4bda6afe0b2858d31337fe42979929523ba07f6d
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Jul 16 16:26:48 2022 -0700

    [HUDI-4249] Fixing in-memory `HoodieData` implementation to operate lazily  
(#5855)
---
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |   1 -
 .../bloom/ListBasedHoodieBloomIndexHelper.java     |   3 +-
 .../hudi/index/bucket/HoodieBucketIndex.java       |   3 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   5 +-
 .../client/common/HoodieFlinkEngineContext.java    |   6 +-
 .../org/apache/hudi/index/FlinkHoodieIndex.java    |   8 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |   3 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   4 +-
 .../table/action/commit/FlinkDeleteHelper.java     |   5 +-
 .../hudi/table/action/commit/FlinkWriteHelper.java |   5 +-
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |  11 +-
 .../testutils/HoodieFlinkClientTestHarness.java    |   5 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   5 +-
 .../run/strategy/JavaExecutionStrategy.java        |   4 +-
 .../client/common/HoodieJavaEngineContext.java     |   6 +-
 .../org/apache/hudi/index/JavaHoodieIndex.java     |   8 +-
 .../org/apache/hudi/table/HoodieJavaTable.java     |   4 +-
 .../commit/BaseJavaCommitActionExecutor.java       |   8 +-
 .../hudi/table/action/commit/JavaDeleteHelper.java |   5 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |   5 +-
 .../org/apache/hudi/data/HoodieJavaPairRDD.java    |  17 +-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  32 +---
 .../hudi/common/data/HoodieBaseListData.java       |  72 ++++++++
 .../org/apache/hudi/common/data/HoodieData.java    | 167 +++++++++++------
 .../org/apache/hudi/common/data/HoodieList.java    | 178 ------------------
 .../apache/hudi/common/data/HoodieListData.java    | 182 +++++++++++++++++++
 .../hudi/common/data/HoodieListPairData.java       | 201 +++++++++++++++++++++
 .../org/apache/hudi/common/data/HoodieMapPair.java | 168 -----------------
 .../apache/hudi/common/data/HoodiePairData.java    |  70 ++++---
 .../common/engine/HoodieLocalEngineContext.java    |   6 +-
 .../apache/hudi/common/util/CollectionUtils.java   |  14 ++
 .../java/org/apache/hudi/common/util/Either.java   |  93 ++++++++++
 ...TestHoodieList.java => TestHoodieListData.java} |  21 ++-
 ...apPair.java => TestHoodieListDataPairData.java} |  80 +++++---
 .../hudi/sink/compact/CompactionCommitSink.java    |   4 +-
 35 files changed, 861 insertions(+), 548 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index aeefe6c304..1417e40a9f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieRangeInfoHandle;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index c42d80c62e..cffee5ee74 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -60,7 +59,7 @@ public class ListBasedHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper
       HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, 
Long> recordsPerPartition) {
     List<Pair<String, HoodieKey>> fileComparisonPairList =
-        HoodieList.getList(fileComparisonPairs).stream()
+        fileComparisonPairs.collectAsList().stream()
             .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
 
     List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index c3584d234a..cbb3b07f44 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -83,7 +83,8 @@ public abstract class HoodieBucketIndex extends 
HoodieIndex<Object, Object> {
             Option<HoodieRecordLocation> loc = 
mapper.getRecordLocation(record.getKey(), record.getPartitionPath());
             return HoodieIndexUtils.getTaggedRecord(record, loc);
           }
-        }
+        },
+        false
     );
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ac75997ea5..dce9245860 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -127,8 +127,7 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieFlinkTable<T> table = getHoodieTable();
     Timer.Context indexTimer = metrics.getIndexCtx();
-    List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
-        getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+    List<HoodieRecord<T>> recordsWithLocation = 
getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, 
table).collectAsList();
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer 
== null ? 0L : indexTimer.stop()));
     return recordsWithLocation.stream().filter(v1 -> 
!v1.isCurrentLocationKnown()).collect(Collectors.toList());
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 62f8d4fa03..76ca606009 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,7 +23,7 @@ import 
org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -84,12 +84,12 @@ public class HoodieFlinkEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <T> HoodieData<T> emptyHoodieData() {
-    return HoodieList.of(Collections.emptyList());
+    return HoodieListData.eager(Collections.emptyList());
   }
 
   @Override
   public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-    return HoodieList.of(data);
+    return HoodieListData.eager(data);
   }
 
   public RuntimeContext getRuntimeContext() {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
index 66c1b07793..be2273a840 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -61,8 +61,8 @@ public abstract class FlinkHoodieIndex<T extends 
HoodieRecordPayload> extends Ho
   public <R> HoodieData<HoodieRecord<R>> tagLocation(
       HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
       HoodieTable hoodieTable) throws HoodieIndexException {
-    List<HoodieRecord<T>> hoodieRecords = 
tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>) 
record)), context, hoodieTable);
-    return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) 
r).collect(Collectors.toList()));
+    List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> 
(HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+    return HoodieListData.eager(hoodieRecords.stream().map(r -> 
(HoodieRecord<R>) r).collect(Collectors.toList()));
   }
 
   @Override
@@ -70,6 +70,6 @@ public abstract class FlinkHoodieIndex<T extends 
HoodieRecordPayload> extends Ho
   public HoodieData<WriteStatus> updateLocation(
       HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
       HoodieTable hoodieTable) throws HoodieIndexException {
-    return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), 
context, hoodieTable));
+    return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), 
context, hoodieTable));
   }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 222ff78edc..81df5a9361 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -21,7 +21,6 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -106,7 +105,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
-    List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
+    List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
 
     try (HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
       if (canTriggerTableService) {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 26149918c6..3431c7334a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -40,8 +40,6 @@ import org.apache.avro.specific.SpecificRecordBase;
 
 import java.util.List;
 
-import static org.apache.hudi.common.data.HoodieList.getList;
-
 public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {
@@ -78,7 +76,7 @@ public abstract class HoodieFlinkTable<T extends 
HoodieRecordPayload>
 
   public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
       HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
-    return metadata.clone(getList(metadata.getWriteStatuses()));
+    return metadata.clone(metadata.getWriteStatuses().collectAsList());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index 8dd0c99bae..9540354e10 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -97,8 +97,7 @@ public class FlinkDeleteHelper<R> extends
           dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new 
EmptyHoodieRecordPayload())).collect(Collectors.toList());
       Instant beginTag = Instant.now();
       // perform index look up to get existing location of records
-      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = 
HoodieList.getList(
-          table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, 
table));
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = 
table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, 
table).collectAsList();
       Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
 
       // filter out non existent keys/records
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 9c17e77b91..57e5aa9ad5 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -83,8 +83,7 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload, 
R> extends BaseWrit
 
   @Override
   protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, 
HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table) {
-    return HoodieList.getList(
-        table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, 
table));
+    return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), 
context, table).collectAsList();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index e23ee4ad58..b62e09eb27 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.data.HoodieList;
-import org.apache.hudi.common.data.HoodieMapPair;
+import org.apache.hudi.common.data.HoodieListPairData;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,9 +38,6 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
 import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -185,8 +183,7 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
           partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
         });
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = HoodieList.getList(
-        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieMapPair.of(partitionRecordKeyMap)));
+    List<Pair<String, HoodieKey>> comparisonKeyList = 
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
 
     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = 
comparisonKeyList.stream()
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
index 054a363168..dfbb3336b9 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
@@ -21,7 +21,8 @@ package org.apache.hudi.testutils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -133,7 +134,7 @@ public class HoodieFlinkClientTestHarness extends 
HoodieCommonTestHarness implem
 
   protected List<HoodieRecord> tagLocation(
       HoodieIndex index, List<HoodieRecord> records, HoodieTable table) {
-    return HoodieList.getList(index.tagLocation(HoodieList.of(records), 
context, table));
+    return ((HoodieData<HoodieRecord>) 
index.tagLocation(HoodieListData.eager(records), context, 
table)).collectAsList();
   }
 
   /**
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index fbfb85bab3..b6951bc6b7 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
@@ -67,8 +67,7 @@ public class HoodieJavaWriteClient<T extends 
HoodieRecordPayload> extends
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieJavaTable<T> table = HoodieJavaTable.create(config, 
(HoodieJavaEngineContext) context);
     Timer.Context indexTimer = metrics.getIndexCtx();
-    List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
-        getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+    List<HoodieRecord<T>> recordsWithLocation = 
getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, 
table).collectAsList();
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer 
== null ? 0L : indexTimer.stop()));
     return recordsWithLocation.stream().filter(v1 -> 
!v1.isCurrentLocationKnown()).collect(Collectors.toList());
   }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index adcbb874e8..456bb3cb47 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.JavaTaskContextSupplier;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -94,7 +94,7 @@ public abstract class JavaExecutionStrategy<T extends 
HoodieRecordPayload<T>>
             
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
             instantTime)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new 
HoodieWriteMetadata<>();
-    writeMetadata.setWriteStatuses(HoodieList.of(writeStatusList));
+    writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList));
     return writeMetadata;
   }
 
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 22d4ccabcd..2211c8a103 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -24,7 +24,7 @@ import 
org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -74,12 +74,12 @@ public class HoodieJavaEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <T> HoodieData<T> emptyHoodieData() {
-    return HoodieList.of(Collections.emptyList());
+    return HoodieListData.eager(Collections.emptyList());
   }
 
   @Override
   public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-    return HoodieList.of(data);
+    return HoodieListData.eager(data);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
index dd64859cad..dcc9d050dc 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,8 +58,8 @@ public abstract class JavaHoodieIndex<T extends 
HoodieRecordPayload> extends Hoo
   public <R> HoodieData<HoodieRecord<R>> tagLocation(
       HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
       HoodieTable hoodieTable) throws HoodieIndexException {
-    List<HoodieRecord<T>> hoodieRecords = 
tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>) 
record)), context, hoodieTable);
-    return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) 
r).collect(Collectors.toList()));
+    List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> 
(HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+    return HoodieListData.eager(hoodieRecords.stream().map(r -> 
(HoodieRecord<R>) r).collect(Collectors.toList()));
   }
 
   @Override
@@ -67,6 +67,6 @@ public abstract class JavaHoodieIndex<T extends 
HoodieRecordPayload> extends Hoo
   public HoodieData<WriteStatus> updateLocation(
       HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
       HoodieTable hoodieTable) throws HoodieIndexException {
-    return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), 
context, hoodieTable));
+    return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), 
context, hoodieTable));
   }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index f9c7caff6e..3c878cbc14 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -36,8 +36,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.util.List;
 
-import static org.apache.hudi.common.data.HoodieList.getList;
-
 public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
   protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext 
context, HoodieTableMetaClient metaClient) {
@@ -67,7 +65,7 @@ public abstract class HoodieJavaTable<T extends 
HoodieRecordPayload>
 
   public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
       HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
-    return metadata.clone(getList(metadata.getWriteStatuses()));
+    return metadata.clone(metadata.getWriteStatuses().collectAsList());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index dc6994d315..22c90eb8bb 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -130,8 +130,7 @@ public abstract class BaseJavaCommitActionExecutor<T 
extends HoodieRecordPayload
   protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, 
HoodieWriteMetadata<List<WriteStatus>> result) {
     Instant indexStartTime = Instant.now();
     // Update the index back
-    List<WriteStatus> statuses = HoodieList.getList(
-        table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, 
table));
+    List<WriteStatus> statuses = 
table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, 
table).collectAsList();
     result.setIndexUpdateDuration(Duration.between(indexStartTime, 
Instant.now()));
     result.setWriteStatuses(statuses);
     return statuses;
@@ -339,8 +338,7 @@ public abstract class BaseJavaCommitActionExecutor<T 
extends HoodieRecordPayload
   public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, 
HoodieWriteMetadata result) {
     Instant indexStartTime = Instant.now();
     // Update the index back
-    List<WriteStatus> statuses = HoodieList.getList(
-        table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, 
table));
+    List<WriteStatus> statuses = 
table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, 
table).collectAsList();
     result.setIndexUpdateDuration(Duration.between(indexStartTime, 
Instant.now()));
     result.setWriteStatuses(statuses);
     result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
index f82c1c561b..57d796c925 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -99,8 +99,7 @@ public class JavaDeleteHelper<R> extends
           dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new 
EmptyHoodieRecordPayload())).collect(Collectors.toList());
       Instant beginTag = Instant.now();
       // perform index look up to get existing location of records
-      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = 
HoodieList.getList(
-          table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, 
table));
+      List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = 
table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, 
table).collectAsList();
       Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
 
       // filter out non existent keys/records
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 3a1fa4b884..4504a9bdcc 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -50,8 +50,7 @@ public class JavaWriteHelper<T extends HoodieRecordPayload,R> 
extends BaseWriteH
 
   @Override
   protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, 
HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table) {
-    return HoodieList.getList(
-        table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, 
table));
+    return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), 
context, table).collectAsList();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index ddcaaec0fa..9ec3c4cf71 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 
+import java.util.List;
 import java.util.Map;
 
 import scala.Tuple2;
@@ -41,7 +42,7 @@ import scala.Tuple2;
  * @param <K> type of key.
  * @param <V> type of value.
  */
-public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
+public class HoodieJavaPairRDD<K, V> implements HoodiePairData<K, V> {
 
   private final JavaPairRDD<K, V> pairRDDData;
 
@@ -105,8 +106,13 @@ public class HoodieJavaPairRDD<K, V> extends 
HoodiePairData<K, V> {
   }
 
   @Override
-  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> 
func, int parallelism) {
-    return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply, 
parallelism));
+  public HoodiePairData<K, Iterable<V>> groupByKey() {
+    return new HoodieJavaPairRDD<>(pairRDDData.groupByKey());
+  }
+
+  @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> 
combiner, int parallelism) {
+    return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(combiner::apply, 
parallelism));
   }
 
   @Override
@@ -130,4 +136,9 @@ public class HoodieJavaPairRDD<K, V> extends 
HoodiePairData<K, V> {
             .map(tuple -> new Tuple2<>(tuple._1,
                 new ImmutablePair<>(tuple._2._1, 
Option.ofNullable(tuple._2._2.orElse(null)))))));
   }
+
+  @Override
+  public List<Pair<K, V>> collectAsList() {
+    return pairRDDData.map(t -> Pair.of(t._1, t._2)).collect();
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 0843dfc3c9..3964fa2d6b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.collection.Pair;
 
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.storage.StorageLevel;
 
@@ -39,7 +40,7 @@ import scala.Tuple2;
  *
  * @param <T> type of object.
  */
-public class HoodieJavaRDD<T> extends HoodieData<T> {
+public class HoodieJavaRDD<T> implements HoodieData<T> {
 
   private final JavaRDD<T> rddData;
 
@@ -74,17 +75,16 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
   * @return a {@link JavaRDD} of objects in type T.
    */
   public static <T> JavaRDD<T> getJavaRDD(HoodieData<T> hoodieData) {
-    return ((HoodieJavaRDD<T>) hoodieData).get();
+    return ((HoodieJavaRDD<T>) hoodieData).rddData;
   }
 
-  @Override
-  public JavaRDD<T> get() {
-    return rddData;
+  public static <K, V> JavaPairRDD<K, V> getJavaRDD(HoodiePairData<K, V> 
hoodieData) {
+    return ((HoodieJavaPairRDD<K, V>) hoodieData).get();
   }
 
   @Override
-  public void persist(String cacheConfig) {
-    rddData.persist(StorageLevel.fromString(cacheConfig));
+  public void persist(String level) {
+    rddData.persist(StorageLevel.fromString(level));
   }
 
   @Override
@@ -112,20 +112,15 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
     return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, 
preservesPartitioning));
   }
 
-  @Override
-  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func) {
-    return HoodieJavaRDD.of(rddData.mapPartitions(func::apply));
-  }
-
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
     return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
   }
 
   @Override
-  public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> mapToPairFunc) {
+  public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> func) {
     return HoodieJavaPairRDD.of(rddData.mapToPair(input -> {
-      Pair<K, V> pair = mapToPairFunc.call(input);
+      Pair<K, V> pair = func.call(input);
       return new Tuple2<>(pair.getLeft(), pair.getRight());
     }));
   }
@@ -140,13 +135,6 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
     return HoodieJavaRDD.of(rddData.distinct(parallelism));
   }
 
-  @Override
-  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism) {
-    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
-        .reduceByKey((value1, value2) -> value1, parallelism)
-        .values();
-  }
-
   @Override
   public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
     return HoodieJavaRDD.of(rddData.filter(filterFunc::apply));
@@ -154,7 +142,7 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
 
   @Override
   public HoodieData<T> union(HoodieData<T> other) {
-    return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+    return HoodieJavaRDD.of(rddData.union(((HoodieJavaRDD<T>) other).rddData));
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
new file mode 100644
index 0000000000..398762dc63
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.Either;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieBaseListData<T> {
+
+  protected final Either<Stream<T>, List<T>> data;
+  protected final boolean lazy;
+
+  protected HoodieBaseListData(List<T> data, boolean lazy) {
+    this.data = lazy ? Either.left(data.stream().parallel()) : 
Either.right(data);
+    this.lazy = lazy;
+  }
+
+  protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) {
+    // NOTE: In case this container is being instantiated by an eager parent, 
we have to
+    //       pre-materialize the stream
+    this.data = lazy ? Either.left(dataStream) : 
Either.right(dataStream.collect(Collectors.toList()));
+    this.lazy = lazy;
+  }
+
+  protected Stream<T> asStream() {
+    return lazy ? data.asLeft() : data.asRight().parallelStream();
+  }
+
+  protected boolean isEmpty() {
+    if (lazy) {
+      return data.asLeft().findAny().isPresent();
+    } else {
+      return data.asRight().isEmpty();
+    }
+  }
+
+  protected long count() {
+    if (lazy) {
+      return data.asLeft().count();
+    } else {
+      return data.asRight().size();
+    }
+  }
+
+  protected List<T> collectAsList() {
+    if (lazy) {
+      return data.asLeft().collect(Collectors.toList());
+    } else {
+      return data.asRight();
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 4b391ecbab..2d24e7dd12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -21,108 +21,163 @@ package org.apache.hudi.common.data;
 
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
 /**
- * An abstraction for a data collection of objects in type T to store the 
reference
- * and do transformation.
+ * An interface abstracting a container holding a collection of objects of 
type {@code T}
+ * allowing to perform common transformation on it.
  *
- * @param <T> type of object.
+ * This abstraction provides common API implemented by
+ * <ol>
+ *   <li>In-memory implementation ({@code HoodieListData}, {@code 
HoodieListPairData}), where all objects
+ *   are held in-memory by the executing process</li>
+ *   <li>RDD-based implementation ({@code HoodieJavaRDD}, etc), where underlying
+ *   collection is held by an RDD allowing to execute transformations using Spark
+ *   engine on the cluster</li>
+ * </ol>
+ *
+ * All implementations provide for consistent semantic, where
+ * <ul>
+ *   <li>All non-terminal* operations are executed lazily (for ex, {@code 
map}, {@code filter}, etc)</li>
+ *   <li>All terminal operations are executed eagerly, executing all 
previously accumulated transformations.
+ *   Note that a collection cannot be re-used after a terminal
+ *   operation has been invoked on it.</li>
+ * </ul>
+ *
+ * @param <T> type of object
  */
-public abstract class HoodieData<T> implements Serializable {
+public interface HoodieData<T> extends Serializable {
+
   /**
-   * @return the collection of objects.
+   * Persists the data w/ provided {@code level} (if applicable)
    */
-  public abstract Object get();
+  void persist(String level);
 
   /**
-   * Caches the data.
-   *
-   * @param cacheConfig config value for caching.
+   * Un-persists the data (if previously persisted)
    */
-  public abstract void persist(String cacheConfig);
+  void unpersist();
 
   /**
-   * Removes the cached data.
+   * Returns whether the collection is empty.
    */
-  public abstract void unpersist();
+  boolean isEmpty();
 
   /**
-   * @return whether the collection is empty.
+   * Returns number of objects held in the collection
+   *
+   * NOTE: This is a terminal operation
    */
-  public abstract boolean isEmpty();
+  long count();
 
   /**
-   * @return the number of objects.
+   * Maps every element in the collection using provided mapping {@code func}.
+   *
+   * This is an intermediate operation
+   *
+   * @param func serializable map function
+   * @param <O>  output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
-  public abstract long count();
+  <O> HoodieData<O> map(SerializableFunction<T, O> func);
 
   /**
-   * @param func serializable map function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
+   * Maps every element in the collection's partition (if applicable) by 
applying provided
+   * mapping {@code func} to every collection's partition
+   *
+   * This is an intermediate operation
+   *
+   * @param func                  serializable map function accepting {@link 
Iterator} of a single
+   *                              partition's elements and returning a new 
{@link Iterator} mapping
+   *                              every element of the partition into a new one
+   * @param preservesPartitioning whether to preserve partitioning in the 
resulting collection
+   * @param <O>                   output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
-  public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
+  <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
+      Iterator<O>> func, boolean preservesPartitioning);
 
   /**
-   * @param func                  serializable map function by taking a 
partition of objects
-   *                              and generating an iterator.
-   * @param preservesPartitioning whether to preserve partitions in the result.
-   * @param <O>                   output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
+   * Maps every element in the collection into a collection of the new 
elements (provided by
+   * {@link Iterator}) using provided mapping {@code func}, subsequently 
flattening the result
+   * (by concatenating) into a single collection
+   *
+   * This is an intermediate operation
+   *
+   * @param func serializable function mapping every element {@link T} into 
{@code Iterator<O>}
+   * @param <O>  output object type
+   * @return {@link HoodieData<O>} holding mapped elements
    */
-  public abstract <O> HoodieData<O> mapPartitions(
-      SerializableFunction<Iterator<T>, Iterator<O>> func, boolean 
preservesPartitioning);
+  <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
 
   /**
-   * @param func                  serializable map function by taking a 
partition of objects
-   *                              and generating an iterator.
-   * @param <O>                   output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
+   * Maps every element in the collection using provided mapping {@code func} 
into a {@link Pair<K, V>}
+   * of elements {@code K} and {@code V}
+   * <p>
+   * This is an intermediate operation
+   *
+   * @param func serializable map function
+   * @param <K>  key type of the pair
+   * @param <V>  value type of the pair
+   * @return {@link HoodiePairData<K, V>} holding mapped elements
    */
-  public abstract <O> HoodieData<O> mapPartitions(
-      SerializableFunction<Iterator<T>, Iterator<O>> func);
+  <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V> 
func);
 
   /**
-   * @param func serializable flatmap function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
+   * Returns new {@link HoodieData} collection holding only distinct objects 
of the original one
+   *
+   * This is a stateful intermediate operation
    */
-  public abstract <O> HoodieData<O> flatMap(SerializableFunction<T, 
Iterator<O>> func);
+  HoodieData<T> distinct();
 
   /**
-   * @param mapToPairFunc serializable map function to generate a pair.
-   * @param <K>           key type of the pair.
-   * @param <V>           value type of the pair.
-   * @return {@link HoodiePairData<K, V>} containing the result. Actual 
execution may be deferred.
+   * Returns new {@link HoodieData} collection holding only distinct objects 
of the original one
+   *
+   * This is a stateful intermediate operation
    */
-  public abstract <K, V> HoodiePairData<K, V> 
mapToPair(SerializablePairFunction<T, K, V> mapToPairFunc);
+  HoodieData<T> distinct(int parallelism);
 
   /**
-   * @return distinct objects in {@link HoodieData}.
+   * Returns new instance of {@link HoodieData} collection only containing 
elements matching provided
+   * {@code filterFunc} (ie ones it returns true on)
+   *
+   * @param filterFunc filtering func either accepting or rejecting the 
elements
+   * @return {@link HoodieData<T>} holding filtered elements
    */
-  public abstract HoodieData<T> distinct();
+  HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc);
 
-  public abstract HoodieData<T> distinct(int parallelism);
-
-  public abstract <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism);
-
-  public abstract HoodieData<T> filter(SerializableFunction<T, Boolean> 
filterFunc);
+  /**
+   * Unions {@link HoodieData} with another instance of {@link HoodieData}.
+   * Note that, it's only able to union same underlying collection 
implementations.
+   *
+   * This is a stateful intermediate operation
+   *
+   * @param other {@link HoodieData} collection
+   * @return {@link HoodieData<T>} holding superset of elements of this and 
{@code other} collections
+   */
+  HoodieData<T> union(HoodieData<T> other);
 
   /**
-   * Unions this {@link HoodieData} with other {@link HoodieData}.
-   * @param other {@link HoodieData} of interest.
-   * @return the union of two as as instance of {@link HoodieData}.
+   * Collects results of the underlying collection into a {@link List<T>}
+   *
+   * This is a terminal operation
    */
-  public abstract HoodieData<T> union(HoodieData<T> other);
+  List<T> collectAsList();
 
   /**
-   * @return collected results in {@link List<T>}.
+   * Re-partitions underlying collection (if applicable) making sure new 
{@link HoodieData} has
+   * exactly {@code parallelism} partitions
+   *
+   * @param parallelism target number of partitions in the underlying 
collection
+   * @return {@link HoodieData<T>} holding re-partitioned collection
    */
-  public abstract List<T> collectAsList();
+  HoodieData<T> repartition(int parallelism);
 
-  public abstract HoodieData<T> repartition(int parallelism);
+  default <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism) {
+    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+        .reduceByKey((value1, value2) -> value1, parallelism)
+        .values();
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
deleted file mode 100644
index 28ed2e282d..0000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.data;
-
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
-import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
-
-/**
- * Holds a {@link List} of objects.
- *
- * @param <T> type of object.
- */
-public class HoodieList<T> extends HoodieData<T> {
-
-  private final List<T> listData;
-
-  private HoodieList(List<T> listData) {
-    this.listData = listData;
-  }
-
-  /**
-   * @param listData a {@link List} of objects in type T.
-   * @param <T>      type of object.
-   * @return a new instance containing the {@link List<T>} reference.
-   */
-  public static <T> HoodieList<T> of(List<T> listData) {
-    return new HoodieList<>(listData);
-  }
-
-  /**
-   * @param hoodieData {@link HoodieList <T>} instance containing the {@link 
List} of objects.
-   * @param <T>        type of object.
-   * @return the a {@link List} of objects in type T.
-   */
-  public static <T> List<T> getList(HoodieData<T> hoodieData) {
-    return ((HoodieList<T>) hoodieData).get();
-  }
-
-  @Override
-  public List<T> get() {
-    return listData;
-  }
-
-  @Override
-  public void persist(String cacheConfig) {
-    // No OP
-  }
-
-  @Override
-  public void unpersist() {
-    // No OP
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return listData.isEmpty();
-  }
-
-  @Override
-  public long count() {
-    return listData.size();
-  }
-
-  @Override
-  public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
-    return HoodieList.of(listData.stream().parallel()
-        .map(throwingMapWrapper(func)).collect(Collectors.toList()));
-  }
-
-  @Override
-  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func, boolean preservesPartitioning) {
-    return mapPartitions(func);
-  }
-
-  @Override
-  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func) {
-    List<O> result = new ArrayList<>();
-    
throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add);
-    return HoodieList.of(result);
-  }
-
-  @Override
-  public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
-    Function<T, Iterator<O>> throwableFunc = throwingMapWrapper(func);
-    return HoodieList.of(listData.stream().flatMap(e -> {
-      List<O> result = new ArrayList<>();
-      Iterator<O> iterator = throwableFunc.apply(e);
-      iterator.forEachRemaining(result::add);
-      return result.stream();
-    }).collect(Collectors.toList()));
-  }
-
-  @Override
-  public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> mapToPairFunc) {
-    Map<K, List<V>> mapOfPairs = new HashMap<>();
-    Function<T, Pair<K, V>> throwableMapToPairFunc = 
throwingMapToPairWrapper(mapToPairFunc);
-    listData.forEach(data -> {
-      Pair<K, V> pair = throwableMapToPairFunc.apply(data);
-      List<V> list = mapOfPairs.computeIfAbsent(pair.getKey(), k -> new 
ArrayList<>());
-      list.add(pair.getValue());
-    });
-    return HoodieMapPair.of(mapOfPairs);
-  }
-
-  @Override
-  public HoodieData<T> distinct() {
-    return HoodieList.of(new ArrayList<>(new HashSet<>(listData)));
-  }
-
-  @Override
-  public HoodieData<T> distinct(int parallelism) {
-    return distinct();
-  }
-
-  @Override
-  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism) {
-    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
-        .reduceByKey((value1, value2) -> value1, parallelism)
-        .values();
-  }
-
-  @Override
-  public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
-    return HoodieList.of(listData
-        .stream()
-        .filter(i -> throwingMapWrapper(filterFunc).apply(i))
-        .collect(Collectors.toList()));
-  }
-
-  @Override
-  public HoodieData<T> union(HoodieData<T> other) {
-    List<T> unionResult = new ArrayList<>();
-    unionResult.addAll(listData);
-    unionResult.addAll(other.collectAsList());
-    return HoodieList.of(unionResult);
-  }
-
-  @Override
-  public List<T> collectAsList() {
-    return listData;
-  }
-
-  @Override
-  public HoodieData<T> repartition(int parallelism) {
-    // no op
-    return this;
-  }
-}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
new file mode 100644
index 0000000000..0be9ec9fa7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodieData} holding internally a {@link 
Stream} of objects.
+ *
+ * {@link HoodieListData} can have either of the 2 execution semantics:
+ *
+ * <ol>
+ *   <li>Eager: with every operation being executed right away</li>
+ *   <li>Lazy: with every operation being "stacked up", with its execution
+ *   postponed until a "terminal" operation is invoked</li>
+ * </ol>
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaRDD}, and it 
strives to provide
+ *       similar semantic as RDD container -- all intermediate (non-terminal, 
not de-referencing
+ *       the stream like "collect", "groupBy", etc) operations are executed 
*lazily*.
+ *       This allows to make sure that compute/memory churn is minimal since 
only necessary
+ *       computations will ultimately be performed.
+ *
+ *       Please note, however, that while RDD container allows the same 
collection to be
+ *       de-referenced more than once (ie terminal operation invoked more than 
once),
+ *       {@link HoodieListData} allows that only when instantiated w/ an eager 
execution semantic.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieListData<T> extends HoodieBaseListData<T> implements 
HoodieData<T> {
+
+  private HoodieListData(List<T> data, boolean lazy) {
+    super(data, lazy);
+  }
+
+  HoodieListData(Stream<T> dataStream, boolean lazy) {
+    super(dataStream, lazy);
+  }
+
+  /**
+   * Creates instance of {@link HoodieListData} bearing *eager* execution 
semantic
+   *
+   * @param listData a {@link List} of objects in type T
+   * @param <T>      type of object
+   * @return a new instance containing the {@link List<T>} reference
+   */
+  public static <T> HoodieListData<T> eager(List<T> listData) {
+    return new HoodieListData<>(listData, false);
+  }
+
+  /**
+   * Creates instance of {@link HoodieListData} bearing *lazy* execution 
semantic
+   *
+   * @param listData a {@link List} of objects in type T
+   * @param <T>      type of object
+   * @return a new instance containing the {@link List<T>} reference
+   */
+  public static <T> HoodieListData<T> lazy(List<T> listData) {
+    return new HoodieListData<>(listData, true);
+  }
+
+  @Override
+  public void persist(String level) {
+    // No OP
+  }
+
+  @Override
+  public void unpersist() {
+    // No OP
+  }
+
+  @Override
+  public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
+    return new HoodieListData<>(asStream().map(throwingMapWrapper(func)), 
lazy);
+  }
+
+  @Override
+  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func, boolean preservesPartitioning) {
+    Function<Iterator<T>, Iterator<O>> mapper = throwingMapWrapper(func);
+    return new HoodieListData<>(
+        StreamSupport.stream(
+            Spliterators.spliteratorUnknownSize(
+                mapper.apply(asStream().iterator()), Spliterator.ORDERED), 
true),
+        lazy
+    );
+  }
+
+  @Override
+  public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
+    Function<T, Iterator<O>> mapper = throwingMapWrapper(func);
+    Stream<O> mappedStream = asStream().flatMap(e ->
+        StreamSupport.stream(
+            Spliterators.spliteratorUnknownSize(mapper.apply(e), 
Spliterator.ORDERED), true));
+    return new HoodieListData<>(mappedStream, lazy);
+  }
+
+  @Override
+  public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> func) {
+    Function<T, Pair<K, V>> throwableMapToPairFunc = 
throwingMapToPairWrapper(func);
+    return new HoodieListPairData<>(asStream().map(throwableMapToPairFunc), 
lazy);
+  }
+
+  @Override
+  public HoodieData<T> distinct() {
+    return new HoodieListData<>(asStream().distinct(), lazy);
+  }
+
+  @Override
+  public HoodieData<T> distinct(int parallelism) {
+    return distinct();
+  }
+
+  @Override
+  public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism) {
+    return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+        .reduceByKey((value1, value2) -> value1, parallelism)
+        .values();
+  }
+
+  @Override
+  public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+    return new HoodieListData<>(asStream().filter(r -> 
throwingMapWrapper(filterFunc).apply(r)), lazy);
+  }
+
+  @Override
+  public HoodieData<T> union(HoodieData<T> other) {
+    ValidationUtils.checkArgument(other instanceof HoodieListData);
+    return new HoodieListData<>(Stream.concat(asStream(), 
((HoodieListData<T>)other).asStream()), lazy);
+  }
+
+  @Override
+  public HoodieData<T> repartition(int parallelism) {
+    // no op
+    return this;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return super.isEmpty();
+  }
+
+  @Override
+  public long count() {
+    return super.count();
+  }
+
+  @Override
+  public List<T> collectAsList() {
+    return super.collectAsList();
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
new file mode 100644
index 0000000000..a389649548
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodiePairData} holding internally a 
{@link Stream} of {@link Pair}s.
+ *
+ * {@link HoodieListPairData} can have either of the 2 execution semantics:
+ *
+ * <ol>
+ *   <li>Eager: with every operation being executed right away</li>
+ *   <li>Lazy: with every operation being "stacked up", with its execution 
postponed until
+ *   "terminal" operation is invoked</li>
+ * </ol>
+ *
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaPairRDD}, and 
it strives to provide
+ *       similar semantic as RDD container -- all intermediate (non-terminal, 
not de-referencing
+ *       the stream like "collect", "groupBy", etc) operations are executed 
*lazily*.
+ *       This ensures that compute/memory churn is minimal since 
only necessary
+ *       computations will ultimately be performed.
+ *
+ *       Please note, however, that while RDD container allows the same 
collection to be
+ *       de-referenced more than once (ie terminal operation invoked more than 
once),
+ *       {@link HoodieListPairData} allows that only when instantiated w/ an eager 
execution semantic.
+ *
+ * @param <K> type of the key in the pair
+ * @param <V> type of the value in the pair
+ */
+public class HoodieListPairData<K, V> extends HoodieBaseListData<Pair<K, V>> 
implements HoodiePairData<K, V> {
+
+  private HoodieListPairData(List<Pair<K, V>> data, boolean lazy) {
+    super(data, lazy);
+  }
+
+  HoodieListPairData(Stream<Pair<K, V>> dataStream, boolean lazy) {
+    super(dataStream, lazy);
+  }
+
+  @Override
+  public List<Pair<K, V>> get() {
+    return collectAsList();
+  }
+
+  @Override
+  public void persist(String cacheConfig) {
+    // no-op
+  }
+
+  @Override
+  public void unpersist() {
+    // no-op
+  }
+
+  @Override
+  public HoodieData<K> keys() {
+    return new HoodieListData<>(asStream().map(Pair::getKey), lazy);
+  }
+
+  @Override
+  public HoodieData<V> values() {
+    return new HoodieListData<>(asStream().map(Pair::getValue), lazy);
+  }
+
+  @Override
+  public Map<K, Long> countByKey() {
+    return asStream().collect(Collectors.groupingBy(Pair::getKey, 
Collectors.counting()));
+  }
+
+  @Override
+  public HoodiePairData<K, Iterable<V>> groupByKey() {
+    Collector<Pair<K, V>, ?, List<V>> mappingCollector = 
Collectors.mapping(Pair::getValue, Collectors.toList());
+    Collector<Pair<K, V>, ?, Map<K, List<V>>> groupingCollector =
+        Collectors.groupingBy(Pair::getKey, mappingCollector);
+
+    Map<K, List<V>> groupedByKey = asStream().collect(groupingCollector);
+    return new HoodieListPairData<>(
+        groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), 
e.getValue())),
+        lazy
+    );
+  }
+
+  @Override
+  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> 
combiner, int parallelism) {
+    Map<K, java.util.Optional<V>> reducedMap = asStream().collect(
+        Collectors.groupingBy(
+            Pair::getKey,
+            HashMap::new,
+            Collectors.mapping(Pair::getValue, 
Collectors.reducing(combiner::apply))));
+
+    return new HoodieListPairData<>(
+        reducedMap.entrySet()
+            .stream()
+            .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
+        lazy
+    );
+  }
+
+  @Override
+  public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
+    Function<Pair<K, V>, O> uncheckedMapper = throwingMapWrapper(func);
+    return new HoodieListData<>(asStream().map(uncheckedMapper), lazy);
+  }
+
+  @Override
+  public <L, W> HoodiePairData<L, W> 
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
+    return new HoodieListPairData<>(asStream().map(p -> 
throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy);
+  }
+
+  @Override
+  public <W> HoodiePairData<K, Pair<V, Option<W>>> 
leftOuterJoin(HoodiePairData<K, W> other) {
+    ValidationUtils.checkArgument(other instanceof HoodieListPairData);
+
+    // Transform right-side container to a multi-map of [[K]] to [[List<W>]] 
values
+    HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>) 
other).asStream().collect(
+        Collectors.groupingBy(
+            Pair::getKey,
+            HashMap::new,
+            Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+    Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined = 
asStream().flatMap(pair -> {
+      K key = pair.getKey();
+      V leftValue = pair.getValue();
+      List<W> rightValues = rightStreamMap.get(key);
+
+      if (rightValues == null) {
+        return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
+      } else {
+        return rightValues.stream().map(rightValue ->
+            Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
+      }
+    });
+
+    return new HoodieListPairData<>(leftOuterJoined, lazy);
+  }
+
+  @Override
+  public long count() {
+    return super.count();
+  }
+
+  @Override
+  public List<Pair<K, V>> collectAsList() {
+    return super.collectAsList();
+  }
+
+  public static <K, V> HoodieListPairData<K, V> lazy(List<Pair<K, V>> data) {
+    return new HoodieListPairData<>(data, true);
+  }
+
+  public static <K, V> HoodieListPairData<K, V> eager(List<Pair<K, V>> data) {
+    return new HoodieListPairData<>(data, false);
+  }
+
+  public static <K, V> HoodieListPairData<K, V> lazy(Map<K, List<V>> data) {
+    return new HoodieListPairData<>(explode(data), true);
+  }
+
+  public static <K, V> HoodieListPairData<K, V> eager(Map<K, List<V>> data) {
+    return new HoodieListPairData<>(explode(data), false);
+  }
+
+  private static <K, V> Stream<Pair<K, V>> explode(Map<K, List<V>> data) {
+    return data.entrySet().stream()
+        .flatMap(e -> e.getValue().stream().map(v -> Pair.of(e.getKey(), v)));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
deleted file mode 100644
index 1e125c90a1..0000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.data;
-
-import org.apache.hudi.common.function.FunctionWrapper;
-import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
-
-/**
- * Implementation of {@link HoodiePairData} using Java {@link Map}.
- * The pairs are organized by the key in the Map and values for the same key
- * are stored in a list as the value corresponding to the key in the Map.
- *
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public class HoodieMapPair<K, V> extends HoodiePairData<K, V> {
-
-  private final Map<K, List<V>> mapPairData;
-
-  private HoodieMapPair(Map<K, List<V>> mapPairData) {
-    this.mapPairData = mapPairData;
-  }
-
-  /**
-   * @param mapPairData a {@link Map} of pairs.
-   * @param <K>         type of key.
-   * @param <V>         type of value.
-   * @return a new instance containing the {@link Map<K, List<V>>} reference.
-   */
-  public static <K, V> HoodieMapPair<K, V> of(Map<K, List<V>> mapPairData) {
-    return new HoodieMapPair<>(mapPairData);
-  }
-
-  /**
-   * @param hoodiePairData {@link HoodieMapPair <K, V>} instance containing 
the {@link Map} of pairs.
-   * @param <K>            type of key.
-   * @param <V>            type of value.
-   * @return the {@link Map} of pairs.
-   */
-  public static <K, V> Map<K, List<V>> getMapPair(HoodiePairData<K, V> 
hoodiePairData) {
-    return ((HoodieMapPair<K, V>) hoodiePairData).get();
-  }
-
-  @Override
-  public Map<K, List<V>> get() {
-    return mapPairData;
-  }
-
-  @Override
-  public void persist(String cacheConfig) {
-    // No OP
-  }
-
-  @Override
-  public void unpersist() {
-    // No OP
-  }
-
-  @Override
-  public HoodieData<K> keys() {
-    return HoodieList.of(new ArrayList<>(mapPairData.keySet()));
-  }
-
-  @Override
-  public HoodieData<V> values() {
-    return HoodieList.of(
-        
mapPairData.values().stream().flatMap(List::stream).collect(Collectors.toList()));
-  }
-
-  @Override
-  public long count() {
-    return mapPairData.values().stream().map(
-        list -> (long) list.size()).reduce(Long::sum).orElse(0L);
-  }
-
-  @Override
-  public Map<K, Long> countByKey() {
-    return mapPairData.entrySet().stream().collect(
-        Collectors.toMap(Map.Entry::getKey, entry -> (long) 
entry.getValue().size()));
-  }
-
-  @Override
-  public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> 
func, int parallelism) {
-    return HoodieMapPair.of(mapPairData.entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> {
-          Option<V> reducedValue = 
Option.fromJavaOptional(e.getValue().stream().reduce(func::apply));
-          return reducedValue.isPresent() ? 
Collections.singletonList(reducedValue.get()) : Collections.emptyList();
-        })));
-  }
-
-  @Override
-  public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
-    Function<Pair<K, V>, O> throwableFunc = throwingMapWrapper(func);
-    return HoodieList.of(
-        streamAllPairs().map(throwableFunc).collect(Collectors.toList()));
-  }
-
-  @Override
-  public <L, W> HoodiePairData<L, W> 
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
-    Map<L, List<W>> newMap = new HashMap<>();
-    Function<Pair<K, V>, Pair<L, W>> throwableMapToPairFunc =
-        FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc);
-    streamAllPairs().map(pair -> 
throwableMapToPairFunc.apply(pair)).forEach(newPair -> {
-      List<W> list = newMap.computeIfAbsent(newPair.getKey(), k -> new 
ArrayList<>());
-      list.add(newPair.getValue());
-    });
-    return HoodieMapPair.of(newMap);
-  }
-
-  @Override
-  public <W> HoodiePairData<K, Pair<V, Option<W>>> 
leftOuterJoin(HoodiePairData<K, W> other) {
-    Map<K, List<W>> otherMapPairData = HoodieMapPair.getMapPair(other);
-    Stream<ImmutablePair<K, ImmutablePair<V, Option<List<W>>>>> pairs = 
streamAllPairs()
-        .map(pair -> new ImmutablePair<>(pair.getKey(), new ImmutablePair<>(
-            pair.getValue(), 
Option.ofNullable(otherMapPairData.get(pair.getKey())))));
-    Map<K, List<Pair<V, Option<W>>>> resultMap = new HashMap<>();
-    pairs.forEach(pair -> {
-      K key = pair.getKey();
-      ImmutablePair<V, Option<List<W>>> valuePair = pair.getValue();
-      List<Pair<V, Option<W>>> resultList = resultMap.computeIfAbsent(key, k 
-> new ArrayList<>());
-      if (!valuePair.getRight().isPresent()) {
-        resultList.add(new ImmutablePair<>(valuePair.getLeft(), 
Option.empty()));
-      } else {
-        resultList.addAll(valuePair.getRight().get().stream().map(
-            w -> new ImmutablePair<>(valuePair.getLeft(), 
Option.of(w))).collect(Collectors.toList()));
-      }
-    });
-    return HoodieMapPair.of(resultMap);
-  }
-
-  private Stream<ImmutablePair<K, V>> streamAllPairs() {
-    return mapPairData.entrySet().stream().flatMap(
-        entry -> entry.getValue().stream().map(e -> new 
ImmutablePair<>(entry.getKey(), e)));
-  }
-}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index 9ff52793d6..49fa7174da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,71 +36,92 @@ import java.util.Map;
  * @param <K> type of key.
  * @param <V> type of value.
  */
-public abstract class HoodiePairData<K, V> implements Serializable {
+public interface HoodiePairData<K, V> extends Serializable {
   /**
    * @return the collection of pairs.
    */
-  public abstract Object get();
+  Object get();
 
   /**
-   * Caches the data.
+   * Persists the data (if applicable)
    *
    * @param cacheConfig config value for caching.
    */
-  public abstract void persist(String cacheConfig);
+  void persist(String cacheConfig);
 
   /**
-   * Removes the cached data.
+   * Un-persists the data (if applicable)
    */
-  public abstract void unpersist();
+  void unpersist();
 
   /**
-   * @return all keys in {@link HoodieData}.
+   * Returns a {@link HoodieData} holding the key from every corresponding pair
    */
-  public abstract HoodieData<K> keys();
+  HoodieData<K> keys();
 
   /**
-   * @return all values in {@link HoodieData}.
+   * Returns a {@link HoodieData} holding the value from every corresponding 
pair
    */
-  public abstract HoodieData<V> values();
+  HoodieData<V> values();
 
   /**
-   * @return the number of pairs.
+   * Returns number of held pairs
    */
-  public abstract long count();
+  long count();
 
   /**
-   * @return the number of pairs per key in a {@link Map}.
+   * Counts the number of pairs grouping them by key
    */
-  public abstract Map<K, Long> countByKey();
+  Map<K, Long> countByKey();
 
-  public abstract HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, 
V, V> func, int parallelism);
+  /**
+   * Groups the values for each key in the dataset into a single sequence
+   */
+  HoodiePairData<K, Iterable<V>> groupByKey();
+
+  /**
+   * Reduces original sequence by de-duplicating the pairs w/ the same key, 
using provided
+   * binary operator {@code combiner}. Returns an instance of {@link 
HoodiePairData} holding
+   * the "de-duplicated" pairs, ie only pairs with unique keys.
+   *
+   * @param combiner method to combine values of the pairs with the same key
+   * @param parallelism target parallelism (if applicable)
+   */
+  HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner, 
int parallelism);
 
   /**
    * @param func serializable map function.
    * @param <O>  output object type.
    * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
    */
-  public abstract <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> 
func);
+  <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func);
 
   /**
    * @param mapToPairFunc serializable map function to generate another pair.
    * @param <L>           new key type.
    * @param <W>           new value type.
-   * @return {@link HoodiePairData<L, W>} containing the result. Actual 
execution may be deferred.
+   * @return containing the result. Actual execution may be deferred.
    */
-  public abstract <L, W> HoodiePairData<L, W> mapToPair(
+  <L, W> HoodiePairData<L, W> mapToPair(
       SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc);
 
   /**
-   * Performs a left outer join of this and other. For each element (k, v) in 
this,
-   * the resulting HoodiePairData will either contain all pairs (k, (v, 
Some(w))) for w in other,
-   * or the pair (k, (v, None)) if no elements in other have key k.
+   * Performs a left outer join of this dataset against {@code other}.
+   *
+   * For each element (k, v) in this, the resulting {@link HoodiePairData} 
will either contain all
+   * pairs {@code (k, (v, Some(w)))} for every {@code w} in the {@code other}, 
or the pair {@code (k, (v, None))}
+   * if no elements in {@code other} have the pair w/ a key {@code k}
    *
    * @param other the other {@link HoodiePairData}
    * @param <W>   value type of the other {@link HoodiePairData}
-   * @return {@link HoodiePairData<K, Pair<V, Option<W>>>} containing the left 
outer join result.
-   * Actual execution may be deferred.
+   * @return containing the result of the left outer join
+   */
+  <W> HoodiePairData<K, Pair<V, Option<W>>> leftOuterJoin(HoodiePairData<K, W> 
other);
+
+  /**
+   * Collects results of the underlying collection into a {@link List<Pair<K, 
V>>}
+   *
+   * This is a terminal operation
    */
-  public abstract <W> HoodiePairData<K, Pair<V, Option<W>>> 
leftOuterJoin(HoodiePairData<K, W> other);
+  List<Pair<K, V>> collectAsList();
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index c99430e284..5d7d193dc6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -24,7 +24,7 @@ import 
org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
@@ -71,12 +71,12 @@ public final class HoodieLocalEngineContext extends 
HoodieEngineContext {
 
   @Override
   public <T> HoodieData<T> emptyHoodieData() {
-    return HoodieList.of(Collections.emptyList());
+    return HoodieListData.eager(Collections.emptyList());
   }
 
   @Override
   public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-    return HoodieList.of(data);
+    return HoodieListData.eager(data);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 9040a04d5e..bb1ef72bea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Spliterator;
 import java.util.Spliterators;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -107,6 +108,19 @@ public class CollectionUtils {
     return combined;
   }
 
+  /**
+   * Combines provided {@link Map}s into one, returning new instance of {@link 
HashMap}.
+   *
+   * NOTE: Values associated with overlapping keys from the two maps are combined
+   *       using the provided {@code merge} function
+   */
+  public static <K, V> HashMap<K, V> combine(Map<K, V> one, Map<K, V> another, 
BiFunction<V, V, V> merge) {
+    HashMap<K, V> combined = new HashMap<>(one.size() + another.size());
+    combined.putAll(one);
+    another.forEach((k, v) -> combined.merge(k, v, merge));
+    return combined;
+  }
+
   /**
    * Returns difference b/w {@code one} {@link Set} of elements and {@code 
another}
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
new file mode 100644
index 0000000000..93accc4b75
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.hudi.TypeUtils.unsafeCast;
+
+/**
+ * Utility that could hold exclusively only either of (hence the name):
+ * <ul>
+ *     <li>Non-null value of type {@code L}</li>
+ *     <li>Non-null value of type {@code R}</li>
+ * </ul>
+ *
+ * @param <L> type of the "left" potential element
+ * @param <R> type of the "right" potential element
+ */
+public abstract class Either<L, R> {
+
+  @Nonnull
+  protected abstract Object getValue();
+
+  public final boolean isLeft() {
+    return this instanceof EitherLeft;
+  }
+
+  public final boolean isRight() {
+    return this instanceof EitherRight;
+  }
+
+  public R asRight() {
+    ValidationUtils.checkArgument(isRight(), "Trying to access non-existent 
value of Either");
+    EitherRight<L, R> right = unsafeCast(this);
+    return right.getValue();
+  }
+
+  public L asLeft() {
+    ValidationUtils.checkArgument(isLeft(), "Trying to access non-existent 
value of Either");
+    EitherLeft<L, R> left = unsafeCast(this);
+    return left.getValue();
+  }
+
+  public static <L, R> Either<L, R> right(R right) {
+    return new EitherRight<>(right);
+  }
+
+  public static <L, R> Either<L, R> left(L left) {
+    return new EitherLeft<>(left);
+  }
+
+  public static class EitherRight<L, R> extends Either<L, R> {
+    private final R value;
+    private EitherRight(@Nonnull R right) {
+      this.value = right;
+    }
+
+    @Nonnull
+    @Override
+    protected R getValue() {
+      return value;
+    }
+  }
+
+  public static class EitherLeft<L, R> extends Either<L, R> {
+    private final L value;
+    private EitherLeft(@Nonnull L value) {
+      this.value = value;
+    }
+
+    @Nonnull
+    @Override
+    protected L getValue() {
+      return value;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
similarity index 64%
rename from 
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
rename to 
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 6130d4af10..8da8be1338 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -21,17 +21,19 @@ package org.apache.hudi.common.data;
 
 import org.apache.hudi.common.util.collection.Pair;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-class TestHoodieList {
+class TestHoodieListData {
 
   private static Stream<Arguments> distinctWithKey() {
     return Stream.of(
@@ -44,7 +46,22 @@ class TestHoodieList {
   @ParameterizedTest
   @MethodSource
   void distinctWithKey(List<Pair<String, Integer>> expected, List<Pair<String, 
Integer>> originalList) {
-    List<Pair<String, Integer>> distinctList = 
HoodieList.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
+    List<Pair<String, Integer>> distinctList = 
HoodieListData.eager(originalList).distinctWithKey(Pair::getLeft, 
1).collectAsList();
     assertEquals(expected, distinctList);
   }
+
+  @Test
+  void testEagerSemantic() {
+    List<String> sourceList = Arrays.asList("quick", "brown", "fox");
+
+    HoodieListData<String> originalListData = HoodieListData.eager(sourceList);
+    HoodieData<Integer> lengthsListData = originalListData.map(String::length);
+
+    List<Integer> expectedLengths = 
sourceList.stream().map(String::length).collect(Collectors.toList());
+    assertEquals(expectedLengths, lengthsListData.collectAsList());
+    // Here we assert that even though we already de-referenced derivative 
container,
+    // we still can dereference its parent (multiple times)
+    assertEquals(3, originalListData.count());
+    assertEquals(sourceList, originalListData.collectAsList());
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
similarity index 75%
rename from 
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
rename to 
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 20e9a8f5d9..bb65909230 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -22,8 +22,7 @@ package org.apache.hudi.common.data;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
-
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -37,12 +36,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestHoodieMapPair {
+public class TestHoodieListDataPairData {
 
   private static final String KEY1 = "key1";
   private static final String KEY2 = "key2";
@@ -63,30 +63,30 @@ public class TestHoodieMapPair {
   private static final int INTEGER_VALUE4 = 4;
   private static final int INTEGER_VALUE5 = 5;
 
-  private static List<Pair<String, String>> TEST_PAIRS;
-  private static HoodiePairData<String, String> TEST_HOODIE_MAP_PAIR;
+  private List<Pair<String, String>> testPairs;
+  private HoodiePairData<String, String> testHoodiePairData;
 
-  @BeforeAll
-  public static void setup() {
-    TEST_PAIRS = constructPairs();
-    TEST_HOODIE_MAP_PAIR = constructTestMapPairData(TEST_PAIRS);
+  @BeforeEach
+  public void setup() {
+    testPairs = constructPairs();
+    testHoodiePairData = HoodieListPairData.lazy(testPairs);
   }
 
   @Test
   public void testKeys() {
-    assertHoodieDataEquals(Arrays.asList(KEY1, KEY2, KEY3, KEY4), 
TEST_HOODIE_MAP_PAIR.keys());
+    assertHoodieDataEquals(Arrays.asList(KEY1, KEY1, KEY2, KEY2, KEY3, KEY4), 
testHoodiePairData.keys());
   }
 
   @Test
   public void testValues() {
     assertHoodieDataEquals(Arrays.asList(
         STRING_VALUE1, STRING_VALUE2, STRING_VALUE3, STRING_VALUE4, 
STRING_VALUE5, STRING_VALUE6),
-        TEST_HOODIE_MAP_PAIR.values());
+        testHoodiePairData.values());
   }
 
   @Test
   public void testCount() {
-    assertEquals(6, TEST_HOODIE_MAP_PAIR.count());
+    assertEquals(6, testHoodiePairData.count());
   }
 
   @Test
@@ -97,14 +97,14 @@ public class TestHoodieMapPair {
     expectedResultMap.put(KEY3, 1L);
     expectedResultMap.put(KEY4, 1L);
 
-    assertEquals(expectedResultMap, TEST_HOODIE_MAP_PAIR.countByKey());
+    assertEquals(expectedResultMap, testHoodiePairData.countByKey());
   }
 
   @Test
   public void testMap() {
     assertHoodieDataEquals(Arrays.asList(
         "key1,value1", "key1,value2", "key2,value3", "key2,value4", 
"key3,value5", "key4,value6"),
-        TEST_HOODIE_MAP_PAIR.map(pair -> pair.getKey() + "," + 
pair.getValue()));
+        testHoodiePairData.map(pair -> pair.getKey() + "," + pair.getValue()));
   }
 
   @Test
@@ -114,8 +114,8 @@ public class TestHoodieMapPair {
     expectedResultMap.put("key20", Arrays.asList(3, 4));
     expectedResultMap.put("key30", Arrays.asList(5));
     expectedResultMap.put("key40", Arrays.asList(6));
-    assertEquals(expectedResultMap, HoodieMapPair.getMapPair(
-        TEST_HOODIE_MAP_PAIR.mapToPair(
+    assertEquals(expectedResultMap, toMap(
+        testHoodiePairData.mapToPair(
             pair -> {
               String value = pair.getValue();
               return new ImmutablePair<>(pair.getKey() + "0",
@@ -129,8 +129,7 @@ public class TestHoodieMapPair {
             createImmutableMap(
                 Pair.of(1, createImmutableList(1001)),
                 Pair.of(2, createImmutableList(2001)),
-                Pair.of(3, createImmutableList(3001)),
-                Pair.of(4, createImmutableList())),
+                Pair.of(3, createImmutableList(3001))),
             createImmutableMap(
                 Pair.of(1, createImmutableList(1001, 1002, 1003)),
                 Pair.of(2, createImmutableList(2001, 2002)),
@@ -142,20 +141,20 @@ public class TestHoodieMapPair {
   @ParameterizedTest
   @MethodSource
   public void testReduceByKey(Map<Integer, List<Integer>> expected, 
Map<Integer, List<Integer>> original) {
-    HoodiePairData<Integer, Integer> reduced = 
HoodieMapPair.of(original).reduceByKey((a, b) -> a, 1);
-    assertEquals(expected, HoodieMapPair.getMapPair(reduced));
+    HoodiePairData<Integer, Integer> reduced = 
HoodieListPairData.lazy(original).reduceByKey((a, b) -> a, 1);
+    assertEquals(expected, toMap(reduced));
   }
 
   @Test
   public void testLeftOuterJoinSingleValuePerKey() {
-    HoodiePairData<String, String> pairData1 = 
constructTestMapPairData(Arrays.asList(
+    HoodiePairData<String, String> pairData1 = 
HoodieListPairData.lazy(Arrays.asList(
         ImmutablePair.of(KEY1, STRING_VALUE1),
         ImmutablePair.of(KEY2, STRING_VALUE2),
         ImmutablePair.of(KEY3, STRING_VALUE3),
         ImmutablePair.of(KEY4, STRING_VALUE4)
     ));
 
-    HoodiePairData<String, Integer> pairData2 = 
constructTestMapPairData(Arrays.asList(
+    HoodiePairData<String, Integer> pairData2 = 
HoodieListPairData.lazy(Arrays.asList(
         ImmutablePair.of(KEY1, INTEGER_VALUE1),
         ImmutablePair.of(KEY2, INTEGER_VALUE2),
         ImmutablePair.of(KEY5, INTEGER_VALUE3)
@@ -172,12 +171,12 @@ public class TestHoodieMapPair {
         ImmutablePair.of(STRING_VALUE4, Option.empty())));
 
     assertEquals(expectedResultMap,
-        HoodieMapPair.getMapPair(pairData1.leftOuterJoin(pairData2)));
+        toMap(pairData1.leftOuterJoin(pairData2)));
   }
 
   @Test
   public void testLeftOuterJoinMultipleValuesPerKey() {
-    HoodiePairData<String, Integer> otherPairData = 
constructTestMapPairData(Arrays.asList(
+    HoodiePairData<String, Integer> otherPairData = 
HoodieListPairData.lazy(Arrays.asList(
         ImmutablePair.of(KEY1, INTEGER_VALUE1),
         ImmutablePair.of(KEY2, INTEGER_VALUE2),
         ImmutablePair.of(KEY2, INTEGER_VALUE3),
@@ -200,7 +199,25 @@ public class TestHoodieMapPair {
         ImmutablePair.of(STRING_VALUE6, Option.empty())));
 
     assertEquals(expectedResultMap,
-        
HoodieMapPair.getMapPair(TEST_HOODIE_MAP_PAIR.leftOuterJoin(otherPairData)));
+        toMap(testHoodiePairData.leftOuterJoin(otherPairData)));
+  }
+
+  @Test
+  void testEagerSemantic() {
+    List<Pair<String, Integer>> sourceList =
+        Stream.of("quick", "brown", "fox")
+            .map(s -> Pair.of(s, s.length()))
+            .collect(Collectors.toList());
+
+    HoodieListPairData<String, Integer> originalListData = 
HoodieListPairData.eager(sourceList);
+    HoodieData<Integer> lengthsListData = originalListData.values();
+
+    List<Integer> expectedLengths = 
sourceList.stream().map(Pair::getValue).collect(Collectors.toList());
+    assertEquals(expectedLengths, lengthsListData.collectAsList());
+    // Here we assert that even though we already de-referenced the derivative
+    // container, we can still dereference its parent (multiple times)
+    assertEquals(3, originalListData.count());
+    assertEquals(sourceList, originalListData.collectAsList());
   }
 
   private static List<Pair<String, String>> constructPairs() {
@@ -214,11 +231,14 @@ public class TestHoodieMapPair {
     );
   }
 
-  private static <V> HoodiePairData<String, V> constructTestMapPairData(
-      final List<Pair<String, V>> pairs) {
-    Map<String, List<V>> map = new HashMap<>();
-    addPairsToMap(map, pairs);
-    return HoodieMapPair.of(map);
+  private static <K,V> Map<K, List<V>> toMap(HoodiePairData<K, V> pairData) {
+    return ((List<Pair<K, Iterable<V>>>) pairData.groupByKey().get()).stream()
+        .collect(
+            Collectors.toMap(
+                p -> p.getKey(),
+                p -> StreamSupport.stream(p.getValue().spliterator(), 
false).collect(Collectors.toList())
+            )
+        );
   }
 
   private static <V> void addPairsToMap(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 4eed1c4ef5..8dadd2e2dc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -20,7 +20,7 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
@@ -158,7 +158,7 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
         .collect(Collectors.toList());
 
     HoodieCommitMetadata metadata = 
CompactHelpers.getInstance().createCompactionMetadata(
-        table, instant, HoodieList.of(statuses), 
writeClient.getConfig().getSchema());
+        table, instant, HoodieListData.eager(statuses), 
writeClient.getConfig().getSchema());
 
     // commit the compaction
     this.writeClient.commitCompaction(instant, metadata, Option.empty());

Reply via email to