This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e2c7f78d940 [HUDI-5366] Closing metadata writer from within writeClient (#7437)
e2c7f78d940 is described below

commit e2c7f78d9407bc02a22577ef089d4ae951a144f9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Dec 13 08:12:47 2022 -0800

    [HUDI-5366] Closing metadata writer from within writeClient (#7437)
    
    * Closing metadata writer from within writeClient
    
    * Close metadata writer in flink client
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/metadata/HoodieBackedTableMetadataWriter.java      | 13 +++++++++++++
 .../java/org/apache/hudi/client/HoodieFlinkWriteClient.java |  6 ++++++
 .../java/org/apache/hudi/client/SparkRDDWriteClient.java    |  9 +++++++--
 .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java |  1 +
 4 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1465ce53100..866fb432ab4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -752,6 +752,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       LOG.warn("Deleting pending indexing instant from the timeline for 
partition: " + partitionPath);
       deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
+    closeInternal();
   }
 
   /**
@@ -885,6 +886,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   public void update(HoodieCommitMetadata commitMetadata, String instantTime, 
boolean isTableServiceAction) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(
         engineContext, commitMetadata, instantTime, 
getRecordsGenerationParams()), !isTableServiceAction);
+    closeInternal();
   }
 
   /**
@@ -897,6 +899,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
         cleanMetadata, getRecordsGenerationParams(), instantTime), false);
+    closeInternal();
   }
 
   /**
@@ -910,6 +913,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
         metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
         metadata.getSyncedInstantTime()), false);
+    closeInternal();
   }
 
   /**
@@ -938,6 +942,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
               rollbackMetadata, getRecordsGenerationParams(), instantTime,
               metadata.getSyncedInstantTime(), wasSynced);
       commit(instantTime, records, false);
+      closeInternal();
     }
   }
 
@@ -1125,6 +1130,14 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     return filesPartitionRecords.union(fileListRecords);
   }
 
+  protected void closeInternal() {
+    try {
+      close();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to close HoodieMetadata writer ", e);
+    }
+  }
+
   /**
    * A class which represents a directory and the files and directories inside 
it.
    * <p>
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 98f58db66cd..cacf8ddb834 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -291,6 +292,11 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     // jobs.
     this.metadataWriter.initTableMetadata();
     this.metadataWriter.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType, instantTime));
+    try {
+      this.metadataWriter.close();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to close metadata writer ", e);
+    }
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a73c92c383f..54a8607ee47 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,8 +45,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -511,8 +511,13 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
    */
   private void initializeMetadataTable(Option<String> 
inFlightInstantTimestamp) {
     if (config.isMetadataTableEnabled()) {
-      
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
+      HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
           context, Option.empty(), inFlightInstantTimestamp);
+      try {
+        writer.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to instantiate Metadata table ", e);
+      }
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7d94b2d4f53..272d3d47985 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -192,5 +192,6 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       writeClient.startCommitWithTime(instantTime, actionType);
       writeClient.deletePartitions(partitionsToDrop, instantTime);
     }
+    closeInternal();
   }
 }

Reply via email to