This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e2c7f78d940 [HUDI-5366] Closing metadata writer from within writeClient (#7437)
e2c7f78d940 is described below
commit e2c7f78d9407bc02a22577ef089d4ae951a144f9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Dec 13 08:12:47 2022 -0800
[HUDI-5366] Closing metadata writer from within writeClient (#7437)
* Closing metadata writer from within writeClient
* Close metadata writer in flink client
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/metadata/HoodieBackedTableMetadataWriter.java | 13 +++++++++++++
.../java/org/apache/hudi/client/HoodieFlinkWriteClient.java | 6 ++++++
.../java/org/apache/hudi/client/SparkRDDWriteClient.java | 9 +++++++--
.../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 1 +
4 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1465ce53100..866fb432ab4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -752,6 +752,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
LOG.warn("Deleting pending indexing instant from the timeline for
partition: " + partitionPath);
deletePendingIndexingInstant(dataMetaClient, partitionPath);
}
+ closeInternal();
}
/**
@@ -885,6 +886,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
public void update(HoodieCommitMetadata commitMetadata, String instantTime,
boolean isTableServiceAction) {
processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, commitMetadata, instantTime,
getRecordsGenerationParams()), !isTableServiceAction);
+ closeInternal();
}
/**
@@ -897,6 +899,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
cleanMetadata, getRecordsGenerationParams(), instantTime), false);
+ closeInternal();
}
/**
@@ -910,6 +913,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
processAndCommit(instantTime, () ->
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
metadataMetaClient.getActiveTimeline(), restoreMetadata,
getRecordsGenerationParams(), instantTime,
metadata.getSyncedInstantTime()), false);
+ closeInternal();
}
/**
@@ -938,6 +942,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
rollbackMetadata, getRecordsGenerationParams(), instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(instantTime, records, false);
+ closeInternal();
}
}
@@ -1125,6 +1130,14 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
return filesPartitionRecords.union(fileListRecords);
}
+ protected void closeInternal() {
+ try {
+ close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close HoodieMetadata writer ", e);
+ }
+ }
+
/**
* A class which represents a directory and the files and directories inside
it.
* <p>
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 98f58db66cd..cacf8ddb834 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
@@ -291,6 +292,11 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime,
getHoodieTable().isTableServiceAction(actionType, instantTime));
+ try {
+ this.metadataWriter.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close metadata writer ", e);
+ }
}
/**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a73c92c383f..54a8607ee47 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,8 +45,8 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -511,8 +511,13 @@ public class SparkRDDWriteClient<T extends
HoodieRecordPayload> extends
*/
private void initializeMetadataTable(Option<String>
inFlightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
-
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
config,
+ HoodieTableMetadataWriter writer =
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
config,
context, Option.empty(), inFlightInstantTimestamp);
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to instantiate Metadata table ", e);
+ }
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7d94b2d4f53..272d3d47985 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -192,5 +192,6 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
writeClient.startCommitWithTime(instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
+ closeInternal();
}
}