This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8ccd7da2936 [HUDI-6679] Fix initialization of metadata table partitions upon failure (#9419)
8ccd7da2936 is described below
commit 8ccd7da293620ee94fb08035c04ddc595651332f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Aug 10 19:17:07 2023 -0700
[HUDI-6679] Fix initialization of metadata table partitions upon failure (#9419)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 8 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 7 +-
.../functional/TestHoodieBackedMetadata.java | 123 ++++++++++++++++++++-
3 files changed, 128 insertions(+), 10 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e55fb045e1e..7e78bddd875 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,7 +57,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -88,6 +87,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
/**
@@ -932,8 +932,10 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
LinkedHashMap<String, Option<HoodiePendingRollbackInfo>>
reverseSortedRollbackInstants = instantsToRollback.entrySet()
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1,
e2) -> e1, LinkedHashMap::new));
+ boolean isMetadataTable = isMetadataTable(basePath);
for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry :
reverseSortedRollbackInstants.entrySet()) {
- if (HoodieTimeline.compareTimestamps(entry.getKey(),
HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ if (!isMetadataTable
+ && HoodieTimeline.compareTimestamps(entry.getKey(),
HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
// do we need to handle failed rollback of a bootstrap
rollbackFailedBootstrap();
@@ -954,7 +956,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
// from the async indexer (`HoodieIndexer`).
// TODO(HUDI-5733): This should be cleaned up once the proper fix of
rollbacks in the
// metadata table is landed.
- if
(HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+ if (isMetadataTable(metaClient.getBasePathV2().toString())) {
return
inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
if (curInstantTime.isPresent()) {
return !entry.equals(curInstantTime.get());
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4f965e587cb..74d8ae16176 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,7 +112,6 @@ import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deseri
import static
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRollbackTimestamp;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
/**
@@ -257,10 +256,10 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// check if any of the enabled partition types needs to be initialized
// NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ Set<String> completedPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + completedPartitions);
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .filter(p -> !completedPartitions.contains(p.getPartitionPath())
&& !MetadataPartitionType.FILES.equals(p))
.forEach(partitionsToInit::add);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d33cada74b6..464d47b2a27 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -75,6 +75,7 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieArchivalConfig;
@@ -110,8 +111,10 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
@@ -160,10 +163,15 @@ import static
org.apache.hudi.common.model.WriteOperationType.DELETE;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_EXTENSION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_EXTENSION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INFLIGHT_EXTENSION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REQUESTED_EXTENSION;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
import static
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
import static
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -870,7 +878,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Fetch compaction Commit file and rename to some other file. completed
compaction meta file should have some serialized info that table interprets
// for future upserts. so, renaming the file here to some temp name and
later renaming it back to same name.
java.nio.file.Path parentPath = Paths.get(metadataTableBasePath,
METAFOLDER_NAME);
- java.nio.file.Path metaFilePath =
parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
+ java.nio.file.Path metaFilePath =
parentPath.resolve(metadataCompactionInstant + COMMIT_EXTENSION);
java.nio.file.Path tempFilePath =
FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
metaClient.reloadActiveTimeline();
testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter,
Option.of(context));
@@ -903,7 +911,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Fetch compaction Commit file and rename to some other file. completed
compaction meta file should have some serialized info that table interprets
// for future upserts. so, renaming the file here to some temp name and
later renaming it back to same name.
parentPath = Paths.get(metadataTableBasePath, METAFOLDER_NAME);
- metaFilePath = parentPath.resolve(metadataCompactionInstant +
HoodieTimeline.COMMIT_EXTENSION);
+ metaFilePath = parentPath.resolve(metadataCompactionInstant +
COMMIT_EXTENSION);
tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath,
metadataCompactionInstant);
validateMetadata(testTable);
@@ -978,6 +986,115 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
}
+  /**
+   * Verifies that a failed data-table commit, left behind while the record_index MDT partition
+   * initialization is still inflight, is rolled back on the next write and that the record index
+   * only returns keys from the later, successful commit.
+   */
+  @Test
+  public void testMetadataRollbackDuringInit() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(false, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .build())
+        .build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // First write that will be rolled back
+    String newCommitTime1 = "20230809230000000";
+    List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime1, 100);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime1);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records1, 1), newCommitTime1);
+      client.commit(newCommitTime1, writeStatuses);
+    }
+
+    // Revert the first commit to inflight, and move the table to a state where MDT fails
+    // during the initialization of the second partition (record_index)
+    revertTableToInflightState(writeConfig);
+
+    // Second write
+    String newCommitTime2 = "20230809232000000";
+    List<HoodieRecord> records2 = dataGen.generateInserts(newCommitTime2, 20);
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      client.startCommitWithTime(newCommitTime2);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records2, 1), newCommitTime2);
+      client.commit(newCommitTime2, writeStatuses);
+    }
+
+    // The metadata reader is closed once the assertions are done so it does not leak.
+    try (HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
+        context, writeConfig.getMetadataConfig(), writeConfig.getBasePath())) {
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader
+          .readRecordIndex(records1.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(0, result.size(), "RI should not return entries that are rolled back.");
+      result = metadataReader
+          .readRecordIndex(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      assertEquals(records2.size(), result.size(), "RI should return entries in the commit.");
+    }
+  }
+
+  /**
+   * Rewinds the table into a partially failed state. The only data-table commit becomes an inflight
+   * instant again. The MDT delta commit with the same timestamp, and the files it wrote, are removed.
+   * The record_index initialization delta commit in the MDT is left inflight, and the table config
+   * marks that partition as inflight instead of completed.
+   *
+   * @param writeConfig write config whose base path identifies the data table
+   * @throws IOException if deleting a file on the file system fails
+   */
+  private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IOException {
+    String dataBasePath = writeConfig.getBasePath();
+    String mdtBasePath = getMetadataTableBasePath(dataBasePath);
+    HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(dataBasePath)
+        .build();
+    HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath(mdtBasePath)
+        .build();
+    HoodieActiveTimeline dataTimeline = dataMetaClient.getActiveTimeline();
+    HoodieActiveTimeline mdtTimeline = mdtMetaClient.getActiveTimeline();
+
+    // Expected starting point: a single completed data commit and three completed MDT commits
+    assertEquals(1, dataTimeline.countInstants());
+    assertEquals(1, dataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(3, mdtTimeline.countInstants());
+    assertEquals(3, mdtTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
+
+    String mdtInitCommit2 = HoodieTableMetadataUtil.createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 1);
+    Pair<HoodieInstant, HoodieCommitMetadata> lastValidMdtCommit =
+        mdtTimeline.getLastCommitMetadataWithValidData().get();
+    String lastCommitTime = lastValidMdtCommit.getLeft().getTimestamp();
+    // The latest valid MDT commit must mirror a commit that exists in the data table
+    assertTrue(dataTimeline.getCommitsTimeline().containsInstant(lastCommitTime));
+    assertTrue(mdtTimeline.getCommitsTimeline().containsInstant(lastCommitTime));
+
+    FileSystem fs = dataMetaClient.getFs();
+
+    // Transition the last commit to inflight in DT by dropping its completed meta file
+    deleteMetaFile(fs, dataBasePath, lastCommitTime, COMMIT_EXTENSION);
+
+    // Remove the last commit and written data files in MDT
+    for (HoodieWriteStat writeStat : lastValidMdtCommit.getRight().getWriteStats()) {
+      deleteFileFromDfs(fs, mdtBasePath + "/" + writeStat.getPath());
+    }
+    for (String mdtSuffix : new String[] {
+        DELTA_COMMIT_EXTENSION,
+        DELTA_COMMIT_EXTENSION + INFLIGHT_EXTENSION,
+        DELTA_COMMIT_EXTENSION + REQUESTED_EXTENSION}) {
+      deleteMetaFile(fs, mdtBasePath, lastCommitTime, mdtSuffix);
+    }
+
+    // Transition the second init commit for record_index partition to inflight in MDT, and
+    // record the partition as inflight (not completed) in the data table config
+    deleteMetaFile(fs, mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
+    dataMetaClient.getTableConfig().setMetadataPartitionState(
+        dataMetaClient, MetadataPartitionType.RECORD_INDEX, false);
+    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(
+        dataMetaClient, MetadataPartitionType.RECORD_INDEX);
+
+    // After reloading, both timelines must end in the expected inflight instant
+    HoodieInstant lastDataInstant = dataMetaClient.getActiveTimeline().reload().lastInstant().get();
+    HoodieInstant lastMdtInstant = mdtMetaClient.getActiveTimeline().reload().lastInstant().get();
+    assertEquals(lastCommitTime, lastDataInstant.getTimestamp());
+    assertTrue(lastDataInstant.isInflight());
+    assertEquals(mdtInitCommit2, lastMdtInstant.getTimestamp());
+    assertTrue(lastMdtInstant.isInflight());
+  }
+
+ public static void deleteFileFromDfs(FileSystem fs, String targetPath)
throws IOException {
+ if (fs.exists(new Path(targetPath))) {
+ fs.delete(new Path(targetPath), true);
+ }
+ }
+
+  /**
+   * Deletes the timeline file {@code basePath + "/" + METAFOLDER_NAME + "/" + instantTime + suffix}
+   * if it exists.
+   *
+   * @param fs          file system holding the table
+   * @param basePath    base path of the table whose meta folder is targeted
+   * @param instantTime instant timestamp forming the file name
+   * @param suffix      extension(s) appended to the timestamp, e.g. a commit or inflight extension
+   * @throws IOException if the delete fails
+   */
+  public static void deleteMetaFile(FileSystem fs, String basePath, String instantTime, String suffix) throws IOException {
+    deleteFileFromDfs(fs, String.join("/", basePath, METAFOLDER_NAME, instantTime + suffix));
+  }
+
/**
* Test arguments - Table type, populate meta fields, exclude key from
payload.
*/
@@ -2163,7 +2280,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// make all commits to inflight in metadata table. Still read should go
through, just that it may not return any data.
FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/",
commitTimestamps[0]);
- FileCreateUtils.deleteDeltaCommit(basePath + " /.hoodie/metadata/",
HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+ FileCreateUtils.deleteDeltaCommit(basePath + " /.hoodie/metadata/",
SOLO_COMMIT_TIMESTAMP);
assertEquals(getAllFiles(metadata(client)).stream().map(p ->
p.getName()).map(n ->
FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size(), 0);
}
}