This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ceb03a47a4 [HUDI-6665] Fixing metadata writer close (#9385)
1ceb03a47a4 is described below

commit 1ceb03a47a49dce623703e0b17ee235603ca5f99
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Aug 8 01:23:17 2023 -0400

    [HUDI-6665] Fixing metadata writer close (#9385)
    
    Closing metadata writer for all invocations
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 13 ++++-
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 31 ++++++++----
 .../hudi/table/action/BaseActionExecutor.java      | 55 ++++++++++++++++++++--
 .../table/action/index/RunIndexActionExecutor.java | 54 +++++++++++----------
 .../functional/TestExternalPathHandling.java       | 40 ++++++++--------
 5 files changed, 137 insertions(+), 56 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 141cc9b5be6..e55fb045e1e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -699,7 +699,18 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
   protected void writeTableMetadata(HoodieTable table, String instantTime, 
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> 
writeStatuses) {
     checkArgument(table.isTableServiceAction(actionType, instantTime), 
String.format("Unsupported action: %s.%s is not table service.", actionType, 
instantTime));
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
-    table.getMetadataWriter(instantTime).ifPresent(w -> 
((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, writeStatuses, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to update metadata", e);
+        }
+      }
+    }
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 220393b4fb2..6b03c5234f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -60,7 +60,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
@@ -361,7 +360,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       tableServiceClient.writeTableMetadata(table, instantTime, actionType, 
metadata, writeStatuses);
     } else {
       context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
-      table.getMetadataWriter(instantTime).ifPresent(w -> 
((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
+      Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+      if (metadataWriterOpt.isPresent()) {
+        try (HoodieTableMetadataWriter metadataWriter = 
metadataWriterOpt.get()) {
+          metadataWriter.update(metadata, writeStatuses, instantTime);
+        } catch (Exception e) {
+          if (e instanceof HoodieException) {
+            throw (HoodieException) e;
+          } else {
+            throw new HoodieException("Failed to update metadata", e);
+          }
+        }
+      }
     }
   }
 
@@ -1016,13 +1026,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
     try {
       context.setJobStatus(this.getClass().getSimpleName(), "Dropping 
partitions from metadata table: " + config.getTableName());
-      table.getMetadataWriter(dropInstant).ifPresent(w -> {
-        try {
-          ((HoodieTableMetadataWriter) 
w).dropMetadataPartitions(partitionTypes);
-        } catch (IOException e) {
-          throw new HoodieIndexException("Failed to drop metadata index. ", e);
+      Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(dropInstant);
+      if (metadataWriterOpt.isPresent()) {
+        try (HoodieTableMetadataWriter metadataWriter = 
metadataWriterOpt.get()) {
+          metadataWriter.dropMetadataPartitions(partitionTypes);
+        } catch (Exception e) {
+          if (e instanceof HoodieException) {
+            throw (HoodieException) e;
+          } else {
+            throw new HoodieException("Failed to drop partitions from 
metadata", e);
+          }
         }
-      });
+      }
     } finally {
       this.txnManager.endTransaction(Option.of(ownerInstant));
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 1800031f1a8..31966203660 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -28,7 +28,10 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 
 public abstract class BaseActionExecutor<T, I, K, O, R> implements 
Serializable {
@@ -58,7 +61,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R> implements Serializable
    * @param metadata commit metadata of interest.
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, 
HoodieData<WriteStatus> writeStatus, String actionType) {
-    table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, 
writeStatus, instantTime));
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, writeStatus, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to update metadata", e);
+        }
+      }
+    }
   }
 
   /**
@@ -66,7 +80,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R> implements Serializable
    * @param metadata clean metadata of interest.
    */
   protected final void writeTableMetadata(HoodieCleanMetadata metadata, String 
instantTime) {
-    table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, 
instantTime));
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to apply clean commit to 
metadata", e);
+        }
+      }
+    }
   }
 
   /**
@@ -74,7 +99,18 @@ public abstract class BaseActionExecutor<T, I, K, O, R> implements Serializable
    * @param metadata rollback metadata of interest.
    */
   protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
-    table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, 
instantTime));
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to apply rollbacks in metadata", 
e);
+        }
+      }
+    }
   }
 
   /**
@@ -82,6 +118,17 @@ public abstract class BaseActionExecutor<T, I, K, O, R> implements Serializable
    * @param metadata restore metadata of interest.
    */
   protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
