This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 07be18cf1add17bf90010104a019a80f9530ccc2 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Tue Dec 13 08:12:47 2022 -0800 [HUDI-5366] Closing metadata writer from within writeClient (#7437) * Closing metadata writer from within writeClient * Close metadata writer in flink client Co-authored-by: Sagar Sumit <[email protected]> --- .../hudi/metadata/HoodieBackedTableMetadataWriter.java | 13 +++++++++++++ .../java/org/apache/hudi/client/HoodieFlinkWriteClient.java | 6 ++++++ .../java/org/apache/hudi/client/SparkRDDWriteClient.java | 8 +++++++- .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 1 + 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 42e35cfa30f..9d758052f71 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -751,6 +751,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta LOG.warn("Deleting pending indexing instant from the timeline for partition: " + partitionPath); deletePendingIndexingInstant(dataMetaClient, partitionPath); } + closeInternal(); } /** @@ -884,6 +885,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords( engineContext, commitMetadata, instantTime, getRecordsGenerationParams()), !isTableServiceAction); + closeInternal(); } /** @@ -896,6 +898,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, cleanMetadata, getRecordsGenerationParams(), instantTime), false); + closeInternal(); } /** @@ -909,6 +912,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime()), false); + closeInternal(); } /** @@ -937,6 +941,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta rollbackMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime(), wasSynced); commit(instantTime, records, false); + closeInternal(); } } @@ -1124,6 +1129,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return filesPartitionRecords.union(fileListRecords); } + protected void closeInternal() { + try { + close(); + } catch (Exception e) { + throw new HoodieException("Failed to close HoodieMetadata writer ", e); + } + } + /** * A class which represents a directory and the files and directories inside it. * <p> diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index b4003712c56..5430dc6eecc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; @@ -290,6 +291,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends // jobs. this.metadataWriter.initTableMetadata(); this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime)); + try { + this.metadataWriter.close(); + } catch (Exception e) { + throw new HoodieException("Failed to close metadata writer ", e); + } } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index fa568a7a6fe..fe3a0063567 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; @@ -453,8 +454,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends */ private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) { if (config.isMetadataTableEnabled()) { - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, + HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(), inFlightInstantTimestamp); + try { + writer.close(); + } catch (Exception e) { + throw new HoodieException("Failed to instantiate Metadata table ", e); + } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 7d94b2d4f53..272d3d47985 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -192,5 +192,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad writeClient.startCommitWithTime(instantTime, actionType); writeClient.deletePartitions(partitionsToDrop, instantTime); } + closeInternal(); } }
