This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 11ad4ed [HUDI-1661] Exclude clustering commits from
getExtraMetadataFromLatest API (#2632)
11ad4ed is described below
commit 11ad4ed26b6046201945f0e14449e1cbc5b6f1f2
Author: satishkotha <[email protected]>
AuthorDate: Fri Mar 5 13:42:19 2021 -0800
[HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API
(#2632)
---
.../hudi/common/table/timeline/TimelineUtils.java | 39 ++++++++++++++++++-
.../hudi/common/table/TestTimelineUtils.java | 45 ++++++++++++++++++----
2 files changed, 75 insertions(+), 9 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index f9dacf0..de8c582 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -22,9 +22,12 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
@@ -43,6 +46,7 @@ import java.util.stream.Stream;
* 2) Incremental reads - InputFormats can use this API to query
*/
public class TimelineUtils {
+ private static final Logger LOG = LogManager.getLogger(TimelineUtils.class);
/**
* Returns partitions that have new data strictly after commitTime.
@@ -117,13 +121,27 @@ public class TimelineUtils {
}
/**
- * Get extra metadata for specified key from latest commit/deltacommit
instant.
+ * Get extra metadata for specified key from latest
commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
*/
public static Option<String>
getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String
extraMetadataKey) {
- return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant
->
+ return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ // exclude clustering commits for returning user stored extra metadata
+ .filter(instant -> !isClusteringCommit(metaClient, instant))
+ .findFirst().map(instant ->
getMetadataValue(metaClient, extraMetadataKey,
instant)).orElse(Option.empty());
}
+
+ /**
+ * Get extra metadata for specified key from latest
commit/deltacommit/replacecommit instant including internal commits
+ * such as clustering.
+ */
+ public static Option<String>
getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient,
String extraMetadataKey) {
+ return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ .findFirst().map(instant ->
+ getMetadataValue(metaClient, extraMetadataKey,
instant)).orElse(Option.empty());
+ }
+
/**
* Get extra metadata for specified key from all active commit/deltacommit
instants.
*/
@@ -134,6 +152,7 @@ public class TimelineUtils {
private static Option<String> getMetadataValue(HoodieTableMetaClient
metaClient, String extraMetadataKey, HoodieInstant instant) {
try {
+ LOG.info("reading checkpoint info for:" + instant + " key: " +
extraMetadataKey);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
metaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
@@ -142,4 +161,20 @@ public class TimelineUtils {
throw new HoodieIOException("Unable to parse instant metadata " +
instant, e);
}
}
+
+ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient,
HoodieInstant instant) {
+ try {
+ if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+ // replacecommit is used for multiple operations:
insert_overwrite/cluster etc.
+ // Check operation type to see if this instant is related to
clustering.
+ HoodieReplaceCommitMetadata replaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+ metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ return
WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
+ }
+
+ return false;
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to read instant information: " +
instant + " for " + metaClient.getBasePath(), e);
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 18c0d3f..cf7f6d8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -73,7 +74,8 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant1);
// create replace metadata only with replaced file Ids (no new files
created)
activeTimeline.saveAsComplete(instant1,
- Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2,
newFilePartition, 0, Collections.emptyMap())));
+ Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2,
+ newFilePartition, 0, Collections.emptyMap(),
WriteOperationType.CLUSTER)));
metaClient.reloadActiveTimeline();
List<String> partitions =
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0",
10));
@@ -85,7 +87,8 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant2);
// create replace metadata only with replaced file Ids (no new files
created)
activeTimeline.saveAsComplete(instant2,
- Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0,
newFilePartition, 3, Collections.emptyMap())));
+ Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0,
+ newFilePartition, 3, Collections.emptyMap(),
WriteOperationType.CLUSTER)));
metaClient.reloadActiveTimeline();
partitions =
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
10));
assertEquals(1, partitions.size());
@@ -211,16 +214,42 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
metaClient.reloadActiveTimeline();
// verify modified partitions included cleaned data
- Option<String> extraLatestValue =
TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
- assertTrue(extraLatestValue.isPresent());
- assertEquals(extraMetadataValue1, extraLatestValue.get());
+ verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1,
false);
+ assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient,
"unknownKey").isPresent());
+
+ // verify adding clustering commit doesn't change behavior of
getExtraMetadataFromLatest
+ String ts2 = "2";
+ HoodieInstant instant2 = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, ts2);
+ activeTimeline.createNewInstant(instant2);
+ String newValueForMetadata = "newValue2";
+ extraMetadata.put(extraMetadataKey, newValueForMetadata);
+ activeTimeline.saveAsComplete(instant2,
+ Option.of(getReplaceCommitMetadata(basePath, ts2, "p2", 0,
+ "p2", 3, extraMetadata, WriteOperationType.CLUSTER)));
+ metaClient.reloadActiveTimeline();
+
+ verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1,
false);
+ verifyExtraMetadataLatestValue(extraMetadataKey, newValueForMetadata,
true);
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient,
"unknownKey").isPresent());
Map<String, Option<String>> extraMetadataEntries =
TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
- assertEquals(2, extraMetadataEntries.size());
+ assertEquals(3, extraMetadataEntries.size());
assertFalse(extraMetadataEntries.get("0").isPresent());
assertTrue(extraMetadataEntries.get("1").isPresent());
assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
+ assertTrue(extraMetadataEntries.get("2").isPresent());
+ assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
+ }
+
+ private void verifyExtraMetadataLatestValue(String extraMetadataKey, String
expected, boolean includeClustering) {
+ final Option<String> extraLatestValue;
+ if (includeClustering) {
+ extraLatestValue =
TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient,
extraMetadataKey);
+ } else {
+ extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient,
extraMetadataKey);
+ }
+ assertTrue(extraLatestValue.isPresent());
+ assertEquals(expected, extraLatestValue.get());
}
private byte[] getRestoreMetadata(String basePath, String partition, String
commitTs, int count, String actionType) throws IOException {
@@ -265,9 +294,11 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
}
private byte[] getReplaceCommitMetadata(String basePath, String commitTs,
String replacePartition, int replaceCount,
- String newFilePartition, int newFileCount, Map<String, String>
extraMetadata)
+ String newFilePartition, int
newFileCount, Map<String, String> extraMetadata,
+ WriteOperationType operationType)
throws IOException {
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
+ commit.setOperationType(operationType);
for (int i = 1; i <= newFileCount; i++) {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setFileId(i + "");