This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4bda6afe0b [HUDI-4249] Fixing in-memory `HoodieData` implementation to
operate lazily (#5855)
4bda6afe0b is described below
commit 4bda6afe0b2858d31337fe42979929523ba07f6d
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Jul 16 16:26:48 2022 -0700
[HUDI-4249] Fixing in-memory `HoodieData` implementation to operate lazily
(#5855)
---
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 1 -
.../bloom/ListBasedHoodieBloomIndexHelper.java | 3 +-
.../hudi/index/bucket/HoodieBucketIndex.java | 3 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 5 +-
.../client/common/HoodieFlinkEngineContext.java | 6 +-
.../org/apache/hudi/index/FlinkHoodieIndex.java | 8 +-
.../FlinkHoodieBackedTableMetadataWriter.java | 3 +-
.../org/apache/hudi/table/HoodieFlinkTable.java | 4 +-
.../table/action/commit/FlinkDeleteHelper.java | 5 +-
.../hudi/table/action/commit/FlinkWriteHelper.java | 5 +-
.../index/bloom/TestFlinkHoodieBloomIndex.java | 11 +-
.../testutils/HoodieFlinkClientTestHarness.java | 5 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 5 +-
.../run/strategy/JavaExecutionStrategy.java | 4 +-
.../client/common/HoodieJavaEngineContext.java | 6 +-
.../org/apache/hudi/index/JavaHoodieIndex.java | 8 +-
.../org/apache/hudi/table/HoodieJavaTable.java | 4 +-
.../commit/BaseJavaCommitActionExecutor.java | 8 +-
.../hudi/table/action/commit/JavaDeleteHelper.java | 5 +-
.../hudi/table/action/commit/JavaWriteHelper.java | 5 +-
.../org/apache/hudi/data/HoodieJavaPairRDD.java | 17 +-
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 32 +---
.../hudi/common/data/HoodieBaseListData.java | 72 ++++++++
.../org/apache/hudi/common/data/HoodieData.java | 167 +++++++++++------
.../org/apache/hudi/common/data/HoodieList.java | 178 ------------------
.../apache/hudi/common/data/HoodieListData.java | 182 +++++++++++++++++++
.../hudi/common/data/HoodieListPairData.java | 201 +++++++++++++++++++++
.../org/apache/hudi/common/data/HoodieMapPair.java | 168 -----------------
.../apache/hudi/common/data/HoodiePairData.java | 70 ++++---
.../common/engine/HoodieLocalEngineContext.java | 6 +-
.../apache/hudi/common/util/CollectionUtils.java | 14 ++
.../java/org/apache/hudi/common/util/Either.java | 93 ++++++++++
...TestHoodieList.java => TestHoodieListData.java} | 21 ++-
...apPair.java => TestHoodieListDataPairData.java} | 80 +++++---
.../hudi/sink/compact/CompactionCommitSink.java | 4 +-
35 files changed, 861 insertions(+), 548 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index aeefe6c304..1417e40a9f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index c42d80c62e..cffee5ee74 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -20,7 +20,6 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -60,7 +59,7 @@ public class ListBasedHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String,
Long> recordsPerPartition) {
List<Pair<String, HoodieKey>> fileComparisonPairList =
- HoodieList.getList(fileComparisonPairs).stream()
+ fileComparisonPairs.collectAsList().stream()
.sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index c3584d234a..cbb3b07f44 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -83,7 +83,8 @@ public abstract class HoodieBucketIndex extends
HoodieIndex<Object, Object> {
Option<HoodieRecordLocation> loc =
mapper.getRecordLocation(record.getKey(), record.getPartitionPath());
return HoodieIndexUtils.getTaggedRecord(record, loc);
}
- }
+ },
+ false
);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ac75997ea5..dce9245860 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -127,8 +127,7 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
Timer.Context indexTimer = metrics.getIndexCtx();
- List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
- getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+ List<HoodieRecord<T>> recordsWithLocation =
getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context,
table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer
== null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 ->
!v1.isCurrentLocationKnown()).collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 62f8d4fa03..76ca606009 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,7 +23,7 @@ import
org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -84,12 +84,12 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
@Override
public <T> HoodieData<T> emptyHoodieData() {
- return HoodieList.of(Collections.emptyList());
+ return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
- return HoodieList.of(data);
+ return HoodieListData.eager(data);
}
public RuntimeContext getRuntimeContext() {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
index 66c1b07793..be2273a840 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -61,8 +61,8 @@ public abstract class FlinkHoodieIndex<T extends
HoodieRecordPayload> extends Ho
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
- List<HoodieRecord<T>> hoodieRecords =
tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>)
record)), context, hoodieTable);
- return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>)
r).collect(Collectors.toList()));
+ List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record ->
(HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+ return HoodieListData.eager(hoodieRecords.stream().map(r ->
(HoodieRecord<R>) r).collect(Collectors.toList()));
}
@Override
@@ -70,6 +70,6 @@ public abstract class FlinkHoodieIndex<T extends
HoodieRecordPayload> extends Ho
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
- return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses),
context, hoodieTable));
+ return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(),
context, hoodieTable));
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 222ff78edc..81df5a9361 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -21,7 +21,6 @@ package org.apache.hudi.metadata;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
@@ -106,7 +105,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to
as it is not enabled");
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is
not fully initialized yet.");
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
- List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
+ List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
try (HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
if (canTriggerTableService) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 26149918c6..3431c7334a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -40,8 +40,6 @@ import org.apache.avro.specific.SpecificRecordBase;
import java.util.List;
-import static org.apache.hudi.common.data.HoodieList.getList;
-
public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {
@@ -78,7 +76,7 @@ public abstract class HoodieFlinkTable<T extends
HoodieRecordPayload>
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
- return metadata.clone(getList(metadata.getWriteStatuses()));
+ return metadata.clone(metadata.getWriteStatuses().collectAsList());
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index 8dd0c99bae..9540354e10 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -97,8 +97,7 @@ public class FlinkDeleteHelper<R> extends
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new
EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
- List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
HoodieList.getList(
- table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context,
table));
+ List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context,
table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non existent keys/records
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 9c17e77b91..57e5aa9ad5 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -83,8 +83,7 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,
R> extends BaseWrit
@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords,
HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table) {
- return HoodieList.getList(
- table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context,
table));
+ return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords),
context, table).collectAsList();
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index e23ee4ad58..b62e09eb27 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -18,11 +18,12 @@
package org.apache.hudi.index.bloom;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.data.HoodieList;
-import org.apache.hudi.common.data.HoodieMapPair;
+import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,9 +38,6 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -185,8 +183,7 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});
- List<Pair<String, HoodieKey>> comparisonKeyList = HoodieList.getList(
- index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo,
HoodieMapPair.of(partitionRecordKeyMap)));
+ List<Pair<String, HoodieKey>> comparisonKeyList =
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo,
HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps =
comparisonKeyList.stream()
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
index 054a363168..dfbb3336b9 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java
@@ -21,7 +21,8 @@ package org.apache.hudi.testutils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -133,7 +134,7 @@ public class HoodieFlinkClientTestHarness extends
HoodieCommonTestHarness implem
protected List<HoodieRecord> tagLocation(
HoodieIndex index, List<HoodieRecord> records, HoodieTable table) {
- return HoodieList.getList(index.tagLocation(HoodieList.of(records),
context, table));
+ return ((HoodieData<HoodieRecord>)
index.tagLocation(HoodieListData.eager(records), context,
table)).collectAsList();
}
/**
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index fbfb85bab3..b6951bc6b7 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -67,8 +67,7 @@ public class HoodieJavaWriteClient<T extends
HoodieRecordPayload> extends
// Create a Hoodie table which encapsulated the commits and files visible
HoodieJavaTable<T> table = HoodieJavaTable.create(config,
(HoodieJavaEngineContext) context);
Timer.Context indexTimer = metrics.getIndexCtx();
- List<HoodieRecord<T>> recordsWithLocation = HoodieList.getList(
- getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table));
+ List<HoodieRecord<T>> recordsWithLocation =
getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context,
table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer
== null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 ->
!v1.isCurrentLocationKnown()).collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index adcbb874e8..456bb3cb47 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -94,7 +94,7 @@ public abstract class JavaExecutionStrategy<T extends
HoodieRecordPayload<T>>
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime)));
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new
HoodieWriteMetadata<>();
- writeMetadata.setWriteStatuses(HoodieList.of(writeStatusList));
+ writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList));
return writeMetadata;
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 22d4ccabcd..2211c8a103 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -24,7 +24,7 @@ import
org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -74,12 +74,12 @@ public class HoodieJavaEngineContext extends
HoodieEngineContext {
@Override
public <T> HoodieData<T> emptyHoodieData() {
- return HoodieList.of(Collections.emptyList());
+ return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
- return HoodieList.of(data);
+ return HoodieListData.eager(data);
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
index dd64859cad..dcc9d050dc 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
@@ -23,7 +23,7 @@ import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,8 +58,8 @@ public abstract class JavaHoodieIndex<T extends
HoodieRecordPayload> extends Hoo
public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
- List<HoodieRecord<T>> hoodieRecords =
tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord<T>)
record)), context, hoodieTable);
- return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>)
r).collect(Collectors.toList()));
+ List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record ->
(HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
+ return HoodieListData.eager(hoodieRecords.stream().map(r ->
(HoodieRecord<R>) r).collect(Collectors.toList()));
}
@Override
@@ -67,6 +67,6 @@ public abstract class JavaHoodieIndex<T extends
HoodieRecordPayload> extends Hoo
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
- return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses),
context, hoodieTable));
+ return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(),
context, hoodieTable));
}
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index f9c7caff6e..3c878cbc14 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -36,8 +36,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
-import static org.apache.hudi.common.data.HoodieList.getList;
-
public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> {
protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext
context, HoodieTableMetaClient metaClient) {
@@ -67,7 +65,7 @@ public abstract class HoodieJavaTable<T extends
HoodieRecordPayload>
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
- return metadata.clone(getList(metadata.getWriteStatuses()));
+ return metadata.clone(metadata.getWriteStatuses().collectAsList());
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index dc6994d315..22c90eb8bb 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -130,8 +130,7 @@ public abstract class BaseJavaCommitActionExecutor<T
extends HoodieRecordPayload
protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses,
HoodieWriteMetadata<List<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
- List<WriteStatus> statuses = HoodieList.getList(
- table.getIndex().updateLocation(HoodieList.of(writeStatuses), context,
table));
+ List<WriteStatus> statuses =
table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context,
table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime,
Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
@@ -339,8 +338,7 @@ public abstract class BaseJavaCommitActionExecutor<T
extends HoodieRecordPayload
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses,
HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
- List<WriteStatus> statuses = HoodieList.getList(
- table.getIndex().updateLocation(HoodieList.of(writeStatuses), context,
table));
+ List<WriteStatus> statuses =
table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context,
table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime,
Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
index f82c1c561b..57d796c925 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -99,8 +99,7 @@ public class JavaDeleteHelper<R> extends
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new
EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
- List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
HoodieList.getList(
- table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context,
table));
+ List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context,
table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non existent keys/records
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 3a1fa4b884..4504a9bdcc 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -19,7 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -50,8 +50,7 @@ public class JavaWriteHelper<T extends HoodieRecordPayload,R>
extends BaseWriteH
@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords,
HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table) {
- return HoodieList.getList(
- table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context,
table));
+ return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords),
context, table).collectAsList();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index ddcaaec0fa..9ec3c4cf71 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
+import java.util.List;
import java.util.Map;
import scala.Tuple2;
@@ -41,7 +42,7 @@ import scala.Tuple2;
* @param <K> type of key.
* @param <V> type of value.
*/
-public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
+public class HoodieJavaPairRDD<K, V> implements HoodiePairData<K, V> {
private final JavaPairRDD<K, V> pairRDDData;
@@ -105,8 +106,13 @@ public class HoodieJavaPairRDD<K, V> extends
HoodiePairData<K, V> {
}
@Override
- public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V>
func, int parallelism) {
- return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply,
parallelism));
+ public HoodiePairData<K, Iterable<V>> groupByKey() {
+ return new HoodieJavaPairRDD<>(pairRDDData.groupByKey());
+ }
+
+ @Override
+ public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V>
combiner, int parallelism) {
+ return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(combiner::apply,
parallelism));
}
@Override
@@ -130,4 +136,9 @@ public class HoodieJavaPairRDD<K, V> extends
HoodiePairData<K, V> {
.map(tuple -> new Tuple2<>(tuple._1,
new ImmutablePair<>(tuple._2._1,
Option.ofNullable(tuple._2._2.orElse(null)))))));
}
+
+ @Override
+ public List<Pair<K, V>> collectAsList() {
+ return pairRDDData.map(t -> Pair.of(t._1, t._2)).collect();
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 0843dfc3c9..3964fa2d6b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
@@ -39,7 +40,7 @@ import scala.Tuple2;
*
* @param <T> type of object.
*/
-public class HoodieJavaRDD<T> extends HoodieData<T> {
+public class HoodieJavaRDD<T> implements HoodieData<T> {
private final JavaRDD<T> rddData;
@@ -74,17 +75,16 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
* @return the a {@link JavaRDD} of objects in type T.
*/
public static <T> JavaRDD<T> getJavaRDD(HoodieData<T> hoodieData) {
- return ((HoodieJavaRDD<T>) hoodieData).get();
+ return ((HoodieJavaRDD<T>) hoodieData).rddData;
}
- @Override
- public JavaRDD<T> get() {
- return rddData;
+ public static <K, V> JavaPairRDD<K, V> getJavaRDD(HoodiePairData<K, V>
hoodieData) {
+ return ((HoodieJavaPairRDD<K, V>) hoodieData).get();
}
@Override
- public void persist(String cacheConfig) {
- rddData.persist(StorageLevel.fromString(cacheConfig));
+ public void persist(String level) {
+ rddData.persist(StorageLevel.fromString(level));
}
@Override
@@ -112,20 +112,15 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
return HoodieJavaRDD.of(rddData.mapPartitions(func::apply,
preservesPartitioning));
}
- @Override
- public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func) {
- return HoodieJavaRDD.of(rddData.mapPartitions(func::apply));
- }
-
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
}
@Override
- public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K,
V> mapToPairFunc) {
+ public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K,
V> func) {
return HoodieJavaPairRDD.of(rddData.mapToPair(input -> {
- Pair<K, V> pair = mapToPairFunc.call(input);
+ Pair<K, V> pair = func.call(input);
return new Tuple2<>(pair.getLeft(), pair.getRight());
}));
}
@@ -140,13 +135,6 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
return HoodieJavaRDD.of(rddData.distinct(parallelism));
}
- @Override
- public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O>
keyGetter, int parallelism) {
- return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
- .reduceByKey((value1, value2) -> value1, parallelism)
- .values();
- }
-
@Override
public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
return HoodieJavaRDD.of(rddData.filter(filterFunc::apply));
@@ -154,7 +142,7 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
@Override
public HoodieData<T> union(HoodieData<T> other) {
- return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+ return HoodieJavaRDD.of(rddData.union(((HoodieJavaRDD<T>) other).rddData));
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
new file mode 100644
index 0000000000..398762dc63
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.Either;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieBaseListData<T> {
+
+ protected final Either<Stream<T>, List<T>> data;
+ protected final boolean lazy;
+
+ protected HoodieBaseListData(List<T> data, boolean lazy) {
+ this.data = lazy ? Either.left(data.stream().parallel()) :
Either.right(data);
+ this.lazy = lazy;
+ }
+
+ protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) {
+ // NOTE: In case this container is being instantiated by an eager parent,
we have to
+ // pre-materialize the stream
+ this.data = lazy ? Either.left(dataStream) :
Either.right(dataStream.collect(Collectors.toList()));
+ this.lazy = lazy;
+ }
+
+ protected Stream<T> asStream() {
+ return lazy ? data.asLeft() : data.asRight().parallelStream();
+ }
+
+ protected boolean isEmpty() {
+ if (lazy) {
+ return !data.asLeft().findAny().isPresent();
+ } else {
+ return data.asRight().isEmpty();
+ }
+ }
+
+ protected long count() {
+ if (lazy) {
+ return data.asLeft().count();
+ } else {
+ return data.asRight().size();
+ }
+ }
+
+ protected List<T> collectAsList() {
+ if (lazy) {
+ return data.asLeft().collect(Collectors.toList());
+ } else {
+ return data.asRight();
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 4b391ecbab..2d24e7dd12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -21,108 +21,163 @@ package org.apache.hudi.common.data;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
/**
- * An abstraction for a data collection of objects in type T to store the
reference
- * and do transformation.
+ * An interface abstracting a container holding a collection of objects of
type {@code T}
+ * allowing to perform common transformation on it.
*
- * @param <T> type of object.
+ * This abstraction provides common API implemented by
+ * <ol>
+ * <li>In-memory implementation ({@code HoodieListData}, {@code
HoodieListPairData}), where all objects
+ * are held in-memory by the executing process</li>
+ * <li>RDD-based implementation ({@code HoodieJavaRDD}, etc), where
underlying collection is held
+ by an RDD allowing to execute transformations using Spark engine on the
cluster</li>
+ * </ol>
+ *
+ * All implementations provide for consistent semantic, where
+ * <ul>
+ * <li>All non-terminal* operations are executed lazily (for ex, {@code
map}, {@code filter}, etc)</li>
+ * <li>All terminal operations are executed eagerly, executing all
previously accumulated transformations.
+ * Note that, collection could not be re-used after invoking terminal
operation on it.</li>
+ * </ul>
+ *
+ * @param <T> type of object
*/
-public abstract class HoodieData<T> implements Serializable {
+public interface HoodieData<T> extends Serializable {
+
/**
- * @return the collection of objects.
+ * Persists the data w/ provided {@code level} (if applicable)
*/
- public abstract Object get();
+ void persist(String level);
/**
- * Caches the data.
- *
- * @param cacheConfig config value for caching.
+ * Un-persists the data (if previously persisted)
*/
- public abstract void persist(String cacheConfig);
+ void unpersist();
/**
- * Removes the cached data.
+ * Returns whether the collection is empty.
*/
- public abstract void unpersist();
+ boolean isEmpty();
/**
- * @return whether the collection is empty.
+ * Returns number of objects held in the collection
+ *
+ * NOTE: This is a terminal operation
*/
- public abstract boolean isEmpty();
+ long count();
/**
- * @return the number of objects.
+ * Maps every element in the collection using provided mapping {@code func}.
+ *
+ * This is an intermediate operation
+ *
+ * @param func serializable map function
+ * @param <O> output object type
+ * @return {@link HoodieData<O>} holding mapped elements
*/
- public abstract long count();
+ <O> HoodieData<O> map(SerializableFunction<T, O> func);
/**
- * @param func serializable map function.
- * @param <O> output object type.
- * @return {@link HoodieData<O>} containing the result. Actual execution may
be deferred.
+ * Maps every element in the collection's partition (if applicable) by
applying provided
+ * mapping {@code func} to every collection's partition
+ *
+ * This is an intermediate operation
+ *
+ * @param func serializable map function accepting {@link
Iterator} of a single
+ * partition's elements and returning a new
{@link Iterator} mapping
+ * every element of the partition into a new one
+ * @param preservesPartitioning whether to preserve partitioning in the
resulting collection
+ * @param <O> output object type
+ * @return {@link HoodieData<O>} holding mapped elements
*/
- public abstract <O> HoodieData<O> map(SerializableFunction<T, O> func);
+ <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
+ Iterator<O>> func, boolean preservesPartitioning);
/**
- * @param func serializable map function by taking a
partition of objects
- * and generating an iterator.
- * @param preservesPartitioning whether to preserve partitions in the result.
- * @param <O> output object type.
- * @return {@link HoodieData<O>} containing the result. Actual execution may
be deferred.
+ * Maps every element in the collection into a collection of the new
elements (provided by
+ * {@link Iterator}) using provided mapping {@code func}, subsequently
flattening the result
+ * (by concatenating) into a single collection
+ *
+ * This is an intermediate operation
+ *
+ * @param func serializable function mapping every element {@link T} into
{@code Iterator<O>}
+ * @param <O> output object type
+ * @return {@link HoodieData<O>} holding mapped elements
*/
- public abstract <O> HoodieData<O> mapPartitions(
- SerializableFunction<Iterator<T>, Iterator<O>> func, boolean
preservesPartitioning);
+ <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
/**
- * @param func serializable map function by taking a
partition of objects
- * and generating an iterator.
- * @param <O> output object type.
- * @return {@link HoodieData<O>} containing the result. Actual execution may
be deferred.
+ * Maps every element in the collection using provided mapping {@code func}
into a {@link Pair<K, V>}
+ * of elements {@code K} and {@code V}
+ * <p>
+ * This is an intermediate operation
+ *
+ * @param func serializable map function
+ * @param <K> key type of the pair
+ * @param <V> value type of the pair
+ * @return {@link HoodiePairData<K, V>} holding mapped elements
*/
- public abstract <O> HoodieData<O> mapPartitions(
- SerializableFunction<Iterator<T>, Iterator<O>> func);
+ <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, V>
func);
/**
- * @param func serializable flatmap function.
- * @param <O> output object type.
- * @return {@link HoodieData<O>} containing the result. Actual execution may
be deferred.
+ * Returns new {@link HoodieData} collection holding only distinct objects
of the original one
+ *
+ * This is a stateful intermediate operation
*/
- public abstract <O> HoodieData<O> flatMap(SerializableFunction<T,
Iterator<O>> func);
+ HoodieData<T> distinct();
/**
- * @param mapToPairFunc serializable map function to generate a pair.
- * @param <K> key type of the pair.
- * @param <V> value type of the pair.
- * @return {@link HoodiePairData<K, V>} containing the result. Actual
execution may be deferred.
+ * Returns new {@link HoodieData} collection holding only distinct objects
of the original one, capping the parallelism of the operation at {@code
parallelism} (if applicable)
+ *
+ * This is a stateful intermediate operation
+ */
- public abstract <K, V> HoodiePairData<K, V>
mapToPair(SerializablePairFunction<T, K, V> mapToPairFunc);
+ HoodieData<T> distinct(int parallelism);
/**
- * @return distinct objects in {@link HoodieData}.
+ * Returns new instance of {@link HoodieData} collection only containing
elements matching provided
+ * {@code filterFunc} (ie ones it returns true on)
+ *
+ * @param filterFunc filtering func either accepting or rejecting the
elements
+ * @return {@link HoodieData<T>} holding filtered elements
*/
- public abstract HoodieData<T> distinct();
+ HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc);
- public abstract HoodieData<T> distinct(int parallelism);
-
- public abstract <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O>
keyGetter, int parallelism);
-
- public abstract HoodieData<T> filter(SerializableFunction<T, Boolean>
filterFunc);
+ /**
+ * Unions {@link HoodieData} with another instance of {@link HoodieData}.
+ * Note that, it's only able to union same underlying collection
implementations.
+ *
+ * This is a stateful intermediate operation
+ *
+ * @param other {@link HoodieData} collection
+ * @return {@link HoodieData<T>} holding superset of elements of this and
{@code other} collections
+ */
+ HoodieData<T> union(HoodieData<T> other);
/**
- * Unions this {@link HoodieData} with other {@link HoodieData}.
- * @param other {@link HoodieData} of interest.
- * @return the union of two as as instance of {@link HoodieData}.
+ * Collects results of the underlying collection into a {@link List<T>}
+ *
+ * This is a terminal operation
*/
- public abstract HoodieData<T> union(HoodieData<T> other);
+ List<T> collectAsList();
/**
- * @return collected results in {@link List<T>}.
+ * Re-partitions underlying collection (if applicable) making sure new
{@link HoodieData} has
+ * exactly {@code parallelism} partitions
+ *
+ * @param parallelism target number of partitions in the underlying
collection
+ * @return {@link HoodieData<T>} holding re-partitioned collection
*/
- public abstract List<T> collectAsList();
+ HoodieData<T> repartition(int parallelism);
- public abstract HoodieData<T> repartition(int parallelism);
+ default <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O>
keyGetter, int parallelism) {
+ return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+ .reduceByKey((value1, value2) -> value1, parallelism)
+ .values();
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
deleted file mode 100644
index 28ed2e282d..0000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.data;
-
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
-import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
-
-/**
- * Holds a {@link List} of objects.
- *
- * @param <T> type of object.
- */
-public class HoodieList<T> extends HoodieData<T> {
-
- private final List<T> listData;
-
- private HoodieList(List<T> listData) {
- this.listData = listData;
- }
-
- /**
- * @param listData a {@link List} of objects in type T.
- * @param <T> type of object.
- * @return a new instance containing the {@link List<T>} reference.
- */
- public static <T> HoodieList<T> of(List<T> listData) {
- return new HoodieList<>(listData);
- }
-
- /**
- * @param hoodieData {@link HoodieList <T>} instance containing the {@link
List} of objects.
- * @param <T> type of object.
- * @return the a {@link List} of objects in type T.
- */
- public static <T> List<T> getList(HoodieData<T> hoodieData) {
- return ((HoodieList<T>) hoodieData).get();
- }
-
- @Override
- public List<T> get() {
- return listData;
- }
-
- @Override
- public void persist(String cacheConfig) {
- // No OP
- }
-
- @Override
- public void unpersist() {
- // No OP
- }
-
- @Override
- public boolean isEmpty() {
- return listData.isEmpty();
- }
-
- @Override
- public long count() {
- return listData.size();
- }
-
- @Override
- public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
- return HoodieList.of(listData.stream().parallel()
- .map(throwingMapWrapper(func)).collect(Collectors.toList()));
- }
-
- @Override
- public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func, boolean preservesPartitioning) {
- return mapPartitions(func);
- }
-
- @Override
- public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func) {
- List<O> result = new ArrayList<>();
-
throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add);
- return HoodieList.of(result);
- }
-
- @Override
- public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
- Function<T, Iterator<O>> throwableFunc = throwingMapWrapper(func);
- return HoodieList.of(listData.stream().flatMap(e -> {
- List<O> result = new ArrayList<>();
- Iterator<O> iterator = throwableFunc.apply(e);
- iterator.forEachRemaining(result::add);
- return result.stream();
- }).collect(Collectors.toList()));
- }
-
- @Override
- public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K,
V> mapToPairFunc) {
- Map<K, List<V>> mapOfPairs = new HashMap<>();
- Function<T, Pair<K, V>> throwableMapToPairFunc =
throwingMapToPairWrapper(mapToPairFunc);
- listData.forEach(data -> {
- Pair<K, V> pair = throwableMapToPairFunc.apply(data);
- List<V> list = mapOfPairs.computeIfAbsent(pair.getKey(), k -> new
ArrayList<>());
- list.add(pair.getValue());
- });
- return HoodieMapPair.of(mapOfPairs);
- }
-
- @Override
- public HoodieData<T> distinct() {
- return HoodieList.of(new ArrayList<>(new HashSet<>(listData)));
- }
-
- @Override
- public HoodieData<T> distinct(int parallelism) {
- return distinct();
- }
-
- @Override
- public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O>
keyGetter, int parallelism) {
- return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
- .reduceByKey((value1, value2) -> value1, parallelism)
- .values();
- }
-
- @Override
- public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
- return HoodieList.of(listData
- .stream()
- .filter(i -> throwingMapWrapper(filterFunc).apply(i))
- .collect(Collectors.toList()));
- }
-
- @Override
- public HoodieData<T> union(HoodieData<T> other) {
- List<T> unionResult = new ArrayList<>();
- unionResult.addAll(listData);
- unionResult.addAll(other.collectAsList());
- return HoodieList.of(unionResult);
- }
-
- @Override
- public List<T> collectAsList() {
- return listData;
- }
-
- @Override
- public HoodieData<T> repartition(int parallelism) {
- // no op
- return this;
- }
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
new file mode 100644
index 0000000000..0be9ec9fa7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodieData} holding internally a {@link
Stream} of objects.
+ *
+ * {@link HoodieListData} can have either of the 2 execution semantics:
+ *
+ * <ol>
+ * <li>Eager: with every operation being executed right away</li>
+ * <li>Lazy: with every operation being "stacked up", with it execution
postponed until
+ * "terminal" operation is invoked</li>
+ * </ol>
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaRDD}, and it
strives to provide
+ * similar semantic as RDD container -- all intermediate (non-terminal,
not de-referencing
+ * the stream like "collect", "groupBy", etc) operations are executed
*lazily*.
+ * This allows to make sure that compute/memory churn is minimal since
only necessary
+ * computations will ultimately be performed.
+ *
+ * Please note, however, that while RDD container allows the same
collection to be
+ * de-referenced more than once (ie terminal operation invoked more than
once),
+ * {@link HoodieListData} allows that only when instantiated w/ an eager
execution semantic.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieListData<T> extends HoodieBaseListData<T> implements
HoodieData<T> {
+
+ private HoodieListData(List<T> data, boolean lazy) {
+ super(data, lazy);
+ }
+
+ HoodieListData(Stream<T> dataStream, boolean lazy) {
+ super(dataStream, lazy);
+ }
+
+ /**
+ * Creates instance of {@link HoodieListData} bearing *eager* execution
semantic
+ *
+ * @param listData a {@link List} of objects in type T
+ * @param <T> type of object
+ * @return a new instance containing the {@link List<T>} reference
+ */
+ public static <T> HoodieListData<T> eager(List<T> listData) {
+ return new HoodieListData<>(listData, false);
+ }
+
+ /**
+ * Creates instance of {@link HoodieListData} bearing *lazy* execution
semantic
+ *
+ * @param listData a {@link List} of objects in type T
+ * @param <T> type of object
+ * @return a new instance containing the {@link List<T>} reference
+ */
+ public static <T> HoodieListData<T> lazy(List<T> listData) {
+ return new HoodieListData<>(listData, true);
+ }
+
+ @Override
+ public void persist(String level) {
+ // No OP
+ }
+
+ @Override
+ public void unpersist() {
+ // No OP
+ }
+
+ @Override
+ public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
+ return new HoodieListData<>(asStream().map(throwingMapWrapper(func)),
lazy);
+ }
+
+ @Override
+ public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func, boolean preservesPartitioning) {
+ Function<Iterator<T>, Iterator<O>> mapper = throwingMapWrapper(func);
+ return new HoodieListData<>(
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(
+ mapper.apply(asStream().iterator()), Spliterator.ORDERED),
true),
+ lazy
+ );
+ }
+
+ @Override
+ public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
+ Function<T, Iterator<O>> mapper = throwingMapWrapper(func);
+ Stream<O> mappedStream = asStream().flatMap(e ->
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(mapper.apply(e),
Spliterator.ORDERED), true));
+ return new HoodieListData<>(mappedStream, lazy);
+ }
+
+ @Override
+ public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K,
V> func) {
+ Function<T, Pair<K, V>> throwableMapToPairFunc =
throwingMapToPairWrapper(func);
+ return new HoodieListPairData<>(asStream().map(throwableMapToPairFunc),
lazy);
+ }
+
+ @Override
+ public HoodieData<T> distinct() {
+ return new HoodieListData<>(asStream().distinct(), lazy);
+ }
+
+ @Override
+ public HoodieData<T> distinct(int parallelism) {
+ return distinct();
+ }
+
+ @Override
+ public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O>
keyGetter, int parallelism) {
+ return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+ .reduceByKey((value1, value2) -> value1, parallelism)
+ .values();
+ }
+
+ @Override
+ public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+ return new HoodieListData<>(asStream().filter(r ->
throwingMapWrapper(filterFunc).apply(r)), lazy);
+ }
+
+ @Override
+ public HoodieData<T> union(HoodieData<T> other) {
+ ValidationUtils.checkArgument(other instanceof HoodieListData);
+ return new HoodieListData<>(Stream.concat(asStream(),
((HoodieListData<T>)other).asStream()), lazy);
+ }
+
+ @Override
+ public HoodieData<T> repartition(int parallelism) {
+ // no op
+ return this;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return super.isEmpty();
+ }
+
+ @Override
+ public long count() {
+ return super.count();
+ }
+
+ @Override
+ public List<T> collectAsList() {
+ return super.collectAsList();
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
new file mode 100644
index 0000000000..a389649548
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * In-memory implementation of {@link HoodiePairData} holding internally a
{@link Stream} of {@link Pair}s.
+ *
 + * {@link HoodieListPairData} can have either of the 2 execution semantics:
+ *
+ * <ol>
+ * <li>Eager: with every operation being executed right away</li>
 + * <li>Lazy: with every operation being "stacked up", with its execution
postponed until
 + * "terminal" operation is invoked</li>
+ * </ol>
+ *
+ *
+ * NOTE: This is an in-memory counterpart for {@code HoodieJavaPairRDD}, and
it strives to provide
+ * similar semantic as RDD container -- all intermediate (non-terminal,
not de-referencing
+ * the stream like "collect", "groupBy", etc) operations are executed
*lazily*.
+ * This allows to make sure that compute/memory churn is minimal since
only necessary
+ * computations will ultimately be performed.
+ *
+ * Please note, however, that while RDD container allows the same
collection to be
+ * de-referenced more than once (ie terminal operation invoked more than
once),
 + * {@link HoodieListPairData} allows that only when instantiated w/ an eager
execution semantic.
+ *
+ * @param <K> type of the key in the pair
+ * @param <V> type of the value in the pair
+ */
+public class HoodieListPairData<K, V> extends HoodieBaseListData<Pair<K, V>>
implements HoodiePairData<K, V> {
+
+ private HoodieListPairData(List<Pair<K, V>> data, boolean lazy) {
+ super(data, lazy);
+ }
+
+ HoodieListPairData(Stream<Pair<K, V>> dataStream, boolean lazy) {
+ super(dataStream, lazy);
+ }
+
+ @Override
+ public List<Pair<K, V>> get() {
+ return collectAsList();
+ }
+
+ @Override
+ public void persist(String cacheConfig) {
+ // no-op
+ }
+
+ @Override
+ public void unpersist() {
+ // no-op
+ }
+
+ @Override
+ public HoodieData<K> keys() {
+ return new HoodieListData<>(asStream().map(Pair::getKey), lazy);
+ }
+
+ @Override
+ public HoodieData<V> values() {
+ return new HoodieListData<>(asStream().map(Pair::getValue), lazy);
+ }
+
+ @Override
+ public Map<K, Long> countByKey() {
+ return asStream().collect(Collectors.groupingBy(Pair::getKey,
Collectors.counting()));
+ }
+
+ @Override
+ public HoodiePairData<K, Iterable<V>> groupByKey() {
+ Collector<Pair<K, V>, ?, List<V>> mappingCollector =
Collectors.mapping(Pair::getValue, Collectors.toList());
+ Collector<Pair<K, V>, ?, Map<K, List<V>>> groupingCollector =
+ Collectors.groupingBy(Pair::getKey, mappingCollector);
+
+ Map<K, List<V>> groupedByKey = asStream().collect(groupingCollector);
+ return new HoodieListPairData<>(
+ groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(),
e.getValue())),
+ lazy
+ );
+ }
+
+ @Override
+ public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V>
combiner, int parallelism) {
+ Map<K, java.util.Optional<V>> reducedMap = asStream().collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue,
Collectors.reducing(combiner::apply))));
+
+ return new HoodieListPairData<>(
+ reducedMap.entrySet()
+ .stream()
+ .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
+ lazy
+ );
+ }
+
+ @Override
+ public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
+ Function<Pair<K, V>, O> uncheckedMapper = throwingMapWrapper(func);
+ return new HoodieListData<>(asStream().map(uncheckedMapper), lazy);
+ }
+
+ @Override
+ public <L, W> HoodiePairData<L, W>
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
+ return new HoodieListPairData<>(asStream().map(p ->
throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy);
+ }
+
+ @Override
+ public <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K, W> other) {
+ ValidationUtils.checkArgument(other instanceof HoodieListPairData);
+
+ // Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
+ HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+ Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined =
asStream().flatMap(pair -> {
+ K key = pair.getKey();
+ V leftValue = pair.getValue();
+ List<W> rightValues = rightStreamMap.get(key);
+
+ if (rightValues == null) {
+ return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
+ } else {
+ return rightValues.stream().map(rightValue ->
+ Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
+ }
+ });
+
+ return new HoodieListPairData<>(leftOuterJoined, lazy);
+ }
+
+ @Override
+ public long count() {
+ return super.count();
+ }
+
+ @Override
+ public List<Pair<K, V>> collectAsList() {
+ return super.collectAsList();
+ }
+
+ public static <K, V> HoodieListPairData<K, V> lazy(List<Pair<K, V>> data) {
+ return new HoodieListPairData<>(data, true);
+ }
+
+ public static <K, V> HoodieListPairData<K, V> eager(List<Pair<K, V>> data) {
+ return new HoodieListPairData<>(data, false);
+ }
+
+ public static <K, V> HoodieListPairData<K, V> lazy(Map<K, List<V>> data) {
+ return new HoodieListPairData<>(explode(data), true);
+ }
+
+ public static <K, V> HoodieListPairData<K, V> eager(Map<K, List<V>> data) {
+ return new HoodieListPairData<>(explode(data), false);
+ }
+
+ private static <K, V> Stream<Pair<K, V>> explode(Map<K, List<V>> data) {
+ return data.entrySet().stream()
+ .flatMap(e -> e.getValue().stream().map(v -> Pair.of(e.getKey(), v)));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
deleted file mode 100644
index 1e125c90a1..0000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.data;
-
-import org.apache.hudi.common.function.FunctionWrapper;
-import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
-
-/**
- * Implementation of {@link HoodiePairData} using Java {@link Map}.
- * The pairs are organized by the key in the Map and values for the same key
- * are stored in a list as the value corresponding to the key in the Map.
- *
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public class HoodieMapPair<K, V> extends HoodiePairData<K, V> {
-
- private final Map<K, List<V>> mapPairData;
-
- private HoodieMapPair(Map<K, List<V>> mapPairData) {
- this.mapPairData = mapPairData;
- }
-
- /**
- * @param mapPairData a {@link Map} of pairs.
- * @param <K> type of key.
- * @param <V> type of value.
- * @return a new instance containing the {@link Map<K, List<V>>} reference.
- */
- public static <K, V> HoodieMapPair<K, V> of(Map<K, List<V>> mapPairData) {
- return new HoodieMapPair<>(mapPairData);
- }
-
- /**
- * @param hoodiePairData {@link HoodieMapPair <K, V>} instance containing
the {@link Map} of pairs.
- * @param <K> type of key.
- * @param <V> type of value.
- * @return the {@link Map} of pairs.
- */
- public static <K, V> Map<K, List<V>> getMapPair(HoodiePairData<K, V>
hoodiePairData) {
- return ((HoodieMapPair<K, V>) hoodiePairData).get();
- }
-
- @Override
- public Map<K, List<V>> get() {
- return mapPairData;
- }
-
- @Override
- public void persist(String cacheConfig) {
- // No OP
- }
-
- @Override
- public void unpersist() {
- // No OP
- }
-
- @Override
- public HoodieData<K> keys() {
- return HoodieList.of(new ArrayList<>(mapPairData.keySet()));
- }
-
- @Override
- public HoodieData<V> values() {
- return HoodieList.of(
-
mapPairData.values().stream().flatMap(List::stream).collect(Collectors.toList()));
- }
-
- @Override
- public long count() {
- return mapPairData.values().stream().map(
- list -> (long) list.size()).reduce(Long::sum).orElse(0L);
- }
-
- @Override
- public Map<K, Long> countByKey() {
- return mapPairData.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, entry -> (long)
entry.getValue().size()));
- }
-
- @Override
- public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V>
func, int parallelism) {
- return HoodieMapPair.of(mapPairData.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> {
- Option<V> reducedValue =
Option.fromJavaOptional(e.getValue().stream().reduce(func::apply));
- return reducedValue.isPresent() ?
Collections.singletonList(reducedValue.get()) : Collections.emptyList();
- })));
- }
-
- @Override
- public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
- Function<Pair<K, V>, O> throwableFunc = throwingMapWrapper(func);
- return HoodieList.of(
- streamAllPairs().map(throwableFunc).collect(Collectors.toList()));
- }
-
- @Override
- public <L, W> HoodiePairData<L, W>
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
- Map<L, List<W>> newMap = new HashMap<>();
- Function<Pair<K, V>, Pair<L, W>> throwableMapToPairFunc =
- FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc);
- streamAllPairs().map(pair ->
throwableMapToPairFunc.apply(pair)).forEach(newPair -> {
- List<W> list = newMap.computeIfAbsent(newPair.getKey(), k -> new
ArrayList<>());
- list.add(newPair.getValue());
- });
- return HoodieMapPair.of(newMap);
- }
-
- @Override
- public <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K, W> other) {
- Map<K, List<W>> otherMapPairData = HoodieMapPair.getMapPair(other);
- Stream<ImmutablePair<K, ImmutablePair<V, Option<List<W>>>>> pairs =
streamAllPairs()
- .map(pair -> new ImmutablePair<>(pair.getKey(), new ImmutablePair<>(
- pair.getValue(),
Option.ofNullable(otherMapPairData.get(pair.getKey())))));
- Map<K, List<Pair<V, Option<W>>>> resultMap = new HashMap<>();
- pairs.forEach(pair -> {
- K key = pair.getKey();
- ImmutablePair<V, Option<List<W>>> valuePair = pair.getValue();
- List<Pair<V, Option<W>>> resultList = resultMap.computeIfAbsent(key, k
-> new ArrayList<>());
- if (!valuePair.getRight().isPresent()) {
- resultList.add(new ImmutablePair<>(valuePair.getLeft(),
Option.empty()));
- } else {
- resultList.addAll(valuePair.getRight().get().stream().map(
- w -> new ImmutablePair<>(valuePair.getLeft(),
Option.of(w))).collect(Collectors.toList()));
- }
- });
- return HoodieMapPair.of(resultMap);
- }
-
- private Stream<ImmutablePair<K, V>> streamAllPairs() {
- return mapPairData.entrySet().stream().flatMap(
- entry -> entry.getValue().stream().map(e -> new
ImmutablePair<>(entry.getKey(), e)));
- }
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index 9ff52793d6..49fa7174da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
/**
@@ -35,71 +36,92 @@ import java.util.Map;
* @param <K> type of key.
* @param <V> type of value.
*/
-public abstract class HoodiePairData<K, V> implements Serializable {
+public interface HoodiePairData<K, V> extends Serializable {
/**
* @return the collection of pairs.
*/
- public abstract Object get();
+ Object get();
/**
- * Caches the data.
+ * Persists the data (if applicable)
*
* @param cacheConfig config value for caching.
*/
- public abstract void persist(String cacheConfig);
+ void persist(String cacheConfig);
/**
- * Removes the cached data.
+ * Un-persists the data (if applicable)
*/
- public abstract void unpersist();
+ void unpersist();
/**
- * @return all keys in {@link HoodieData}.
+ * Returns a {@link HoodieData} holding the key from every corresponding pair
*/
- public abstract HoodieData<K> keys();
+ HoodieData<K> keys();
/**
- * @return all values in {@link HoodieData}.
+ * Returns a {@link HoodieData} holding the value from every corresponding
pair
*/
- public abstract HoodieData<V> values();
+ HoodieData<V> values();
/**
- * @return the number of pairs.
+ * Returns number of held pairs
*/
- public abstract long count();
+ long count();
/**
- * @return the number of pairs per key in a {@link Map}.
+ * Counts the number of pairs grouping them by key
*/
- public abstract Map<K, Long> countByKey();
+ Map<K, Long> countByKey();
- public abstract HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V,
V, V> func, int parallelism);
+ /**
+ * Groups the values for each key in the dataset into a single sequence
+ */
+ HoodiePairData<K, Iterable<V>> groupByKey();
+
+ /**
+ * Reduces original sequence by de-duplicating the pairs w/ the same key,
using provided
+ * binary operator {@code combiner}. Returns an instance of {@link
HoodiePairData} holding
+ * the "de-duplicated" pairs, ie only pairs with unique keys.
+ *
+ * @param combiner method to combine values of the pairs with the same key
+ * @param parallelism target parallelism (if applicable)
+ */
+ HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner,
int parallelism);
/**
* @param func serializable map function.
* @param <O> output object type.
* @return {@link HoodieData<O>} containing the result. Actual execution may
be deferred.
*/
- public abstract <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O>
func);
+ <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func);
/**
* @param mapToPairFunc serializable map function to generate another pair.
* @param <L> new key type.
* @param <W> new value type.
- * @return {@link HoodiePairData<L, W>} containing the result. Actual
execution may be deferred.
+ * @return containing the result. Actual execution may be deferred.
*/
- public abstract <L, W> HoodiePairData<L, W> mapToPair(
+ <L, W> HoodiePairData<L, W> mapToPair(
SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc);
/**
- * Performs a left outer join of this and other. For each element (k, v) in
this,
- * the resulting HoodiePairData will either contain all pairs (k, (v,
Some(w))) for w in other,
- * or the pair (k, (v, None)) if no elements in other have key k.
+ * Performs a left outer join of this dataset against {@code other}.
+ *
+ * For each element (k, v) in this, the resulting {@link HoodiePairData}
will either contain all
+ * pairs {@code (k, (v, Some(w)))} for every {@code w} in the {@code other},
or the pair {@code (k, (v, None))}
+ * if no elements in {@code other} have the pair w/ a key {@code k}
*
* @param other the other {@link HoodiePairData}
* @param <W> value type of the other {@link HoodiePairData}
- * @return {@link HoodiePairData<K, Pair<V, Option<W>>>} containing the left
outer join result.
- * Actual execution may be deferred.
+ * @return containing the result of the left outer join
+ */
+ <W> HoodiePairData<K, Pair<V, Option<W>>> leftOuterJoin(HoodiePairData<K, W>
other);
+
+ /**
 + * Collects results of the underlying collection into a {@code List<Pair<K,
V>>}
+ *
+ * This is a terminal operation
*/
- public abstract <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K, W> other);
+ List<Pair<K, V>> collectAsList();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index c99430e284..5d7d193dc6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -24,7 +24,7 @@ import
org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
@@ -71,12 +71,12 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <T> HoodieData<T> emptyHoodieData() {
- return HoodieList.of(Collections.emptyList());
+ return HoodieListData.eager(Collections.emptyList());
}
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
- return HoodieList.of(data);
+ return HoodieListData.eager(data);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 9040a04d5e..bb1ef72bea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -34,6 +34,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -107,6 +108,19 @@ public class CollectionUtils {
return combined;
}
+ /**
+ * Combines provided {@link Map}s into one, returning new instance of {@link
HashMap}.
+ *
 + * NOTE: Values associated with overlapping keys from both maps will be
combined
 + * using the provided {@code merge} function
+ */
+ public static <K, V> HashMap<K, V> combine(Map<K, V> one, Map<K, V> another,
BiFunction<V, V, V> merge) {
+ HashMap<K, V> combined = new HashMap<>(one.size() + another.size());
+ combined.putAll(one);
+ another.forEach((k, v) -> combined.merge(k, v, merge));
+ return combined;
+ }
+
/**
* Returns difference b/w {@code one} {@link Set} of elements and {@code
another}
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
new file mode 100644
index 0000000000..93accc4b75
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.hudi.TypeUtils.unsafeCast;
+
+/**
 + * Utility that holds exclusively either of the following (hence the name):
 + * <ul>
 + * <li>Non-null value of type {@code L}</li>
 + * <li>Non-null value of type {@code R}</li>
+ * </ul>
+ *
+ * @param <L> type of the "left" potential element
+ * @param <R> type of the "right" potential element
+ */
+public abstract class Either<L, R> {
+
+ @Nonnull
+ protected abstract Object getValue();
+
+ public final boolean isLeft() {
+ return this instanceof EitherLeft;
+ }
+
+ public final boolean isRight() {
+ return this instanceof EitherRight;
+ }
+
+ public R asRight() {
+ ValidationUtils.checkArgument(isRight(), "Trying to access non-existent
value of Either");
+ EitherRight<L, R> right = unsafeCast(this);
+ return right.getValue();
+ }
+
+ public L asLeft() {
+ ValidationUtils.checkArgument(isLeft(), "Trying to access non-existent
value of Either");
+ EitherLeft<L, R> left = unsafeCast(this);
+ return left.getValue();
+ }
+
+ public static <L, R> Either<L, R> right(R right) {
+ return new EitherRight<>(right);
+ }
+
+ public static <L, R> Either<L, R> left(L left) {
+ return new EitherLeft<>(left);
+ }
+
+ public static class EitherRight<L, R> extends Either<L, R> {
+ private final R value;
+ private EitherRight(@Nonnull R right) {
+ this.value = right;
+ }
+
+ @Nonnull
+ @Override
+ protected R getValue() {
+ return value;
+ }
+ }
+
+ public static class EitherLeft<L, R> extends Either<L, R> {
+ private final L value;
+ private EitherLeft(@Nonnull L value) {
+ this.value = value;
+ }
+
+ @Nonnull
+ @Override
+ protected L getValue() {
+ return value;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
similarity index 64%
rename from
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
rename to
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 6130d4af10..8da8be1338 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -21,17 +21,19 @@ package org.apache.hudi.common.data;
import org.apache.hudi.common.util.collection.Pair;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
-class TestHoodieList {
+class TestHoodieListData {
private static Stream<Arguments> distinctWithKey() {
return Stream.of(
@@ -44,7 +46,22 @@ class TestHoodieList {
@ParameterizedTest
@MethodSource
void distinctWithKey(List<Pair<String, Integer>> expected, List<Pair<String,
Integer>> originalList) {
- List<Pair<String, Integer>> distinctList =
HoodieList.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
+ List<Pair<String, Integer>> distinctList =
HoodieListData.eager(originalList).distinctWithKey(Pair::getLeft,
1).collectAsList();
assertEquals(expected, distinctList);
}
+
+ @Test
+ void testEagerSemantic() {
+ List<String> sourceList = Arrays.asList("quick", "brown", "fox");
+
+ HoodieListData<String> originalListData = HoodieListData.eager(sourceList);
+ HoodieData<Integer> lengthsListData = originalListData.map(String::length);
+
+ List<Integer> expectedLengths =
sourceList.stream().map(String::length).collect(Collectors.toList());
+ assertEquals(expectedLengths, lengthsListData.collectAsList());
+ // Here we assert that even though we already de-referenced derivative
container,
+ // we still can dereference its parent (multiple times)
+ assertEquals(3, originalListData.count());
+ assertEquals(sourceList, originalListData.collectAsList());
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
similarity index 75%
rename from
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
rename to
hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 20e9a8f5d9..bb65909230 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -22,8 +22,7 @@ package org.apache.hudi.common.data;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
-
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -37,12 +36,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestHoodieMapPair {
+public class TestHoodieListDataPairData {
private static final String KEY1 = "key1";
private static final String KEY2 = "key2";
@@ -63,30 +63,30 @@ public class TestHoodieMapPair {
private static final int INTEGER_VALUE4 = 4;
private static final int INTEGER_VALUE5 = 5;
- private static List<Pair<String, String>> TEST_PAIRS;
- private static HoodiePairData<String, String> TEST_HOODIE_MAP_PAIR;
+ private List<Pair<String, String>> testPairs;
+ private HoodiePairData<String, String> testHoodiePairData;
- @BeforeAll
- public static void setup() {
- TEST_PAIRS = constructPairs();
- TEST_HOODIE_MAP_PAIR = constructTestMapPairData(TEST_PAIRS);
+ @BeforeEach
+ public void setup() {
+ testPairs = constructPairs();
+ testHoodiePairData = HoodieListPairData.lazy(testPairs);
}
@Test
public void testKeys() {
- assertHoodieDataEquals(Arrays.asList(KEY1, KEY2, KEY3, KEY4),
TEST_HOODIE_MAP_PAIR.keys());
+ assertHoodieDataEquals(Arrays.asList(KEY1, KEY1, KEY2, KEY2, KEY3, KEY4),
testHoodiePairData.keys());
}
@Test
public void testValues() {
assertHoodieDataEquals(Arrays.asList(
STRING_VALUE1, STRING_VALUE2, STRING_VALUE3, STRING_VALUE4,
STRING_VALUE5, STRING_VALUE6),
- TEST_HOODIE_MAP_PAIR.values());
+ testHoodiePairData.values());
}
@Test
public void testCount() {
- assertEquals(6, TEST_HOODIE_MAP_PAIR.count());
+ assertEquals(6, testHoodiePairData.count());
}
@Test
@@ -97,14 +97,14 @@ public class TestHoodieMapPair {
expectedResultMap.put(KEY3, 1L);
expectedResultMap.put(KEY4, 1L);
- assertEquals(expectedResultMap, TEST_HOODIE_MAP_PAIR.countByKey());
+ assertEquals(expectedResultMap, testHoodiePairData.countByKey());
}
@Test
public void testMap() {
assertHoodieDataEquals(Arrays.asList(
"key1,value1", "key1,value2", "key2,value3", "key2,value4",
"key3,value5", "key4,value6"),
- TEST_HOODIE_MAP_PAIR.map(pair -> pair.getKey() + "," +
pair.getValue()));
+ testHoodiePairData.map(pair -> pair.getKey() + "," + pair.getValue()));
}
@Test
@@ -114,8 +114,8 @@ public class TestHoodieMapPair {
expectedResultMap.put("key20", Arrays.asList(3, 4));
expectedResultMap.put("key30", Arrays.asList(5));
expectedResultMap.put("key40", Arrays.asList(6));
- assertEquals(expectedResultMap, HoodieMapPair.getMapPair(
- TEST_HOODIE_MAP_PAIR.mapToPair(
+ assertEquals(expectedResultMap, toMap(
+ testHoodiePairData.mapToPair(
pair -> {
String value = pair.getValue();
return new ImmutablePair<>(pair.getKey() + "0",
@@ -129,8 +129,7 @@ public class TestHoodieMapPair {
createImmutableMap(
Pair.of(1, createImmutableList(1001)),
Pair.of(2, createImmutableList(2001)),
- Pair.of(3, createImmutableList(3001)),
- Pair.of(4, createImmutableList())),
+ Pair.of(3, createImmutableList(3001))),
createImmutableMap(
Pair.of(1, createImmutableList(1001, 1002, 1003)),
Pair.of(2, createImmutableList(2001, 2002)),
@@ -142,20 +141,20 @@ public class TestHoodieMapPair {
@ParameterizedTest
@MethodSource
public void testReduceByKey(Map<Integer, List<Integer>> expected,
Map<Integer, List<Integer>> original) {
- HoodiePairData<Integer, Integer> reduced =
HoodieMapPair.of(original).reduceByKey((a, b) -> a, 1);
- assertEquals(expected, HoodieMapPair.getMapPair(reduced));
+ HoodiePairData<Integer, Integer> reduced =
HoodieListPairData.lazy(original).reduceByKey((a, b) -> a, 1);
+ assertEquals(expected, toMap(reduced));
}
@Test
public void testLeftOuterJoinSingleValuePerKey() {
- HoodiePairData<String, String> pairData1 =
constructTestMapPairData(Arrays.asList(
+ HoodiePairData<String, String> pairData1 =
HoodieListPairData.lazy(Arrays.asList(
ImmutablePair.of(KEY1, STRING_VALUE1),
ImmutablePair.of(KEY2, STRING_VALUE2),
ImmutablePair.of(KEY3, STRING_VALUE3),
ImmutablePair.of(KEY4, STRING_VALUE4)
));
- HoodiePairData<String, Integer> pairData2 =
constructTestMapPairData(Arrays.asList(
+ HoodiePairData<String, Integer> pairData2 =
HoodieListPairData.lazy(Arrays.asList(
ImmutablePair.of(KEY1, INTEGER_VALUE1),
ImmutablePair.of(KEY2, INTEGER_VALUE2),
ImmutablePair.of(KEY5, INTEGER_VALUE3)
@@ -172,12 +171,12 @@ public class TestHoodieMapPair {
ImmutablePair.of(STRING_VALUE4, Option.empty())));
assertEquals(expectedResultMap,
- HoodieMapPair.getMapPair(pairData1.leftOuterJoin(pairData2)));
+ toMap(pairData1.leftOuterJoin(pairData2)));
}
@Test
public void testLeftOuterJoinMultipleValuesPerKey() {
- HoodiePairData<String, Integer> otherPairData =
constructTestMapPairData(Arrays.asList(
+ HoodiePairData<String, Integer> otherPairData =
HoodieListPairData.lazy(Arrays.asList(
ImmutablePair.of(KEY1, INTEGER_VALUE1),
ImmutablePair.of(KEY2, INTEGER_VALUE2),
ImmutablePair.of(KEY2, INTEGER_VALUE3),
@@ -200,7 +199,25 @@ public class TestHoodieMapPair {
ImmutablePair.of(STRING_VALUE6, Option.empty())));
assertEquals(expectedResultMap,
- HoodieMapPair.getMapPair(TEST_HOODIE_MAP_PAIR.leftOuterJoin(otherPairData)));
+ toMap(testHoodiePairData.leftOuterJoin(otherPairData)));
+ }
+
+ @Test
+ void testEagerSemantic() {
+ List<Pair<String, Integer>> sourceList =
+ Stream.of("quick", "brown", "fox")
+ .map(s -> Pair.of(s, s.length()))
+ .collect(Collectors.toList());
+
+ HoodieListPairData<String, Integer> originalListData = HoodieListPairData.eager(sourceList);
+ HoodieData<Integer> lengthsListData = originalListData.values();
+
+ List<Integer> expectedLengths = sourceList.stream().map(Pair::getValue).collect(Collectors.toList());
+ assertEquals(expectedLengths, lengthsListData.collectAsList());
+ // Here we assert that even though we already de-referenced derivative container,
+ // we still can dereference its parent (multiple times)
+ assertEquals(3, originalListData.count());
+ assertEquals(sourceList, originalListData.collectAsList());
}
private static List<Pair<String, String>> constructPairs() {
@@ -214,11 +231,14 @@ public class TestHoodieMapPair {
);
}
- private static <V> HoodiePairData<String, V> constructTestMapPairData(
- final List<Pair<String, V>> pairs) {
- Map<String, List<V>> map = new HashMap<>();
- addPairsToMap(map, pairs);
- return HoodieMapPair.of(map);
+ private static <K,V> Map<K, List<V>> toMap(HoodiePairData<K, V> pairData) {
+ return ((List<Pair<K, Iterable<V>>>) pairData.groupByKey().get()).stream()
+ .collect(
+ Collectors.toMap(
+ p -> p.getKey(),
p -> StreamSupport.stream(p.getValue().spliterator(), false).collect(Collectors.toList())
+ )
+ );
}
private static <V> void addPairsToMap(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 4eed1c4ef5..8dadd2e2dc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -20,7 +20,7 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -158,7 +158,7 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
.collect(Collectors.toList());
HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(
- table, instant, HoodieList.of(statuses), writeClient.getConfig().getSchema());
+ table, instant, HoodieListData.eager(statuses), writeClient.getConfig().getSchema());
// commit the compaction
this.writeClient.commitCompaction(instant, metadata, Option.empty());