This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 47e5dddf7e6 [HUDI-8352] Delete and recreate MDT for insert overwrite table operation (#12097)
47e5dddf7e6 is described below
commit 47e5dddf7e64f79728398233314faf72c02e5283
Author: Lin Liu <[email protected]>
AuthorDate: Tue Oct 15 21:41:23 2024 -0700
[HUDI-8352] Delete and recreate MDT for insert overwrite table operation (#12097)
---
.../hudi/table/action/BaseActionExecutor.java | 8 ++
.../org/apache/hudi/TestMetadataTableSupport.java | 113 +++++++++++++++++++++
2 files changed, 121 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 06f6b67f99f..d0a7acd676c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,13 @@ public abstract class BaseActionExecutor<T, I, K, O, R> implements Serializable
* @param metadata commit metadata of interest.
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatus, String actionType) {
+ // Recreate MDT for insert_overwrite_table operation.
+ if (table.getConfig().isMetadataTableEnabled()
+ && WriteOperationType.INSERT_OVERWRITE_TABLE == metadata.getOperationType()) {
+ HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), table.getContext(), false);
+ }
+
+ // MDT should be recreated if it has been deleted for insert_overwrite_table operation.
Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
new file mode 100644
index 00000000000..905a67cea79
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestMetadataTableSupport extends HoodieSparkClientTestBase {
+ @BeforeEach
+ void start() throws Exception {
+ super.setUp();
+ }
+
+ @AfterEach
+ void end() throws Exception {
+ super.tearDown();
+ }
+
+ @Test
+ void testRecreateMDTForInsertOverwriteTableOperation() {
+ HoodieWriteConfig config = getConfigBuilder()
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withEnableRecordIndex(true).build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ // Insert first batch.
+ String timestamp0 = "20241015000000000";
+ List<HoodieRecord> records0 = dataGen.generateInserts(timestamp0, 100);
+ JavaRDD<HoodieRecord> dataset0 = jsc.parallelize(records0, 2);
+
+ writeClient.startCommitWithTime(timestamp0);
+ writeClient.insert(dataset0, timestamp0).collect();
+
+ // Confirm MDT enabled.
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+ // Confirm the instant for the first insert exists.
+ StoragePath mdtBasePath =
+ HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+ HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf.newInstance())
+ .setBasePath(mdtBasePath).build();
+ HoodieActiveTimeline timeline = mdtMetaClient.getActiveTimeline();
+ List<HoodieInstant> instants = timeline.getInstants();
+ assertEquals(3, instants.size());
+ // For MDT bootstrap instant.
+ assertEquals("00000000000000000", instants.get(0).getTimestamp());
+ // For RLI bootstrap instant.
+ assertEquals("00000000000000001", instants.get(1).getTimestamp());
+ // For the insert instant.
+ assertEquals(timestamp0, instants.get(2).getTimestamp());
+
+ // Insert second batch.
+ String timestamp1 = "20241015000000001";
+ List<HoodieRecord> records1 = dataGen.generateInserts(timestamp1, 50);
+ JavaRDD<HoodieRecord> dataset1 = jsc.parallelize(records1, 2);
+
+ writeClient.startCommitWithTime(timestamp1, REPLACE_COMMIT_ACTION);
+ writeClient.insertOverwriteTable(dataset1, timestamp1);
+
+ // Validate.
+ mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
+ timeline = mdtMetaClient.getActiveTimeline();
+ instants = timeline.getInstants();
+ assertEquals(3, timeline.getInstants().size());
+ // For MDT bootstrap instant.
+ assertEquals("00000000000000000", instants.get(0).getTimestamp());
+ // For RLI bootstrap instant.
+ assertEquals("00000000000000001", instants.get(1).getTimestamp());
+ // For the insert_overwrite_table instant.
+ assertEquals(timestamp1, instants.get(2).getTimestamp());
+ }
+ }
+}