-    table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, 
instantTime));
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to apply restore to metadata", e);
+        }
+      }
+    }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 93e8a8f859f..9b91167899c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieTable;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -135,34 +137,38 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
       List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
       if (!firstTimeInitializingMetadataTable) {
         // start indexing for each partition
-        HoodieTableMetadataWriter metadataWriter = 
table.getIndexingMetadataWriter(instantTime)
+        try (HoodieTableMetadataWriter metadataWriter = 
table.getIndexingMetadataWriter(instantTime)
             .orElseThrow(() -> new HoodieIndexException(String.format(
-                "Could not get metadata writer to run index action for 
instant: %s", instantTime)));
-        // this will only build index upto base instant as generated by the 
plan, we will be doing catchup later
-        String indexUptoInstant = 
indexPartitionInfos.get(0).getIndexUptoInstant();
-        LOG.info("Starting Index Building with base instant: " + 
indexUptoInstant);
-        metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+                "Could not get metadata writer to run index action for 
instant: %s", instantTime)))) {
+          // this will only build index upto base instant as generated by the 
plan, we will be doing catchup later
+          String indexUptoInstant = 
indexPartitionInfos.get(0).getIndexUptoInstant();
+          LOG.info("Starting Index Building with base instant: " + 
indexUptoInstant);
+          metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
 
-        // get remaining instants to catchup
-        List<HoodieInstant> instantsToCatchup = 
getInstantsToCatchup(indexUptoInstant);
-        LOG.info("Total remaining instants to index: " + 
instantsToCatchup.size());
+          // get remaining instants to catchup
+          List<HoodieInstant> instantsToCatchup = 
getInstantsToCatchup(indexUptoInstant);
+          LOG.info("Total remaining instants to index: " + 
instantsToCatchup.size());
 
-        // reconcile with metadata table timeline
-        String metadataBasePath = 
getMetadataTableBasePath(table.getMetaClient().getBasePath());
-        HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
-        Set<String> metadataCompletedTimestamps = 
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, 
metadataMetaClient).stream()
-            .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+          // reconcile with metadata table timeline
+          String metadataBasePath = 
getMetadataTableBasePath(table.getMetaClient().getBasePath());
+          HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+          Set<String> metadataCompletedTimestamps = 
getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, 
metadataMetaClient).stream()
+              .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
-        // index catchup for all remaining instants with a timeout
-        currentCaughtupInstant = indexUptoInstant;
-        catchupWithInflightWriters(metadataWriter, instantsToCatchup, 
metadataMetaClient, metadataCompletedTimestamps);
-        // save index commit metadata and update table config
-        finalIndexPartitionInfos = indexPartitionInfos.stream()
-            .map(info -> new HoodieIndexPartitionInfo(
-                info.getVersion(),
-                info.getMetadataPartitionPath(),
-                currentCaughtupInstant))
-            .collect(Collectors.toList());
+          // index catchup for all remaining instants with a timeout
+          currentCaughtupInstant = indexUptoInstant;
+          catchupWithInflightWriters(metadataWriter, instantsToCatchup, 
metadataMetaClient, metadataCompletedTimestamps);
+          // save index commit metadata and update table config
+          finalIndexPartitionInfos = indexPartitionInfos.stream()
+              .map(info -> new HoodieIndexPartitionInfo(
+                  info.getVersion(),
+                  info.getMetadataPartitionPath(),
+                  currentCaughtupInstant))
+              .collect(Collectors.toList());
+        } catch (Exception e) {
+          throw new HoodieMetadataException("Failed to index partition " + 
Arrays.toString(indexPartitionInfos.stream()
+                .map(entry -> 
entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
+        }
       } else {
         String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
         // save index commit metadata and update table config
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 9f28290f1ad..0785f9eea76 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -156,27 +156,29 @@ public class TestExternalPathHandling extends HoodieClientTestBase {
         cleanTime,
         Option.empty(),
         cleanStats);
-    HoodieTableMetadataWriter hoodieTableMetadataWriter = 
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT, 
Option.of(cleanTime)).getMetadataWriter(cleanTime).get();
-    hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
-    
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
-        TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
-    // make sure we still get the same results as before
-    assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
-    assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+    try (HoodieTableMetadataWriter hoodieTableMetadataWriter = 
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT, 
Option.of(cleanTime)).getMetadataWriter(cleanTime).get()) {
+      hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
+      
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightClean,
+          TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+      // make sure we still get the same results as before
+      assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+      assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
 
-    // trigger archiver manually
-    writeClient.archive();
-    // assert commit was archived
-    Assertions.assertEquals(1, 
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
-    // make sure we still get the same results as before
-    assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
-    assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+      // trigger archiver manually
+      writeClient.archive();
+      // assert commit was archived
+      Assertions.assertEquals(1, 
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
+      // make sure we still get the same results as before
+      assertFileGroupCorrectness(instantTime2, partitionPath1, filePath2, 
fileId2, partitionPath2.isEmpty() ? 2 : 1);
+      assertFileGroupCorrectness(instantTime3, partitionPath2, filePath3, 
fileId3, partitionPath2.isEmpty() ? 2 : 1);
+
+      // assert that column stats are correct
+      HoodieBackedTableMetadata hoodieBackedTableMetadata = new 
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), true);
+      assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, 
fileName1);
+      assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
+      assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
+    }
 
-    // assert that column stats are correct
-    HoodieBackedTableMetadata hoodieBackedTableMetadata = new 
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), true);
-    assertEmptyColStats(hoodieBackedTableMetadata, partitionPath1, fileName1);
-    assertColStats(hoodieBackedTableMetadata, partitionPath1, fileName2);
-    assertColStats(hoodieBackedTableMetadata, partitionPath2, fileName3);
   }
 
   static Stream<Arguments> getArgs() {

Reply via email to