This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be3f8fcd000 [HUDI-7701] Metadata table initialization with pending
instants (#11137)
be3f8fcd000 is described below
commit be3f8fcd000386c0acc1da4aba6a8a51ee6b2b5d
Author: Danny Chan <[email protected]>
AuthorDate: Mon May 6 16:44:21 2024 +0800
[HUDI-7701] Metadata table initialization with pending instants (#11137)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 49 +++++++++-------------
.../common/testutils/HoodieMetadataTestTable.java | 8 ----
.../functional/TestHoodieBackedMetadata.java | 34 ++++++++++++++-
.../functional/TestHoodieMetadataBootstrap.java | 20 ++++-----
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 14 ++++---
.../hudi/table/TestHoodieMergeOnReadTable.java | 27 ++++++------
.../table/timeline/HoodieInstantTimeGenerator.java | 13 ++++--
.../hudi/metadata/HoodieTableMetadataUtil.java | 19 ++-------
.../hudi/common/testutils/HoodieTestTable.java | 8 ++++
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 3 +-
10 files changed, 105 insertions(+), 90 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8aa7f3725d..f01b5bd3a08 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -326,9 +325,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
*/
private boolean initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit,
Option<String>
inflightInstantTimestamp) throws IOException {
- if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
- return false;
- }
+ Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
// FILES partition is always required and is initialized first
boolean filesPartitionAvailable =
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
@@ -354,11 +351,11 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Get a complete list of files and partitions from the file system or
from already initialized FILES partition of MDT
List<DirectoryInfo> partitionInfoList;
if (filesPartitionAvailable) {
- partitionInfoList = listAllPartitionsFromMDT(initializationTime);
+ partitionInfoList = listAllPartitionsFromMDT(initializationTime,
pendingDataInstants);
} else {
// if auto initialization is enabled, then we need to list all
partitions from the file system
if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
- partitionInfoList =
listAllPartitionsFromFilesystem(initializationTime);
+ partitionInfoList =
listAllPartitionsFromFilesystem(initializationTime, pendingDataInstants);
} else {
// if auto initialization is disabled, we can return an empty list
partitionInfoList = Collections.emptyList();
@@ -458,16 +455,16 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* @return a unique timestamp for MDT
*/
private String generateUniqueCommitInstantTime(String initializationTime) {
- // if it's initialized via Async indexer, we don't need to alter the init
time
+ // If it's initialized via Async indexer, we don't need to alter the init
time.
+ // otherwise yields the timestamp on the fly.
+ // This function would be called multiple times in a single application if
multiple indexes are being
+ // initialized one after the other.
HoodieTimeline dataIndexTimeline =
dataMetaClient.getActiveTimeline().filter(instant ->
instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline,
initializationTime)) {
return initializationTime;
}
- // Add suffix to initializationTime to find an unused instant time for the
next index initialization.
- // This function would be called multiple times in a single application if
multiple indexes are being
- // initialized one after the other.
for (int offset = 0; ; ++offset) {
- final String commitInstantTime =
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+ final String commitInstantTime =
HoodieInstantTimeGenerator.instantTimePlusMillis(SOLO_COMMIT_TIMESTAMP, offset);
if
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
return commitInstantTime;
}
@@ -602,22 +599,14 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
return Pair.of(fileGroupCount, allPartitionsRecord.union(fileListRecords));
}
- private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) {
- // We can only initialize if there are no pending operations on the dataset
- List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
+ private Set<String> getPendingDataInstants(HoodieTableMetaClient
dataMetaClient) {
+ // Initialize excluding the pending operations on the dataset
+ return dataMetaClient.getActiveTimeline()
.getInstantsAsStream().filter(i -> !i.isCompleted())
- .filter(i -> !inflightInstantTimestamp.isPresent() ||
!i.getTimestamp().equals(inflightInstantTimestamp.get()))
// regular writers should not be blocked due to pending indexing action
.filter(i -> !HoodieTimeline.INDEXING_ACTION.equals(i.getAction()))
- .collect(Collectors.toList());
-
- if (!pendingDataInstant.isEmpty()) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
- LOG.warn("Cannot initialize metadata table as operation(s) are in
progress on the dataset: {}",
- Arrays.toString(pendingDataInstant.toArray()));
- return true;
- }
- return false;
+ .map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toSet());
}
private HoodieTableMetaClient initializeMetaClient() throws IOException {
@@ -643,9 +632,10 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* Function to find hoodie partitions and list files in them in parallel.
*
* @param initializationTime Files which have a timestamp after this are
neglected
+ * @param pendingDataInstants Pending instants on data set
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
- private List<DirectoryInfo> listAllPartitionsFromFilesystem(String
initializationTime) {
+ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String
initializationTime, Set<String> pendingDataInstants) {
List<StoragePath> pathsToList = new LinkedList<>();
pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
@@ -664,7 +654,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
List<DirectoryInfo> processedDirectories =
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
HoodieStorage storage = HoodieStorageUtils.getStorage(path,
storageConf);
String relativeDirPath =
FSUtils.getRelativePartitionPath(storageBasePath, path);
- return new DirectoryInfo(relativeDirPath,
storage.listDirectEntries(path), initializationTime);
+ return new DirectoryInfo(relativeDirPath,
storage.listDirectEntries(path), initializationTime, pendingDataInstants);
}, numDirsToList);
pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList,
pathsToList.size()));
@@ -697,15 +687,16 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* Function to find hoodie partitions and list files in them in parallel
from MDT.
*
* @param initializationTime Files which have a timestamp after this are
neglected
+ * @param pendingDataInstants Files coming from pending instants are
neglected
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
- private List<DirectoryInfo> listAllPartitionsFromMDT(String
initializationTime) throws IOException {
+ private List<DirectoryInfo> listAllPartitionsFromMDT(String
initializationTime, Set<String> pendingDataInstants) throws IOException {
List<DirectoryInfo> dirinfoList = new LinkedList<>();
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() +
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
Map<String, List<StoragePathInfo>> partitionFileMap =
metadata.getAllFilesInPartitions(allPartitionPaths);
for (Map.Entry<String, List<StoragePathInfo>> entry :
partitionFileMap.entrySet()) {
- dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(),
initializationTime));
+ dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(),
initializationTime, pendingDataInstants));
}
return dirinfoList;
}
@@ -1042,7 +1033,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Restore requires the existing pipelines to be shutdown. So we can
safely scan the dataset to find the current
// list of files in the filesystem.
- List<DirectoryInfo> dirInfoList =
listAllPartitionsFromFilesystem(instantTime);
+ List<DirectoryInfo> dirInfoList =
listAllPartitionsFromFilesystem(instantTime, Collections.emptySet());
Map<String, DirectoryInfo> dirInfoMap =
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath,
Function.identity()));
dirInfoList.clear();
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index ae6de2f680d..c2ae9024f2a 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -114,14 +114,6 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
return this;
}
- public HoodieTestTable moveInflightCommitToComplete(String instantTime,
HoodieCommitMetadata metadata, boolean ignoreWriter) throws IOException {
- super.moveInflightCommitToComplete(instantTime, metadata);
- if (!ignoreWriter && writer != null) {
- writer.updateFromWriteStatuses(metadata,
context.get().emptyHoodieData(), instantTime);
- }
- return this;
- }
-
@Override
public HoodieTestTable moveInflightCompactionToComplete(String instantTime,
HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCompactionToComplete(instantTime, metadata);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fc55c6723ee..52938c98547 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -925,6 +925,38 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant2,
1L), tableMetadata.getLatestCompactionTime().get());
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testInitializeMetadataTableWithPendingInstant(HoodieTableType
tableType) throws Exception {
+ init(tableType, false);
+ initWriteConfigAndMetatableWriter(writeConfig, false);
+ // 1. firstly we disable the metadata table, then create two completed
commits and one inflight commit.
+ // 1.1write 2 commits first.
+ doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+ doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+
+ // 1.2 create another inflight commit
+ String inflightInstant = metaClient.createNewInstantTime();
+ HoodieCommitMetadata inflightCommitMeta =
testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(),
+ asList("p1", "p2"), 2, false, true);
+ doWriteOperation(testTable, metaClient.createNewInstantTime());
+
+ // 2. now enable the metadata table and triggers the initialization
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .enableMetrics(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .build()).build();
+
+ // 2.1 initializes the metadata table, it will exclude the files from the
inflight instant.
+ initWriteConfigAndMetatableWriter(writeConfig, true);
+
+ // 2.2 move inflight to completed
+ testTable.moveInflightCommitToComplete(inflightInstant,
inflightCommitMeta);
+ validateMetadata(testTable, true);
+ }
+
/**
* Tests that virtual key configs are honored in base files after compaction
in metadata table.
*/
@@ -1131,7 +1163,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertEquals(1,
timeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(3, mdtTimeline.countInstants());
assertEquals(3,
mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
- String mdtInitCommit2 =
HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 1);
+ String mdtInitCommit2 =
mdtTimeline.getCommitsTimeline().filterCompletedInstants().getInstants().get(1).getTimestamp();
Pair<HoodieInstant, HoodieCommitMetadata> lastCommitMetadataWithValidData =
mdtTimeline.getLastCommitMetadataWithValidData().get();
String commit = lastCommitMetadataWithValidData.getLeft().getTimestamp();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
index 2641feab321..eb64be99769 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtils;
-import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieArchivalConfig;
@@ -167,20 +166,19 @@ public class TestHoodieMetadataBootstrap extends
TestHoodieMetadataBase {
HoodieTableType tableType = COPY_ON_WRITE;
init(tableType, false);
+ // In real production env, bootstrap action can only happen on empty table,
+ // otherwise we need to roll back the previous bootstrap first,
+ // see 'SparkBootstrapCommitActionExecutor.execute' for more details.
doPreBootstrapWriteOperation(testTable, INSERT, "0000001");
doPreBootstrapWriteOperation(testTable, "0000002");
// add an inflight commit
HoodieCommitMetadata inflightCommitMeta =
testTable.doWriteOperation("00000007", UPSERT, emptyList(),
- asList("p1", "p2"), 2, true, true);
+ asList("p1", "p2"), 2, false, true);
// bootstrap and following validation should fail. bootstrap should not
happen.
bootstrapAndVerifyFailure();
// once the commit is complete, metadata should get fully synced.
- // in prod code path, SparkHoodieBackedTableMetadataWriter.create() will
be called for every commit,
- // which may not be the case here if we directly call
HoodieBackedTableMetadataWriter.update()
- // hence let's first move the commit to complete and invoke sync directly
- ((HoodieMetadataTestTable)
testTable).moveInflightCommitToComplete("00000007", inflightCommitMeta, true);
- syncTableMetadata(writeConfig);
+ testTable.moveInflightCommitToComplete("00000007", inflightCommitMeta);
validateMetadata(testTable);
}
@@ -261,12 +259,8 @@ public class TestHoodieMetadataBootstrap extends
TestHoodieMetadataBase {
writeConfig = getWriteConfig(true, true);
initWriteConfigAndMetatableWriter(writeConfig, true);
syncTableMetadata(writeConfig);
- try {
- validateMetadata(testTable);
- Assertions.fail("Should have failed");
- } catch (IllegalStateException e) {
- // expected
- }
+ Assertions.assertThrows(Error.class, () -> validateMetadata(testTable),
+ "expected 6 lines, but only got 4");
}
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws
Exception {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 53241a34983..49351b463c2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -38,6 +38,7 @@ import
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.LSMTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -1519,34 +1520,35 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
List<HoodieInstant> metadataTableInstants =
metadataTableMetaClient.getActiveTimeline()
.getCommitsTimeline().filterCompletedInstants().getInstants();
+ final String mdtInitCommit =
HoodieInstantTimeGenerator.instantTimePlusMillis(SOLO_COMMIT_TIMESTAMP, 0L);
if (i == 1) {
- // In the metadata table timeline, the first delta commit is
"00000000000000"
+ // In the metadata table timeline, the first delta commit is
"00000000000000000"
assertEquals(i + 1, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(0))));
} else if (i == 2) {
assertEquals(i - 1, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
assertFalse(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(1))));
} else if (i <= 9) {
- // In the metadata table timeline, the first delta commit is
"00000000000000"
+ // In the metadata table timeline, the first delta commit is
"00000000000000000"
// from metadata table init, delta commits 1 till 8 are added
// later on without archival or compaction
// rollback in DT will also trigger rollback in MDT
assertEquals(i - 1, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
- new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, SOLO_COMMIT_TIMESTAMP + "010")));
+ new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, mdtInitCommit)));
// rolled back commits may not be present in MDT timeline [1]
IntStream.range(3, i).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(j - 1)))));
} else if (i == 10) {
// i == 10
- // The instant "00000000000010" was archived since it's less than
+ // The instant "00000000000000000" was archived since it's less than
// the earliest commit on the dataset active timeline,
// the dataset active timeline has instants:
// [7.commit, 8.commit, 9.commit, 10.commit]
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index e9bade9f842..070d4d0d325 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -72,6 +72,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -434,20 +435,22 @@ public class TestHoodieMergeOnReadTable extends
SparkClientFunctionalTestHarness
// Create a commit without metadata stats in metadata to test backwards
compatibility
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
- HoodieInstant instant = new HoodieInstant(State.REQUESTED,
commitActionType, "000");
+ List<String> instants = new ArrayList<>();
+ String instant0 = metaClient.createNewInstantTime();
+ HoodieInstant instant = new HoodieInstant(State.REQUESTED,
commitActionType, instant0);
activeTimeline.createNewInstant(instant);
activeTimeline.transitionRequestedToInflight(instant, Option.empty());
- instant = new HoodieInstant(State.INFLIGHT, commitActionType, "000");
+ instant = new HoodieInstant(State.INFLIGHT, commitActionType, instant0);
activeTimeline.saveAsComplete(instant, Option.empty());
- String instantTime = "001";
- client.startCommitWithTime(instantTime);
+ String instant1 = metaClient.createNewInstantTime();
+ client.startCommitWithTime(instant1);
- List<HoodieRecord> records = dataGen.generateInserts(instantTime, 200);
+ List<HoodieRecord> records = dataGen.generateInserts(instant1, 200);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
- JavaRDD<WriteStatus> statuses = client.insert(writeRecords, instantTime);
- assertTrue(client.commit(instantTime, statuses), "Commit should
succeed");
+ JavaRDD<WriteStatus> statuses = client.insert(writeRecords, instant1);
+ assertTrue(client.commit(instant1, statuses), "Commit should succeed");
// Read from commit file
table = HoodieSparkTable.create(cfg, context());
@@ -462,11 +465,11 @@ public class TestHoodieMergeOnReadTable extends
SparkClientFunctionalTestHarness
}
assertEquals(200, inserts);
- instantTime = "002";
- client.startCommitWithTime(instantTime);
- records = dataGen.generateUpdates(instantTime, records);
+ String instant2 = metaClient.createNewInstantTime();
+ client.startCommitWithTime(instant2);
+ records = dataGen.generateUpdates(instant2, records);
writeRecords = jsc().parallelize(records, 1);
- statuses = client.upsert(writeRecords, instantTime);
+ statuses = client.upsert(writeRecords, instant2);
//assertTrue(client.commit(instantTime, statuses), "Commit should
succeed");
inserts = 0;
int upserts = 0;
@@ -480,7 +483,7 @@ public class TestHoodieMergeOnReadTable extends
SparkClientFunctionalTestHarness
assertEquals(0, inserts);
assertEquals(200, upserts);
- client.rollback(instantTime);
+ client.rollback(instant2);
// Read from commit file
table = HoodieSparkTable.create(cfg, context());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index 80354195b72..174d79acb78 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -95,13 +95,18 @@ public class HoodieInstantTimeGenerator {
}
public static String instantTimePlusMillis(String timestamp, long
milliseconds) {
+ final String timestampInMillis = fixInstantTimeCompatibility(timestamp);
try {
- String timestampInMillis = fixInstantTimeCompatibility(timestamp);
LocalDateTime dt = LocalDateTime.parse(timestampInMillis,
MILLIS_INSTANT_TIME_FORMATTER);
ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ?
ZoneId.of("UTC") : ZoneId.systemDefault();
return
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().plusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
} catch (DateTimeParseException e) {
- throw new HoodieException(e);
+ // To work with tests, that generate arbitrary timestamps, we need to
pad the timestamp with 0s.
+ if (isValidInstantTime(timestamp)) {
+ return String.format("%0" + MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH +
"d", Long.parseLong(timestamp) + milliseconds);
+ } else {
+ throw new HoodieException(e);
+ }
}
}
@@ -113,8 +118,8 @@ public class HoodieInstantTimeGenerator {
return
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
} catch (DateTimeParseException e) {
// To work with tests, that generate arbitrary timestamps, we need to
pad the timestamp with 0s.
- if (isValidInstantTime(timestampInMillis)) {
- return String.format("%0" + timestampInMillis.length() + "d",
Long.parseLong(timestampInMillis) - milliseconds);
+ if (isValidInstantTime(timestamp)) {
+ return String.format("%0" + MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH +
"d", Long.parseLong(timestamp) - milliseconds);
} else {
throw new HoodieException(e);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 09e7be7e993..9e6e5b42975 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -166,9 +166,6 @@ public class HoodieTableMetadataUtil {
DoubleWrapper.class, FloatWrapper.class, LongWrapper.class,
StringWrapper.class, TimeMicrosWrapper.class,
TimestampMicrosWrapper.class));
- // This suffix and all after that are used for initialization of the various
partitions. The unused suffixes lower than this value
- // are reserved for future operations on the MDT.
- private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; //
corresponds to "010";
// we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
private static final List<String>
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010", "011",
"012", "013");
@@ -1625,16 +1622,6 @@ public class HoodieTableMetadataUtil {
return fileId.endsWith("-0") ? fileId.length() - 2 : fileId.length();
}
- /**
- * Create the timestamp for an index initialization operation on the
metadata table.
- * <p>
- * Since many MDT partitions can be initialized one after other the offset
parameter controls generating a
- * unique timestamp.
- */
- public static String createIndexInitTimestamp(String timestamp, int offset) {
- return String.format("%s%03d", timestamp,
PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
- }
-
/**
* Estimates the file group count to use for a MDT partition.
*
@@ -2008,7 +1995,7 @@ public class HoodieTableMetadataUtil {
// Is this a hoodie partition
private boolean isHoodiePartition = false;
- public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime) {
+ public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
@@ -2026,8 +2013,8 @@ public class HoodieTableMetadataUtil {
} else if (FSUtils.isDataFile(pathInfo.getPath())) {
// Regular HUDI data file (base file or log file)
String dataFileCommitTime =
FSUtils.getCommitTime(pathInfo.getPath().getName());
- // Limit the file listings to files which were created before the
maxInstant time.
- if (HoodieTimeline.compareTimestamps(dataFileCommitTime,
LESSER_THAN_OR_EQUALS, maxInstantTime)) {
+ // Limit the file listings to files which were created by successful
commits before the maxInstant time.
+ if (!pendingDataInstants.contains(dataFileCommitTime) &&
HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS,
maxInstantTime)) {
filenameToSizeMap.put(pathInfo.getPath().getName(),
pathInfo.getLength());
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index c3aa1eb2d98..2720aa42dd0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -621,6 +621,13 @@ public class HoodieTestTable {
return this;
}
+ public HoodieTestTable withPartitionMetaFiles(List<String> partitionPaths)
throws IOException {
+ for (String partitionPath : partitionPaths) {
+ FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
+ }
+ return this;
+ }
+
public HoodieTestTable withMarkerFile(String partitionPath, String fileId,
IOType ioType) throws IOException {
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId,
ioType);
return this;
@@ -1127,6 +1134,7 @@ public class HoodieTestTable {
}
for (Map.Entry<String, List<Pair<String, Integer>>> entry :
partitionToFilesNameLengthMap.entrySet()) {
String partition = entry.getKey();
+ this.withPartitionMetaFiles(partition); // needed by the metadata table
initialization.
this.withBaseFilesInPartition(partition,
testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
if (MERGE_ON_READ.equals(metaClient.getTableType()) &&
UPSERT.equals(operationType)) {
this.withLogFilesInPartition(partition,
testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 88b051facaf..b69dc94609b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -123,7 +123,8 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
p,
metaClient.getStorage().listDirectEntries(Arrays.asList(storagePath1,
storagePath2)),
- instant2));
+ instant2,
+ Collections.emptySet()));
} catch (Exception e) {
throw new RuntimeException(e);
}