This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4de0fcf [HUDI-566] Added new test cases for class HoodieTimeline,
HoodieDefaultTimeline and HoodieActiveTimeline.
4de0fcf is described below
commit 4de0fcfcb54ac76ed3b6852917588c32fec9bea8
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Jan 27 15:38:33 2020 -0800
[HUDI-566] Added new test cases for class HoodieTimeline,
HoodieDefaultTimeline and HoodieActiveTimeline.
---
.../apache/hudi/common/table/HoodieTimeline.java | 4 +
.../table/string/TestHoodieActiveTimeline.java | 276 ++++++++++++++++++++-
2 files changed, 279 insertions(+), 1 deletion(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
old mode 100644
new mode 100755
index a964411..015a497
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
@@ -56,6 +56,10 @@ public interface HoodieTimeline extends Serializable {
String REQUESTED_EXTENSION = ".requested";
String RESTORE_ACTION = "restore";
+ String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
+ CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
+ COMPACTION_ACTION};
+
String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
String CLEAN_EXTENSION = "." + CLEAN_ACTION;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
old mode 100644
new mode 100755
index 55a91cf..a9f027e
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.string;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.TimelineLayoutVersion;
@@ -29,12 +28,22 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.TimelineLayoutVersion.VERSION_0;
@@ -164,4 +173,269 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
assertFalse("", activeCommitTimeline.isBeforeTimelineStarts("02"));
assertTrue("", activeCommitTimeline.isBeforeTimelineStarts("00"));
}
+
+  @Test
+  public void testTimelineGetOperations() {
+    final List<HoodieInstant> everyInstant = getAllInstants();
+    final Supplier<Stream<HoodieInstant>> instantStream = everyInstant::stream;
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    timeline.setInstants(everyInstant);
+
+    // Helper: asserts that the given timeline contains every generated instant whose action is
+    // in expectedActions, and then that it contains none of the remaining generated instants.
+    final BiConsumer<HoodieTimeline, Set<String>> assertOnlyActions = (candidate, expectedActions) -> {
+      for (HoodieInstant instant : everyInstant) {
+        if (expectedActions.contains(instant.getAction())) {
+          assertTrue(candidate.containsInstant(instant));
+        }
+      }
+      for (HoodieInstant instant : everyInstant) {
+        if (!expectedActions.contains(instant.getAction())) {
+          assertFalse(candidate.containsInstant(instant));
+        }
+      }
+    };
+
+    // Each getXXX view of HoodieActiveTimeline must expose exactly the listed actions.
+    assertOnlyActions.accept(
+        timeline.getCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
+    assertOnlyActions.accept(
+        timeline.getCommitsAndCompactionTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
+            HoodieTimeline.COMPACTION_ACTION));
+    assertOnlyActions.accept(
+        timeline.getCommitTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION));
+
+    assertOnlyActions.accept(
+        timeline.getDeltaCommitTimeline(),
+        Sets.newHashSet(HoodieTimeline.DELTA_COMMIT_ACTION));
+    assertOnlyActions.accept(
+        timeline.getCleanerTimeline(),
+        Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
+    assertOnlyActions.accept(
+        timeline.getRollbackTimeline(),
+        Sets.newHashSet(HoodieTimeline.ROLLBACK_ACTION));
+    assertOnlyActions.accept(
+        timeline.getRestoreTimeline(),
+        Sets.newHashSet(HoodieTimeline.RESTORE_ACTION));
+    assertOnlyActions.accept(
+        timeline.getSavePointTimeline(),
+        Sets.newHashSet(HoodieTimeline.SAVEPOINT_ACTION));
+    assertOnlyActions.accept(
+        timeline.getAllCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
+            HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
+            HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
+
+    // Pick the actions of a random subset of instants and check getTimelineOfActions with them.
+    // NOTE(review): the Random is unseeded, so the sampled action set differs from run to run.
+    final Random coin = new Random();
+    final Set<String> sampledActions = instantStream.get()
+        .filter(instant -> coin.nextBoolean())
+        .map(HoodieInstant::getAction)
+        .collect(Collectors.toSet());
+    assertOnlyActions.accept(timeline.getTimelineOfActions(sampledActions), sampledActions);
+  }
+
+  /**
+   * Exercises the instant-level mutators of HoodieActiveTimeline (create, revert, delete and the
+   * compaction/clean state transitions), reloading the timeline after each step and checking
+   * which instants it then contains. Also checks the static instant factories on HoodieTimeline
+   * and containsOrBeforeTimelineStarts.
+   */
+  @Test
+  public void testTimelineInstantOperations() {
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    assertEquals("No instant present", 0, timeline.countInstants());
+
+    // revertToInflight
+    HoodieInstant commit = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    timeline.createNewInstant(commit);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(commit));
+    HoodieInstant inflight = timeline.revertToInflight(commit);
+    // revert creates the .requested file
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deleteInflight
+    timeline.deleteInflight(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deletePending
+    timeline.createNewInstant(commit);
+    timeline.createNewInstant(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    timeline.deletePending(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertTrue(timeline.containsInstant(commit));
+
+    // deleteCompactionRequested
+    HoodieInstant compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "2");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertEquals(2, timeline.countInstants());
+    timeline.deleteCompactionRequested(compaction);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(commit));
+
+    // transitionCompactionXXXtoYYY and revertCompactionXXXtoYYY
+    compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(inflight));
+    compaction = timeline.revertCompactionInflightToRequested(inflight);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    compaction = timeline.transitionCompactionInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // transitionCleanXXXtoYYY
+    HoodieInstant clean = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "4");
+    timeline.saveToCleanRequested(clean, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    inflight = timeline.transitionCleanRequestedToInflight(clean, Option.empty());
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(clean));
+    assertTrue(timeline.containsInstant(inflight));
+    clean = timeline.transitionCleanInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // Various states of Instants
+    HoodieInstant srcInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant otherInstant = HoodieTimeline.getRequestedInstant(srcInstant);
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+
+    // containsOrBeforeTimelineStarts
+    List<HoodieInstant> allInstants = getAllInstants();
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    timeline.setInstants(allInstants);
+
+    timeline.createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "2"));
+    allInstants.stream()
+        .map(HoodieInstant::getTimestamp)
+        .forEach(ts -> assertTrue(timeline.containsOrBeforeTimelineStarts(ts)));
+    assertTrue(timeline.containsOrBeforeTimelineStarts("0"));
+    // A timestamp derived from the current wall clock must be reported as neither contained in
+    // the timeline nor before its start.
+    assertFalse(timeline.containsOrBeforeTimelineStarts(String.valueOf(System.currentTimeMillis() + 1000)));
+    assertFalse(timeline.getTimelineHash().isEmpty());
+  }
+
+  @Test
+  public void testCreateInstants() {
+    // getAllInstants() also re-initialises the shared timeline field that is used below.
+    final List<HoodieInstant> generated = getAllInstants();
+    generated.forEach(instant -> timeline.createNewInstant(instant));
+
+    // After a reload, every instant that was created must be visible in the timeline.
+    timeline = timeline.reload();
+    generated.forEach(instant -> assertTrue(timeline.containsInstant(instant)));
+  }
+
+  /**
+   * Checks the static file-name helpers on HoodieTimeline against the file names produced by
+   * HoodieInstant for the requested, inflight and completed states of the same restore instant.
+   */
+  @Test
+  public void testInstantFilenameOperations() {
+    HoodieInstant instantRequested = new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantInflight = new HoodieInstant(State.INFLIGHT, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantComplete = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+
+    // The instant time must be recoverable from the file name in every state.
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantRequested.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantInflight.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantComplete.getFileName()));
+
+    // Converting between the inflight and completed file names must round-trip.
+    assertEquals(instantComplete.getFileName(),
+        HoodieTimeline.makeFileNameAsComplete(instantInflight.getFileName()));
+    assertEquals(instantInflight.getFileName(),
+        HoodieTimeline.makeFileNameAsInflight(instantComplete.getFileName()));
+  }
+
+  /**
+   * Checks the ordering and filtering views of a timeline populated with the instants returned
+   * by getAllInstants(): reverse ordering, state-based filters and compaction-specific filters.
+   */
+  @Test
+  public void testFiltering() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
+
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(allInstants);
+
+    // getReverseOrderedInstants
+    List<HoodieInstant> actualReversed = timeline.getReverseOrderedInstants().collect(Collectors.toList());
+    List<HoodieInstant> expectedReversed = new ArrayList<>(allInstants);
+    Collections.reverse(expectedReversed);
+    assertEquals(expectedReversed, actualReversed);
+
+    // Helper: asserts that the filtered timeline contains every generated instant whose state
+    // is in the given set, and none of the generated instants in any other state.
+    BiConsumer<HoodieTimeline, Set<State>> checkFilter = (filtered, states) -> {
+      sup.get().filter(i -> states.contains(i.getState()))
+          .forEach(i -> assertTrue(filtered.containsInstant(i)));
+      sup.get().filter(i -> !states.contains(i.getState()))
+          .forEach(i -> assertFalse(filtered.containsInstant(i)));
+    };
+
+    checkFilter.accept(timeline.filter(i -> false), Sets.newHashSet());
+    checkFilter.accept(timeline.filterInflights(), Sets.newHashSet(State.INFLIGHT));
+    checkFilter.accept(timeline.filterInflightsAndRequested(),
+        Sets.newHashSet(State.INFLIGHT, State.REQUESTED));
+
+    // filterCompletedAndCompactionInstants
+    // This cannot be done using checkFilter as it involves both states and actions
+    final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
+    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<String> actions = Sets.newHashSet(HoodieTimeline.COMPACTION_ACTION);
+    sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
+        .forEach(i -> assertTrue(t1.containsInstant(i)));
+    sup.get().filter(i -> !(states.contains(i.getState()) || actions.contains(i.getAction())))
+        .forEach(i -> assertFalse(t1.containsInstant(i)));
+
+    // filterPendingCompactionTimeline
+    // Actions are compared with equals(): reference comparison of Strings only works while both
+    // sides happen to be the same interned constant.
+    final HoodieTimeline t2 = timeline.filterPendingCompactionTimeline();
+    sup.get().filter(i -> HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertTrue(t2.containsInstant(i)));
+    sup.get().filter(i -> !HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertFalse(t2.containsInstant(i)));
+  }
+
+  /**
+   * Builds one HoodieInstant for each state/action combination that this test treats as valid.
+   *
+   * <p>Every state except INVALID is combined with every entry of
+   * {@link HoodieTimeline#VALID_ACTIONS_IN_TIMELINE}, skipping requested savepoint, restore and
+   * rollback as well as inflight and completed rollback. A completed compaction is emitted with
+   * the commit action instead. Instant times are the zero-padded, three-digit values of a counter
+   * that starts at 1 and increases by one per generated instant.
+   *
+   * <p>Side effect: replaces the shared {@code timeline} field with a new HoodieActiveTimeline.
+   *
+   * @return list of HoodieInstant in generation order
+   */
+  private List<HoodieInstant> getAllInstants() {
+    timeline = new HoodieActiveTimeline(metaClient);
+    List<HoodieInstant> allInstants = new ArrayList<>();
+    long commitTime = 1;
+    for (State state : State.values()) {
+      if (state == State.INVALID) {
+        continue;
+      }
+      for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
+        // Following are not valid combinations of actions and state so we should
+        // not be generating them.
+        if (state == State.REQUESTED
+            && (HoodieTimeline.SAVEPOINT_ACTION.equals(action)
+            || HoodieTimeline.RESTORE_ACTION.equals(action)
+            || HoodieTimeline.ROLLBACK_ACTION.equals(action))) {
+          continue;
+        }
+        if ((state == State.INFLIGHT || state == State.COMPLETED)
+            && HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
+          continue;
+        }
+        // Compaction complete is called commit complete
+        final boolean completedCompaction =
+            state == State.COMPLETED && HoodieTimeline.COMPACTION_ACTION.equals(action);
+        final String instantAction = completedCompaction ? HoodieTimeline.COMMIT_ACTION : action;
+
+        allInstants.add(new HoodieInstant(state, instantAction, String.format("%03d", commitTime++)));
+      }
+    }
+    return allInstants;
+  }
}