This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 135387c31774c41130ec3aaa5e02d033aaaa9817 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed Sep 6 13:56:21 2023 -0400 [HUDI-6397][HUDI-6759] Fixing misc bugs w/ metadata table (#9546) 1. This commit allows users to cleanly disable the metadata table using write configs. 2. The determination of valid instants when reading from the MDT is now robust. We treat any special instant time (one that has an additional suffix compared to the DT's commit time) as valid. In particular, during MDT partition initialization the suffix is dynamic, so we cannot find an exact match. We may therefore have to rely on the total instant time length and treat all special instant times as valid. In the LogRecordReader, we first ignore any uncommitted instants; then, if an instant is completed in the MDT timeline, we check it against the instantRange. So returning true for any special instant time should be fine. --- .../metadata/HoodieBackedTableMetadataWriter.java | 2 +- .../java/org/apache/hudi/table/HoodieTable.java | 6 +---- .../org/apache/hudi/table/HoodieSparkTable.java | 3 ++- .../functional/TestHoodieBackedMetadata.java | 28 ++++++++++++++++++---- .../hudi/metadata/HoodieBackedTableMetadata.java | 1 + .../hudi/metadata/HoodieTableMetadataUtil.java | 11 +++++---- .../sink/TestStreamWriteOperatorCoordinator.java | 9 +++---- 7 files changed, 40 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 460bfa2c6e2..8a930ba5972 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -172,7 +172,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); - if (dataMetaClient.getTableConfig().isMetadataTableAvailable() || writeConfig.isMetadataTableEnabled()) { + if (writeConfig.isMetadataTableEnabled()) { this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy); try { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index f1de637edf5..101931f8c76 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -1003,12 +1003,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { // Only execute metadata table deletion when all the following conditions are met // (1) This is data table // (2) Metadata table is disabled in HoodieWriteConfig for the writer - // (3) Check `HoodieTableConfig.TABLE_METADATA_PARTITIONS`. 
Either the table config - // does not exist, or the table config is non-empty indicating that metadata table - // partitions are ready to use return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) - && !config.isMetadataTableEnabled() - && !metaClient.getTableConfig().getMetadataPartitions().isEmpty(); + && !config.isMetadataTableEnabled(); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index a5202fb7bbe..111b254634b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -91,7 +91,7 @@ public abstract class HoodieSparkTable<T> protected Option<HoodieTableMetadataWriter> getMetadataWriter( String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { - if (config.isMetadataTableEnabled() || metaClient.getTableConfig().isMetadataTableAvailable()) { + if (config.isMetadataTableEnabled()) { // if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up // to reflect the delete mdt partitions. 
deleteMetadataIndexIfNecessary(); @@ -112,6 +112,7 @@ public abstract class HoodieSparkTable<T> throw new HoodieMetadataException("Checking existence of metadata table failed", e); } } else { + // if metadata is not enabled in the write config, we should try and delete it (if present) maybeDeleteMetadataTable(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 26dc41f73a3..6f6c4b65b11 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -270,7 +270,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(client); } // check table config - HoodieTableMetaClient.reload(metaClient); + metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTableConfig tableConfig = metaClient.getTableConfig(); assertFalse(tableConfig.getMetadataPartitions().isEmpty()); assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath())); @@ -295,7 +295,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(client); } // check table config - HoodieTableMetaClient.reload(metaClient); + metaClient = HoodieTableMetaClient.reload(metaClient); tableConfig = metaClient.getTableConfig(); assertFalse(tableConfig.getMetadataPartitions().isEmpty()); assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath())); @@ -321,7 +321,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(client); } // check table config - HoodieTableMetaClient.reload(metaClient); + metaClient = HoodieTableMetaClient.reload(metaClient); tableConfig = metaClient.getTableConfig(); 
assertFalse(tableConfig.getMetadataPartitions().isEmpty()); assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath())); @@ -347,15 +347,33 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(client); } // check table config - HoodieTableMetaClient.reload(metaClient); + metaClient = HoodieTableMetaClient.reload(metaClient); tableConfig = metaClient.getTableConfig(); assertFalse(tableConfig.getMetadataPartitions().isEmpty()); assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath())); assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())); assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); + + // disable entire MDT and validate its deleted + HoodieWriteConfig cfgWithMetadataDisabled = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfgWithMetadataDisabled)) { + // Upsert + String commitTime = "0000006"; + client.startCommitWithTime(commitTime); + List<HoodieRecord> records = dataGen.generateUniqueUpdates(commitTime, 10); + List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + // check table config + tableConfig = HoodieTableMetaClient.reload(metaClient).getTableConfig(); + assertTrue(tableConfig.getMetadataPartitions().isEmpty()); } - @Disabled("HUDI-6397") @Test public void testTurnOffMetadataTableAfterEnable() throws Exception { init(COPY_ON_WRITE, true); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 373945975be..d0ec7f020ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -605,6 +605,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { dataMetaClient.reloadActiveTimeline(); if (metadataMetaClient != null) { metadataMetaClient.reloadActiveTimeline(); + metadataFileSystemView.close(); metadataFileSystemView = getFileSystemView(metadataMetaClient); } // the cached reader has max instant time restriction, they should be cleared diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 861f8fc8ddd..9367b7b0a07 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -153,6 +153,8 @@ public class HoodieTableMetadataUtil { // This suffix and all after that are used for initialization of the various partitions. The unused suffixes lower than this value // are reserved for future operations on the MDT. private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // corresponds to "010"; + // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI) + private static final List<String> VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010","011","012","013"); /** * Returns whether the files partition of metadata table is ready for read. @@ -1282,13 +1284,14 @@ public class HoodieTableMetadataUtil { validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)); }); - // add restore instants from MDT. + // add restore and rollback instants from MDT. 
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants() - .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) + .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION) || instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) .getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp())); - // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp - validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX)); + metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .filter(instant -> instant.getTimestamp().startsWith(SOLO_COMMIT_TIMESTAMP)) + .getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp())); return validInstantTimestamps; } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index ee2f50cb20c..9e979a9fbd0 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -322,19 +322,20 @@ public class TestStreamWriteOperatorCoordinator { assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); // test metadata table log compaction - // write another 5 commits - for (int i = 1; i < 6; i++) { + // already 1 commit is used to initialized FILES partition in MDT + // write another 4 commits + for (int i = 1; i < 5; i++) { instant = mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); 
assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1)); assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); } - // the 6th commit triggers the log compaction + // the 5th commit triggers the log compaction mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); - assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(8)); + assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(7)); assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "005")); // log compaction is another delta commit assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.DELTA_COMMIT_ACTION));
