This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e5dddf7e6 [HUDI-8352] Delete and recreate MDT for insert overwrite 
table operation (#12097)
47e5dddf7e6 is described below

commit 47e5dddf7e64f79728398233314faf72c02e5283
Author: Lin Liu <[email protected]>
AuthorDate: Tue Oct 15 21:41:23 2024 -0700

    [HUDI-8352] Delete and recreate MDT for insert overwrite table operation 
(#12097)
---
 .../hudi/table/action/BaseActionExecutor.java      |   8 ++
 .../org/apache/hudi/TestMetadataTableSupport.java  | 113 +++++++++++++++++++++
 2 files changed, 121 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 06f6b67f99f..d0a7acd676c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,13 @@ public abstract class BaseActionExecutor<T, I, K, O, R> 
implements Serializable
    * @param metadata commit metadata of interest.
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, 
HoodieData<WriteStatus> writeStatus, String actionType) {
+    // Recreate MDT for insert_overwrite_table operation.
+    if (table.getConfig().isMetadataTableEnabled()
+        && WriteOperationType.INSERT_OVERWRITE_TABLE == 
metadata.getOperationType()) {
+      HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), 
table.getContext(), false);
+    }
+
+    // MDT should be recreated if it has been deleted for 
insert_overwrite_table operation.
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
new file mode 100644
index 00000000000..905a67cea79
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests how the metadata table (MDT) reacts to specific write operations on the data table.
+ */
+class TestMetadataTableSupport extends HoodieSparkClientTestBase {
+  // NOTE(review): if HoodieSparkClientTestBase already annotates setUp()/tearDown() with
+  // @BeforeEach/@AfterEach, these wrappers make them run twice per test -- confirm and drop if so.
+  @BeforeEach
+  void start() throws Exception {
+    super.setUp();
+  }
+
+  @AfterEach
+  void end() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Verifies that an insert_overwrite_table write rebuilds the MDT.
+   *
+   * <p>After a regular insert, the MDT timeline holds the two bootstrap instants plus the
+   * insert. After the overwrite, the MDT must still be available and its timeline must
+   * again hold only the two bootstrap instants plus the (completed) overwrite instant.
+   */
+  @Test
+  void testRecreateMDTForInsertOverwriteTableOperation() {
+    HoodieWriteConfig config = getConfigBuilder()
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true).build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      // Insert first batch.
+      String timestamp0 = "20241015000000000";
+      List<HoodieRecord> records0 = dataGen.generateInserts(timestamp0, 100);
+      JavaRDD<HoodieRecord> dataset0 = jsc.parallelize(records0, 2);
+
+      writeClient.startCommitWithTime(timestamp0);
+      writeClient.insert(dataset0, timestamp0).collect();
+
+      // Confirm MDT enabled.
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Confirm the instant for the first insert exists on the MDT timeline.
+      StoragePath mdtBasePath =
+          HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+      HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+          .setConf(storageConf.newInstance())
+          .setBasePath(mdtBasePath).build();
+      HoodieActiveTimeline timeline = mdtMetaClient.getActiveTimeline();
+      List<HoodieInstant> instants = timeline.getInstants();
+      assertEquals(3, instants.size());
+      // For MDT bootstrap instant.
+      assertEquals("00000000000000000", instants.get(0).getTimestamp());
+      // For RLI bootstrap instant.
+      assertEquals("00000000000000001", instants.get(1).getTimestamp());
+      // For the insert instant.
+      assertEquals(timestamp0, instants.get(2).getTimestamp());
+
+      // Overwrite the whole table with a second batch.
+      String timestamp1 = "20241015000000001";
+      List<HoodieRecord> records1 = dataGen.generateInserts(timestamp1, 50);
+      JavaRDD<HoodieRecord> dataset1 = jsc.parallelize(records1, 2);
+
+      writeClient.startCommitWithTime(timestamp1, REPLACE_COMMIT_ACTION);
+      writeClient.insertOverwriteTable(dataset1, timestamp1);
+
+      // The MDT was deleted and recreated; the data table must still report it as available.
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Validate the rebuilt MDT timeline: the first insert's instant is gone and only the
+      // bootstrap instants plus the overwrite instant remain.
+      mdtMetaClient = HoodieTableMetaClient.reload(mdtMetaClient);
+      timeline = mdtMetaClient.getActiveTimeline();
+      instants = timeline.getInstants();
+      assertEquals(3, instants.size());
+      // For MDT bootstrap instant.
+      assertEquals("00000000000000000", instants.get(0).getTimestamp());
+      // For RLI bootstrap instant.
+      assertEquals("00000000000000001", instants.get(1).getTimestamp());
+      // For the insert_overwrite_table instant, which must have committed on the MDT.
+      assertEquals(timestamp1, instants.get(2).getTimestamp());
+      assertTrue(instants.get(2).isCompleted());
+    }
+  }
+}

Reply via email to