This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b752df2a2f9 [HUDI-5921] Partition path should be considered in
BucketIndexConcurrentFileWritesConflictResolutionStrategy (#8163)
b752df2a2f9 is described below
commit b752df2a2f9d2cdee79144d94fe776171696691f
Author: Manu <[email protected]>
AuthorDate: Sat Apr 1 11:36:31 2023 +0800
[HUDI-5921] Partition path should be considered in
BucketIndexConcurrentFileWritesConflictResolutionStrategy (#8163)
Generally, we compare both the fileId and the partition path for `ConcurrentOperation`,
since the fileId can still conflict across different partitions, although the odds are
very low.
The relative file paths were kept before this patch but never used; we can add
them back if necessary.
---
...urrentFileWritesConflictResolutionStrategy.java | 22 ++++++----
.../client/transaction/ConcurrentOperation.java | 35 ++++++++--------
...urrentFileWritesConflictResolutionStrategy.java | 9 ++--
.../hudi/table/action/compact/CompactHelpers.java | 2 +
...urrentFileWritesConflictResolutionStrategy.java | 48 +++++++++++++++++-----
.../org/apache/hudi/common/util/CommitUtils.java | 23 ++++++-----
6 files changed, 88 insertions(+), 51 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
index 503f1c42185..a15a4cc533c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -37,10 +37,20 @@ public class
BucketIndexConcurrentFileWritesConflictResolutionStrategy
@Override
public boolean hasConflict(ConcurrentOperation thisOperation,
ConcurrentOperation otherOperation) {
// TODO : UUID's can clash even for insert/insert, handle that case.
- Set<String> bucketIdsSetForFirstInstant =
extractBucketIds(thisOperation.getMutatedFileIds());
- Set<String> bucketIdsSetForSecondInstant =
extractBucketIds(otherOperation.getMutatedFileIds());
- Set<String> intersection = new HashSet<>(bucketIdsSetForFirstInstant);
- intersection.retainAll(bucketIdsSetForSecondInstant);
+ Set<String> partitionBucketIdSetForFirstInstant = thisOperation
+ .getMutatedPartitionAndFileIds()
+ .stream()
+ .map(partitionAndFileId ->
+
BucketIdentifier.partitionBucketIdStr(partitionAndFileId.getLeft(),
BucketIdentifier.bucketIdFromFileId(partitionAndFileId.getRight()))
+ ).collect(Collectors.toSet());
+ Set<String> partitionBucketIdSetForSecondInstant = otherOperation
+ .getMutatedPartitionAndFileIds()
+ .stream()
+ .map(partitionAndFileId ->
+
BucketIdentifier.partitionBucketIdStr(partitionAndFileId.getLeft(),
BucketIdentifier.bucketIdFromFileId(partitionAndFileId.getRight()))
+ ).collect(Collectors.toSet());
+ Set<String> intersection = new
HashSet<>(partitionBucketIdSetForFirstInstant);
+ intersection.retainAll(partitionBucketIdSetForSecondInstant);
if (!intersection.isEmpty()) {
LOG.info("Found conflicting writes between first operation = " +
thisOperation
+ ", second operation = " + otherOperation + " , intersecting bucket
ids " + intersection);
@@ -48,8 +58,4 @@ public class
BucketIndexConcurrentFileWritesConflictResolutionStrategy
}
return false;
}
-
- private static Set<String> extractBucketIds(Set<String> fileIds) {
- return
fileIds.stream().map(BucketIdentifier::bucketIdStrFromFileId).collect(Collectors.toSet());
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 4195c5bfab2..a83fee77eb7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -18,9 +18,7 @@
package org.apache.hudi.client.transaction;
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieMetadataWrapper;
@@ -29,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
import java.util.Collections;
@@ -40,7 +39,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_AC
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
-import static
org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;
+import static
org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord;
/**
* This class is used to hold all information used to identify how to resolve
conflicts between instants.
@@ -55,7 +54,7 @@ public class ConcurrentOperation {
private final String actionState;
private final String actionType;
private final String instantTime;
- private Set<String> mutatedFileIds = Collections.emptySet();
+ private Set<Pair<String, String>> mutatedPartitionAndFileIds =
Collections.emptySet();
public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient
metaClient) throws IOException {
this.metadataWrapper = new
HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant,
metaClient));
@@ -91,8 +90,8 @@ public class ConcurrentOperation {
return operationType;
}
- public Set<String> getMutatedFileIds() {
- return mutatedFileIds;
+ public Set<Pair<String, String>> getMutatedPartitionAndFileIds() {
+ return mutatedPartitionAndFileIds;
}
public Option<HoodieCommitMetadata> getCommitMetadataOption() {
@@ -104,21 +103,21 @@ public class ConcurrentOperation {
switch (getInstantActionType()) {
case COMPACTION_ACTION:
this.operationType = WriteOperationType.COMPACT;
- this.mutatedFileIds =
this.metadataWrapper.getMetadataFromTimeline().getHoodieCompactionPlan().getOperations()
+ this.mutatedPartitionAndFileIds =
this.metadataWrapper.getMetadataFromTimeline().getHoodieCompactionPlan().getOperations()
.stream()
- .map(HoodieCompactionOperation::getFileId)
+ .map(operation -> Pair.of(operation.getPartitionPath(),
operation.getFileId()))
.collect(Collectors.toSet());
break;
case COMMIT_ACTION:
case DELTA_COMMIT_ACTION:
- this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
- .getPartitionToWriteStats()).keySet();
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
+ .getPartitionToWriteStats());
this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
break;
case REPLACE_COMMIT_ACTION:
if (instant.isCompleted()) {
- this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
-
this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet();
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(
+
this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats());
this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
} else {
// we need to have different handling for requested and inflight
replacecommit because
@@ -129,16 +128,16 @@ public class ConcurrentOperation {
if (instant.isRequested()) {
// for insert_overwrite/insert_overwrite_table clusteringPlan
will be empty
if (requestedReplaceMetadata != null &&
requestedReplaceMetadata.getClusteringPlan() != null) {
- this.mutatedFileIds =
getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
} else {
if (inflightCommitMetadata != null) {
- this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats());
this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
} else if (requestedReplaceMetadata != null) {
// inflight replacecommit metadata is empty due to clustering,
read fileIds from requested replacecommit
- this.mutatedFileIds =
getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
// NOTE: it cannot be the case that instant is inflight, and
both the requested and inflight replacecommit metadata are empty
@@ -157,7 +156,7 @@ public class ConcurrentOperation {
case DELTA_COMMIT_ACTION:
case REPLACE_COMMIT_ACTION:
case LOG_COMPACTION_ACTION:
- this.mutatedFileIds =
CommitUtils.getFileIdWithoutSuffixAndRelativePaths(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats()).keySet();
+ this.mutatedPartitionAndFileIds =
CommitUtils.getPartitionAndFileIdWithoutSuffix(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats());
this.operationType =
this.metadataWrapper.getCommitMetadata().getOperationType();
break;
default:
@@ -166,12 +165,12 @@ public class ConcurrentOperation {
}
}
- private static Set<String>
getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata
requestedReplaceMetadata) {
+ private static Set<Pair<String, String>>
getPartitionAndFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata
requestedReplaceMetadata) {
return requestedReplaceMetadata
.getClusteringPlan().getInputGroups()
.stream()
.flatMap(ig -> ig.getSlices().stream())
- .map(HoodieSliceInfo::getFileId)
+ .map(fileSlice -> Pair.of(fileSlice.getPartitionPath(),
fileSlice.getFileId()))
.collect(Collectors.toSet());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index a622486b17d..22bcd8d0e0c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
@@ -72,10 +73,10 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
@Override
public boolean hasConflict(ConcurrentOperation thisOperation,
ConcurrentOperation otherOperation) {
// TODO : UUID's can clash even for insert/insert, handle that case.
- Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
- Set<String> fileIdsSetForSecondInstant =
otherOperation.getMutatedFileIds();
- Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
- intersection.retainAll(fileIdsSetForSecondInstant);
+ Set<Pair<String, String>> partitionAndFileIdsSetForFirstInstant =
thisOperation.getMutatedPartitionAndFileIds();
+ Set<Pair<String, String>> partitionAndFileIdsSetForSecondInstant =
otherOperation.getMutatedPartitionAndFileIds();
+ Set<Pair<String, String>> intersection = new
HashSet<>(partitionAndFileIdsSetForFirstInstant);
+ intersection.retainAll(partitionAndFileIdsSetForSecondInstant);
if (!intersection.isEmpty()) {
LOG.info("Found conflicting writes between first operation = " +
thisOperation
+ ", second operation = " + otherOperation + " , intersecting file
ids " + intersection);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index f4498a82d4f..800e6a4acea 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,6 +66,7 @@ public class CompactHelpers<T, I, K, O> {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY,
schema);
+ metadata.setOperationType(WriteOperationType.COMPACT);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
index 24c578606d4..df5b03ec7dd 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -68,9 +68,9 @@ public class
TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
createCommit(newInstantTime);
// consider commits before this are all successful
// writer 1
- createInflightCommit(HoodieTestTable.makeNewCommitTime());
+ createInflightCommit(HoodieTestTable.makeNewCommitTime(),
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
// writer 2
- createInflightCommit(HoodieTestTable.makeNewCommitTime());
+ createInflightCommit(HoodieTestTable.makeNewCommitTime(),
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
Option<HoodieInstant> lastSuccessfulInstant =
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
newInstantTime = HoodieTestTable.makeNewCommitTime();
Option<HoodieInstant> currentInstant = Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
@@ -87,14 +87,14 @@ public class
TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
// writer 1 starts
String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
- createInflightCommit(currentWriterInstant);
+ createInflightCommit(currentWriterInstant,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
// writer 2 starts and finishes
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
createCommit(newInstantTime);
Option<HoodieInstant> currentInstant = Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
currentWriterInstant));
SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
BucketIndexConcurrentFileWritesConflictResolutionStrategy();
- HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
timeline = timeline.reload();
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(timeline, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
@@ -111,6 +111,34 @@ public class
TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
}
}
+ @Test
+ public void testConcurrentWritesWithDifferentPartition() throws Exception {
+ createCommit(HoodieActiveTimeline.createNewInstantTime());
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ // consider commits before this are all successful
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+ // writer 1 starts
+ String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+ createInflightCommit(currentWriterInstant,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+ // writer 2 starts and finishes
+ String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ createCommit(newInstantTime);
+
+ Option<HoodieInstant> currentInstant = Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
currentWriterInstant));
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+ timeline = timeline.reload();
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(timeline, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // there should be 1 candidate instant
+ Assertions.assertEquals(1, candidateInstants.size());
+ ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+ // there should be no conflict between writer 1 and writer 2
+ Assertions.assertFalse(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
+ }
+
private void createCommit(String instantTime) throws Exception {
String fileId1 = "00000001-file-" + instantTime + "-1";
String fileId2 = "00000002-file-" + instantTime + "-2";
@@ -126,25 +154,25 @@ public class
TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
}
- private HoodieCommitMetadata createCommitMetadata(String instantTime, String
writeFileName) {
+ private HoodieCommitMetadata createCommitMetadata(String instantTime, String
writeFileName, String partition) {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.addMetadata("test", "test");
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(writeFileName);
-
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
+ commitMetadata.addWriteStat(partition, writeStat);
commitMetadata.setOperationType(WriteOperationType.INSERT);
return commitMetadata;
}
- private HoodieCommitMetadata createCommitMetadata(String instantTime) {
- return createCommitMetadata(instantTime, "00000001-file-" + instantTime +
"-1");
+ private HoodieCommitMetadata createCommitMetadata(String instantTime, String
partition) {
+ return createCommitMetadata(instantTime, "00000001-file-" + instantTime +
"-1", partition);
}
- private void createInflightCommit(String instantTime) throws Exception {
+ private void createInflightCommit(String instantTime, String partition)
throws Exception {
String fileId1 = "00000001-file-" + instantTime + "-1";
String fileId2 = "00000002-file-" + instantTime + "-2";
HoodieTestTable.of(metaClient)
.addInflightCommit(instantTime)
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ .withBaseFilesInPartition(partition, fileId1, fileId2);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ce94c92fd45..dbfffbeaf09 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -33,9 +34,10 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Helper class to generate commit metadata.
@@ -117,28 +119,27 @@ public class CommitUtils {
return commitMetadata;
}
- public static HashMap<String, String>
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(Map<String,
List<org.apache.hudi.avro.model.HoodieWriteStat>>
-
partitionToWriteStats) {
- HashMap<String, String> fileIdToPath = new HashMap<>();
+ public static Set<Pair<String, String>>
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(Map<String,
List<org.apache.hudi.avro.model.HoodieWriteStat>>
+
partitionToWriteStats) {
+ Set<Pair<String, String>> partitionToFileId = new HashSet<>();
// list all partitions paths
for (Map.Entry<String, List<org.apache.hudi.avro.model.HoodieWriteStat>>
entry : partitionToWriteStats.entrySet()) {
for (org.apache.hudi.avro.model.HoodieWriteStat stat : entry.getValue())
{
- fileIdToPath.put(stat.getFileId(), stat.getPath());
+ partitionToFileId.add(Pair.of(entry.getKey(), stat.getFileId()));
}
}
- return fileIdToPath;
+ return partitionToFileId;
}
- public static HashMap<String, String>
getFileIdWithoutSuffixAndRelativePaths(Map<String, List<HoodieWriteStat>>
- partitionToWriteStats) {
- HashMap<String, String> fileIdToPath = new HashMap<>();
+ public static Set<Pair<String, String>>
getPartitionAndFileIdWithoutSuffix(Map<String, List<HoodieWriteStat>>
partitionToWriteStats) {
+ Set<Pair<String, String>> partitionTofileId = new HashSet<>();
// list all partitions paths
for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
for (HoodieWriteStat stat : entry.getValue()) {
- fileIdToPath.put(stat.getFileId(), stat.getPath());
+ partitionTofileId.add(Pair.of(entry.getKey(), stat.getFileId()));
}
}
- return fileIdToPath;
+ return partitionTofileId;
}
/**