This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 07be18cf1add17bf90010104a019a80f9530ccc2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Dec 13 08:12:47 2022 -0800

    [HUDI-5366] Closing metadata writer from within writeClient (#7437)
    
    * Closing metadata writer from within writeClient
    
    * Close metadata writer in flink client
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/metadata/HoodieBackedTableMetadataWriter.java      | 13 +++++++++++++
 .../java/org/apache/hudi/client/HoodieFlinkWriteClient.java |  6 ++++++
 .../java/org/apache/hudi/client/SparkRDDWriteClient.java    |  8 +++++++-
 .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java |  1 +
 4 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 42e35cfa30f..9d758052f71 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -751,6 +751,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       LOG.warn("Deleting pending indexing instant from the timeline for 
partition: " + partitionPath);
       deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
+    closeInternal();
   }
 
   /**
@@ -884,6 +885,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   public void update(HoodieCommitMetadata commitMetadata, String instantTime, 
boolean isTableServiceAction) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(
         engineContext, commitMetadata, instantTime, 
getRecordsGenerationParams()), !isTableServiceAction);
+    closeInternal();
   }
 
   /**
@@ -896,6 +898,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
         cleanMetadata, getRecordsGenerationParams(), instantTime), false);
+    closeInternal();
   }
 
   /**
@@ -909,6 +912,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
         metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
         metadata.getSyncedInstantTime()), false);
+    closeInternal();
   }
 
   /**
@@ -937,6 +941,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
               rollbackMetadata, getRecordsGenerationParams(), instantTime,
               metadata.getSyncedInstantTime(), wasSynced);
       commit(instantTime, records, false);
+      closeInternal();
     }
   }
 
@@ -1124,6 +1129,14 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     return filesPartitionRecords.union(fileListRecords);
   }
 
+  protected void closeInternal() {
+    try {
+      close();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to close HoodieMetadata writer ", e);
+    }
+  }
+
   /**
    * A class which represents a directory and the files and directories inside 
it.
    * <p>
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index b4003712c56..5430dc6eecc 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -43,6 +43,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -290,6 +291,11 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     // jobs.
     this.metadataWriter.initTableMetadata();
     this.metadataWriter.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType, instantTime));
+    try {
+      this.metadataWriter.close();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to close metadata writer ", e);
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index fa568a7a6fe..fe3a0063567 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -453,8 +454,13 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
    */
   private void initializeMetadataTable(Option<String> 
inFlightInstantTimestamp) {
     if (config.isMetadataTableEnabled()) {
-      
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
+      HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
           context, Option.empty(), inFlightInstantTimestamp);
+      try {
+        writer.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to instantiate Metadata table ", e);
+      }
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7d94b2d4f53..272d3d47985 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -192,5 +192,6 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       writeClient.startCommitWithTime(instantTime, actionType);
       writeClient.deletePartitions(partitionsToDrop, instantTime);
     }
+    closeInternal();
   }
 }

Reply via email to