This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1fbf74bbc [HUDI-4822] Extract the baseFile and logFiles from HoodieDeltaWriteStat in the right way (#11518)
3d1fbf74bbc is described below
commit 3d1fbf74bbcc1dbdda716e5b856f971384584914
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Jul 18 17:59:06 2024 +0700
[HUDI-4822] Extract the baseFile and logFiles from HoodieDeltaWriteStat in the right way (#11518)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../hudi/common/model/HoodieCommitMetadata.java | 33 ++++------------
.../common/model/TestHoodieCommitMetadata.java | 44 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 25 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index b132daf97e5..46874bb10f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -28,8 +28,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +37,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -248,30 +245,16 @@ public class HoodieCommitMetadata implements Serializable
{
* parse the bytes of deltacommit, and get the base file and the log files
belonging to this
* provided file group.
*/
- // TODO: refactor this method to avoid doing the json tree walking
(HUDI-4822).
public static Option<Pair<String, List<String>>>
getFileSliceForFileGroupFromDeltaCommit(byte[] bytes, HoodieFileGroupId
fileGroupId) {
try {
- String jsonStr = fromUTF8Bytes(
- convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes),
org.apache.hudi.avro.model.HoodieCommitMetadata.class));
- if (jsonStr.isEmpty()) {
- return Option.empty();
- }
- JsonNode ptToWriteStatsMap =
JsonUtils.getObjectMapper().readTree(jsonStr).get("partitionToWriteStats");
- Iterator<Map.Entry<String, JsonNode>> pts = ptToWriteStatsMap.fields();
- while (pts.hasNext()) {
- Map.Entry<String, JsonNode> ptToWriteStats = pts.next();
- if (ptToWriteStats.getValue().isArray()) {
- for (JsonNode writeStat : ptToWriteStats.getValue()) {
- HoodieFileGroupId fgId = new
HoodieFileGroupId(ptToWriteStats.getKey(), writeStat.get("fileId").asText());
- if (fgId.equals(fileGroupId)) {
- String baseFile = writeStat.get("baseFile").asText();
- ArrayNode logFilesNode = (ArrayNode) writeStat.get("logFiles");
- List<String> logFiles = new ArrayList<>();
- for (JsonNode logFile : logFilesNode) {
- logFiles.add(logFile.asText());
- }
- return Option.of(Pair.of(baseFile, logFiles));
- }
+ org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadata =
deserializeCommitMetadata(bytes);
+ Map<String,List<org.apache.hudi.avro.model.HoodieWriteStat>>
partitionToWriteStatsMap =
+ commitMetadata.getPartitionToWriteStats();
+ for (Map.Entry<String, List<org.apache.hudi.avro.model.HoodieWriteStat>>
partitionToWriteStat: partitionToWriteStatsMap.entrySet()) {
+ for (org.apache.hudi.avro.model.HoodieWriteStat writeStat:
partitionToWriteStat.getValue()) {
+ HoodieFileGroupId fgId = new
HoodieFileGroupId(partitionToWriteStat.getKey(), writeStat.getFileId());
+ if (fgId.equals(fileGroupId)) {
+ return Option.of(Pair.of(writeStat.getBaseFile() == null ? "" :
writeStat.getBaseFile(), writeStat.getLogFiles()));
}
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
index e8c159540a3..a425a55db68 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
@@ -18,16 +18,22 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,4 +109,42 @@ public class TestHoodieCommitMetadata {
HoodieCommitMetadata.fromJsonString(serializedCommitMetadata,
HoodieCommitMetadata.class);
assertSame(metadata.getOperationType(), WriteOperationType.INSERT);
}
+
+ @Test
+ public void testGetFileSliceForFileGroupFromDeltaCommit() throws IOException
{
+ org.apache.hudi.avro.model.HoodieCommitMetadata commitMetadata = new
org.apache.hudi.avro.model.HoodieCommitMetadata();
+ org.apache.hudi.avro.model.HoodieWriteStat writeStat1 =
createWriteStat("111", "111base", Arrays.asList("1.log", "2.log"));
+ org.apache.hudi.avro.model.HoodieWriteStat writeStat2 =
createWriteStat("111", "111base", Arrays.asList("3.log", "4.log"));
+ org.apache.hudi.avro.model.HoodieWriteStat writeStat3 =
createWriteStat("222", null, Collections.singletonList("5.log"));
+ Map<String,List<org.apache.hudi.avro.model.HoodieWriteStat>>
partitionToWriteStatsMap = new HashMap<>();
+ partitionToWriteStatsMap.put("partition1", Arrays.asList(writeStat2,
writeStat3));
+ partitionToWriteStatsMap.put("partition2",
Collections.singletonList(writeStat1));
+ commitMetadata.setPartitionToWriteStats(partitionToWriteStatsMap);
+ byte[] serializedCommitMetadata =
TimelineMetadataUtils.serializeAvroMetadata(
+ commitMetadata,
org.apache.hudi.avro.model.HoodieCommitMetadata.class).get();
+
+ Option<Pair<String, List<String>>> result =
HoodieCommitMetadata.getFileSliceForFileGroupFromDeltaCommit(
+ serializedCommitMetadata, new
HoodieFileGroupId("partition1","111"));
+
+ assertTrue(result.isPresent());
+ assertEquals("111base", result.get().getKey());
+ assertEquals(2, result.get().getValue().size());
+ assertEquals("3.log", result.get().getValue().get(0));
+ assertEquals("4.log", result.get().getValue().get(1));
+
+ result = HoodieCommitMetadata.getFileSliceForFileGroupFromDeltaCommit(
+ serializedCommitMetadata, new
HoodieFileGroupId("partition1","222"));
+ assertTrue(result.isPresent());
+ assertTrue(result.get().getKey().isEmpty());
+ assertEquals(1, result.get().getValue().size());
+ assertEquals("5.log", result.get().getValue().get(0));
+ }
+
+ private org.apache.hudi.avro.model.HoodieWriteStat createWriteStat(String
fileId, String baseFile, List<String> logFiles) {
+ org.apache.hudi.avro.model.HoodieWriteStat writeStat = new
org.apache.hudi.avro.model.HoodieWriteStat();
+ writeStat.setFileId(fileId);
+ writeStat.setBaseFile(baseFile);
+ writeStat.setLogFiles(logFiles);
+ return writeStat;
+ }
}