This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db741da47ba [HUDI-6389] Fix instant time check against the active timeline in meta sync (#8991)
db741da47ba is described below

commit db741da47ba455504dec0ec6768f1fdfbe01a848
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jun 16 13:21:10 2023 -0700

    [HUDI-6389] Fix instant time check against the active timeline in meta sync (#8991)
    
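    This commit fixes the problematic API implementation
    (TimelineUtils.getCommitsTimelineAfter) introduced by #7561. That
    implementation decided whether to also load the archived timeline by
    checking the exclusive start instant against the whole active timeline,
    which includes non-write instants such as rollbacks. An active rollback
    instant older than the start instant could therefore suppress loading
    the archived timeline, so commits archived since the last sync were
    missed by meta sync. The check now uses the active write timeline only.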
---
 .../hudi/common/table/timeline/TimelineUtils.java  |  8 +--
 .../hudi/common/table/TestTimelineUtils.java       | 42 +++++++++++++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 36 +++++++++++
 .../apache/hudi/hive/testutils/HiveTestUtil.java   | 71 +++++++++++++++-------
 4 files changed, 132 insertions(+), 25 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index acca48537d4..6264d2fc1a0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -232,11 +232,11 @@ public class TimelineUtils {
    */
   public static HoodieTimeline getCommitsTimelineAfter(
       HoodieTableMetaClient metaClient, String exclusiveStartInstantTime, 
Option<String> lastMaxCompletionTime) {
-    HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieDefaultTimeline writeTimeline = 
metaClient.getActiveTimeline().getWriteTimeline();
 
-    HoodieDefaultTimeline timeline = 
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
-        ? 
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(activeTimeline)
-        : activeTimeline;
+    HoodieDefaultTimeline timeline = 
writeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+        ? 
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(writeTimeline)
+        : writeTimeline;
 
     HoodieDefaultTimeline timelineSinceLastSync = (HoodieDefaultTimeline) 
timeline.getCommitsTimeline()
         .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 6b48e748b50..a1f00b8eaf0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -59,6 +59,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -67,6 +68,9 @@ import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
@@ -77,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -323,6 +328,33 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
         TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
Option.of(startTs)));
     verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+
+    // Should load both archived and active timeline
+    startTs = "005";
+    mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003", "003"),
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "007", "007"),
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009", "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001", 
"001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "005", "005"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
Option.of(startTs)));
+    verify(mockMetaClient, times(1)).getArchivedTimeline(any());
   }
 
   private HoodieTableMetaClient prepareMetaClient(
@@ -337,6 +369,8 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     HoodieActiveTimeline activeTimeline = new 
HoodieActiveTimeline(mockMetaClient);
     when(mockMetaClient.getActiveTimeline())
         .thenReturn(activeTimeline);
+    Set<String> validWriteActions = CollectionUtils.createSet(
+        COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, 
LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
     when(mockMetaClient.getArchivedTimeline(any()))
         .thenReturn(mockArchivedTimeline);
     HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
@@ -346,6 +380,14 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
         .mergeTimeline(activeTimeline);
     when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
         .thenReturn(mergedTimeline);
+    HoodieDefaultTimeline mergedWriteTimeline = new HoodieDefaultTimeline(
+        archivedInstants.stream()
+            .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+        i -> Option.empty())
+        .mergeTimeline(activeTimeline.getWriteTimeline());
+    when(mockArchivedTimeline.mergeTimeline(argThat(timeline -> 
timeline.filter(
+        instant -> 
instant.getAction().equals(ROLLBACK_ACTION)).countInstants() == 0)))
+        .thenReturn(mergedWriteTimeline);
 
     return mockMetaClient;
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index da23d8876c7..c51ceadd7ff 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -390,6 +391,41 @@ public class TestHiveSyncTool {
     tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     assertEquals(Option.of("300"), 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
     assertEquals(7, tablePartitions.size());
+
+    // Add the following instants to the active timeline and sync: "400" 
(rollback), "500" (commit)
+    // Last commit time sync is "500" after Hive sync
+    HiveTestUtil.addRollbackInstantToTable("400", "350");
+    HiveTestUtil.commitToTable("500", 7, useSchemaFromCommitMetadata);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("500"), 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(8, tablePartitions.size());
+
+    // Add more instants with adding a partition and simulate the case where 
the commit adding
+    // the new partition is archived.
+    // Before simulated archival: "300", "400" (rollback), "500", "600" 
(adding new partition), "700", "800"
+    // After simulated archival: "400" (rollback), "700", "800"
+    // In this case, listing all partitions should be triggered to catch up.
+    HiveTestUtil.commitToTable("600", 8, useSchemaFromCommitMetadata);
+    HiveTestUtil.commitToTable("700", 1, useSchemaFromCommitMetadata);
+    HiveTestUtil.commitToTable("800", 1, useSchemaFromCommitMetadata);
+    HiveTestUtil.removeCommitFromActiveTimeline("300", COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline("500", COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline("600", COMMIT_ACTION);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        
.setConf(hiveClient.config.getHadoopConf()).setBasePath(basePath).build();
+    assertEquals(
+        Arrays.asList("400", "700", "800"),
+        metaClient.getActiveTimeline().getInstants().stream()
+            .map(HoodieInstant::getTimestamp).sorted()
+            .collect(Collectors.toList()));
+
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("800"), 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(9, tablePartitions.size());
   }
 
   @ParameterizedTest
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d63c026eb6d..85a5789317d 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hive.testutils;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -81,6 +82,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -92,6 +94,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRollbackMetadata;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -263,6 +266,32 @@ public class HiveTestUtil {
     createReplaceCommitFile(replaceCommitMetadata, instantTime);
   }
 
+  public static void addRollbackInstantToTable(String instantTime, String 
commitToRollback)
+      throws IOException {
+    HoodieRollbackMetadata rollbackMetadata = 
HoodieRollbackMetadata.newBuilder()
+        .setVersion(1)
+        .setStartRollbackTime(instantTime)
+        .setTotalFilesDeleted(1)
+        .setTimeTakenInMillis(1000)
+        .setCommitsRollback(Collections.singletonList(commitToRollback))
+        .setPartitionMetadata(Collections.emptyMap())
+        .setInstantsRollback(Collections.emptyList())
+        .build();
+
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeRequestedRollbackFileName(instantTime),
+        "".getBytes());
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeInflightRollbackFileName(instantTime),
+        "".getBytes());
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeRollbackFileName(instantTime),
+        serializeRollbackMetadata(rollbackMetadata).get());
+  }
+
   public static void createCOWTableWithSchema(String instantTime, String 
schemaFileName)
       throws IOException, URISyntaxException {
     Path path = new Path(basePath);
@@ -521,21 +550,17 @@ public class HiveTestUtil {
   }
 
   public static void createCommitFile(HoodieCommitMetadata commitMetadata, 
String instantTime, String basePath) throws IOException {
-    byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeCommitFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeCommitFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   public static void createReplaceCommitFile(HoodieReplaceCommitMetadata 
commitMetadata, String instantTime) throws IOException {
-    byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeReplaceFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeReplaceFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   public static void createCommitFileWithSchema(HoodieCommitMetadata 
commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
@@ -545,19 +570,23 @@ public class HiveTestUtil {
 
   private static void createCompactionCommitFile(HoodieCommitMetadata 
commitMetadata, String instantTime)
       throws IOException {
-    byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeCommitFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeCommitFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   private static void createDeltaCommitFile(HoodieCommitMetadata 
deltaCommitMetadata, String deltaCommitTime)
       throws IOException {
-    byte[] bytes = 
deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeDeltaFileName(deltaCommitTime));
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeDeltaFileName(deltaCommitTime),
+        deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+  }
+
+  private static void createMetaFile(String basePath, String fileName, byte[] 
bytes)
+      throws IOException {
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + 
fileName);
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
     fsout.close();

Reply via email to