This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 48f89aeb1c7 [HUDI-6397][HUDI-6759] Fixing misc bugs w/ metadata table
(#9546)
48f89aeb1c7 is described below
commit 48f89aeb1c732d1ba1a99832b908d99558ac7c9c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Sep 6 13:56:21 2023 -0400
[HUDI-6397][HUDI-6759] Fixing misc bugs w/ metadata table (#9546)
1. This commit allows users to disable metadata using write configs cleanly.
2. The handling of valid instants while reading from the MDT is now solid. We
are going to treat any special instant time (one that has an additional suffix
compared to the DT's commit time) as valid.
Especially with MDT partition initialization, the suffix is dynamic, and so
we can't really find an exact match. So we might have to go by the total instant
time length and treat all special instant times as valid ones.
In the LogRecordReader, we will first ignore any uncommitted instants, and
then, if an instant is completed in the MDT timeline, we check it against the
instantRange. So it should be fine to return true for any special instant times.
---
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../java/org/apache/hudi/table/HoodieTable.java | 6 +----
.../org/apache/hudi/table/HoodieSparkTable.java | 3 ++-
.../functional/TestHoodieBackedMetadata.java | 28 ++++++++++++++++++----
.../hudi/metadata/HoodieBackedTableMetadata.java | 1 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 11 +++++----
.../sink/TestStreamWriteOperatorCoordinator.java | 9 +++----
7 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 460bfa2c6e2..8a930ba5972 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -172,7 +172,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
this.dataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
- if (dataMetaClient.getTableConfig().isMetadataTableAvailable() ||
writeConfig.isMetadataTableEnabled()) {
+ if (writeConfig.isMetadataTableEnabled()) {
this.metadataWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
failedWritesCleaningPolicy);
try {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f1de637edf5..101931f8c76 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1003,12 +1003,8 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
// Only execute metadata table deletion when all the following conditions
are met
// (1) This is data table
// (2) Metadata table is disabled in HoodieWriteConfig for the writer
- // (3) Check `HoodieTableConfig.TABLE_METADATA_PARTITIONS`. Either the
table config
- // does not exist, or the table config is non-empty indicating that
metadata table
- // partitions are ready to use
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
- && !config.isMetadataTableEnabled()
- && !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
+ && !config.isMetadataTableEnabled();
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index a5202fb7bbe..111b254634b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -91,7 +91,7 @@ public abstract class HoodieSparkTable<T>
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
- if (config.isMetadataTableEnabled() ||
metaClient.getTableConfig().isMetadataTableAvailable()) {
+ if (config.isMetadataTableEnabled()) {
// if any partition is deleted, we need to reload the metadata table
writer so that new table configs are picked up
// to reflect the delete mdt partitions.
deleteMetadataIndexIfNecessary();
@@ -112,6 +112,7 @@ public abstract class HoodieSparkTable<T>
throw new HoodieMetadataException("Checking existence of metadata
table failed", e);
}
} else {
+ // if metadata is not enabled in the write config, we should try and
delete it (if present)
maybeDeleteMetadataTable();
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 26dc41f73a3..6f6c4b65b11 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -270,7 +270,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
}
// check table config
- HoodieTableMetaClient.reload(metaClient);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -295,7 +295,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
}
// check table config
- HoodieTableMetaClient.reload(metaClient);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -321,7 +321,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
}
// check table config
- HoodieTableMetaClient.reload(metaClient);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
@@ -347,15 +347,33 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
}
// check table config
- HoodieTableMetaClient.reload(metaClient);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
+
+ // disable entire MDT and validate its deleted
+ HoodieWriteConfig cfgWithMetadataDisabled =
getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM,
HoodieFailedWritesCleaningPolicy.EAGER)
+ .withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
cfgWithMetadataDisabled)) {
+ // Upsert
+ String commitTime = "0000006";
+ client.startCommitWithTime(commitTime);
+ List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime,
10);
+ List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records,
1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses);
+ }
+
+ // check table config
+ tableConfig = HoodieTableMetaClient.reload(metaClient).getTableConfig();
+ assertTrue(tableConfig.getMetadataPartitions().isEmpty());
}
- @Disabled("HUDI-6397")
@Test
public void testTurnOffMetadataTableAfterEnable() throws Exception {
init(COPY_ON_WRITE, true);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 373945975be..d0ec7f020ab 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -605,6 +605,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
dataMetaClient.reloadActiveTimeline();
if (metadataMetaClient != null) {
metadataMetaClient.reloadActiveTimeline();
+ metadataFileSystemView.close();
metadataFileSystemView = getFileSystemView(metadataMetaClient);
}
// the cached reader has max instant time restriction, they should be
cleared
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 861f8fc8ddd..9367b7b0a07 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -153,6 +153,8 @@ public class HoodieTableMetadataUtil {
// This suffix and all after that are used for initialization of the various
partitions. The unused suffixes lower than this value
// are reserved for future operations on the MDT.
private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; //
corresponds to "010";
+ // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
+ private static final List<String>
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES =
Arrays.asList("010","011","012","013");
/**
* Returns whether the files partition of metadata table is ready for read.
@@ -1282,13 +1284,14 @@ public class HoodieTableMetadataUtil {
validInstantTimestamps.addAll(getRollbackedCommits(instant,
datasetTimeline));
});
- // add restore instants from MDT.
+ // add restore and rollback instants from MDT.
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
- .filter(instant ->
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.RESTORE_ACTION) ||
instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION))
.getInstants().forEach(instant ->
validInstantTimestamps.add(instant.getTimestamp()));
- // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid
timestamp
- validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP,
PARTITION_INITIALIZATION_TIME_SUFFIX));
+
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+ .filter(instant ->
instant.getTimestamp().startsWith(SOLO_COMMIT_TIMESTAMP))
+ .getInstants().forEach(instant ->
validInstantTimestamps.add(instant.getTimestamp()));
return validInstantTimestamps;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index ee2f50cb20c..9e979a9fbd0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -322,19 +322,20 @@ public class TestStreamWriteOperatorCoordinator {
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
// test metadata table log compaction
- // write another 5 commits
- for (int i = 1; i < 6; i++) {
+ // already 1 commit is used to initialized FILES partition in MDT
+ // write another 4 commits
+ for (int i = 1; i < 5; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(i + 1));
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
}
- // the 6th commit triggers the log compaction
+ // the 5th commit triggers the log compaction
mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(8));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(7));
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
is(instant + "005"));
// log compaction is another delta commit
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(),
is(HoodieTimeline.DELTA_COMMIT_ACTION));