This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db741da47ba [HUDI-6389] Fix instant time check against the active
timeline in meta sync (#8991)
db741da47ba is described below
commit db741da47ba455504dec0ec6768f1fdfbe01a848
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jun 16 13:21:10 2023 -0700
[HUDI-6389] Fix instant time check against the active timeline in meta sync
(#8991)
This commit fixes the problematic API implementation
(TimelineUtils.getCommitsTimelineAfter) introduced by #7561.
---
.../hudi/common/table/timeline/TimelineUtils.java | 8 +--
.../hudi/common/table/TestTimelineUtils.java | 42 +++++++++++++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 36 +++++++++++
.../apache/hudi/hive/testutils/HiveTestUtil.java | 71 +++++++++++++++-------
4 files changed, 132 insertions(+), 25 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index acca48537d4..6264d2fc1a0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -232,11 +232,11 @@ public class TimelineUtils {
*/
public static HoodieTimeline getCommitsTimelineAfter(
HoodieTableMetaClient metaClient, String exclusiveStartInstantTime,
Option<String> lastMaxCompletionTime) {
- HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieDefaultTimeline writeTimeline =
metaClient.getActiveTimeline().getWriteTimeline();
- HoodieDefaultTimeline timeline =
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
- ?
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(activeTimeline)
- : activeTimeline;
+ HoodieDefaultTimeline timeline =
writeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+ ?
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(writeTimeline)
+ : writeTimeline;
HoodieDefaultTimeline timelineSinceLastSync = (HoodieDefaultTimeline)
timeline.getCommitsTimeline()
.findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 6b48e748b50..a1f00b8eaf0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -59,6 +59,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -67,6 +68,9 @@ import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
@@ -77,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -323,6 +328,33 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
Option.of(startTs)));
verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+
+ // Should load both archived and active timeline
+ startTs = "005";
+ mockMetaClient = prepareMetaClient(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003", "003"),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "007", "007"),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009", "009"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+ Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001",
"001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "005", "005"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008")),
+ startTs
+ );
+ verifyTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
Option.of(startTs)));
+ verify(mockMetaClient, times(1)).getArchivedTimeline(any());
}
private HoodieTableMetaClient prepareMetaClient(
@@ -337,6 +369,8 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
HoodieActiveTimeline activeTimeline = new
HoodieActiveTimeline(mockMetaClient);
when(mockMetaClient.getActiveTimeline())
.thenReturn(activeTimeline);
+ Set<String> validWriteActions = CollectionUtils.createSet(
+ COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION,
LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
when(mockMetaClient.getArchivedTimeline(any()))
.thenReturn(mockArchivedTimeline);
HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
@@ -346,6 +380,14 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
.mergeTimeline(activeTimeline);
when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
.thenReturn(mergedTimeline);
+ HoodieDefaultTimeline mergedWriteTimeline = new HoodieDefaultTimeline(
+ archivedInstants.stream()
+ .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+ i -> Option.empty())
+ .mergeTimeline(activeTimeline.getWriteTimeline());
+ when(mockArchivedTimeline.mergeTimeline(argThat(timeline ->
timeline.filter(
+ instant ->
instant.getAction().equals(ROLLBACK_ACTION)).countInstants() == 0)))
+ .thenReturn(mergedWriteTimeline);
return mockMetaClient;
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index da23d8876c7..c51ceadd7ff 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -390,6 +391,41 @@ public class TestHiveSyncTool {
tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
assertEquals(Option.of("300"),
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
assertEquals(7, tablePartitions.size());
+
+ // Add the following instants to the active timeline and sync: "400"
(rollback), "500" (commit)
+ // Last commit time sync is "500" after Hive sync
+ HiveTestUtil.addRollbackInstantToTable("400", "350");
+ HiveTestUtil.commitToTable("500", 7, useSchemaFromCommitMetadata);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+ tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(Option.of("500"),
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+ assertEquals(8, tablePartitions.size());
+
+ // Add more instants with adding a partition and simulate the case where
the commit adding
+ // the new partition is archived.
+ // Before simulated archival: "300", "400" (rollback), "500", "600"
(adding new partition), "700", "800"
+ // After simulated archival: "400" (rollback), "700", "800"
+ // In this case, listing all partitions should be triggered to catch up.
+ HiveTestUtil.commitToTable("600", 8, useSchemaFromCommitMetadata);
+ HiveTestUtil.commitToTable("700", 1, useSchemaFromCommitMetadata);
+ HiveTestUtil.commitToTable("800", 1, useSchemaFromCommitMetadata);
+ HiveTestUtil.removeCommitFromActiveTimeline("300", COMMIT_ACTION);
+ HiveTestUtil.removeCommitFromActiveTimeline("500", COMMIT_ACTION);
+ HiveTestUtil.removeCommitFromActiveTimeline("600", COMMIT_ACTION);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+
.setConf(hiveClient.config.getHadoopConf()).setBasePath(basePath).build();
+ assertEquals(
+ Arrays.asList("400", "700", "800"),
+ metaClient.getActiveTimeline().getInstants().stream()
+ .map(HoodieInstant::getTimestamp).sorted()
+ .collect(Collectors.toList()));
+
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+ tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(Option.of("800"),
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+ assertEquals(9, tablePartitions.size());
}
@ParameterizedTest
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d63c026eb6d..85a5789317d 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -19,6 +19,7 @@
package org.apache.hudi.hive.testutils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -81,6 +82,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -92,6 +94,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRollbackMetadata;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -263,6 +266,32 @@ public class HiveTestUtil {
createReplaceCommitFile(replaceCommitMetadata, instantTime);
}
+ public static void addRollbackInstantToTable(String instantTime, String
commitToRollback)
+ throws IOException {
+ HoodieRollbackMetadata rollbackMetadata =
HoodieRollbackMetadata.newBuilder()
+ .setVersion(1)
+ .setStartRollbackTime(instantTime)
+ .setTotalFilesDeleted(1)
+ .setTimeTakenInMillis(1000)
+ .setCommitsRollback(Collections.singletonList(commitToRollback))
+ .setPartitionMetadata(Collections.emptyMap())
+ .setInstantsRollback(Collections.emptyList())
+ .build();
+
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeRequestedRollbackFileName(instantTime),
+ "".getBytes());
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeInflightRollbackFileName(instantTime),
+ "".getBytes());
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeRollbackFileName(instantTime),
+ serializeRollbackMetadata(rollbackMetadata).get());
+ }
+
public static void createCOWTableWithSchema(String instantTime, String
schemaFileName)
throws IOException, URISyntaxException {
Path path = new Path(basePath);
@@ -521,21 +550,17 @@ public class HiveTestUtil {
}
public static void createCommitFile(HoodieCommitMetadata commitMetadata,
String instantTime, String basePath) throws IOException {
- byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
- + HoodieTimeline.makeCommitFileName(instantTime));
- FSDataOutputStream fsout = fileSystem.create(fullPath, true);
- fsout.write(bytes);
- fsout.close();
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeCommitFileName(instantTime),
+ commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
}
public static void createReplaceCommitFile(HoodieReplaceCommitMetadata
commitMetadata, String instantTime) throws IOException {
- byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
- + HoodieTimeline.makeReplaceFileName(instantTime));
- FSDataOutputStream fsout = fileSystem.create(fullPath, true);
- fsout.write(bytes);
- fsout.close();
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeReplaceFileName(instantTime),
+ commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
}
public static void createCommitFileWithSchema(HoodieCommitMetadata
commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
@@ -545,19 +570,23 @@ public class HiveTestUtil {
private static void createCompactionCommitFile(HoodieCommitMetadata
commitMetadata, String instantTime)
throws IOException {
- byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
- + HoodieTimeline.makeCommitFileName(instantTime));
- FSDataOutputStream fsout = fileSystem.create(fullPath, true);
- fsout.write(bytes);
- fsout.close();
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeCommitFileName(instantTime),
+ commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
}
private static void createDeltaCommitFile(HoodieCommitMetadata
deltaCommitMetadata, String deltaCommitTime)
throws IOException {
- byte[] bytes =
deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
- Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
- + HoodieTimeline.makeDeltaFileName(deltaCommitTime));
+ createMetaFile(
+ basePath,
+ HoodieTimeline.makeDeltaFileName(deltaCommitTime),
+ deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ }
+
+ private static void createMetaFile(String basePath, String fileName, byte[]
bytes)
+ throws IOException {
+ Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" +
fileName);
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
fsout.close();