yuzhaojing commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r1062150964
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java:
##########
@@ -283,44 +270,14 @@ public void preWrite(String instantTime,
WriteOperationType writeOperationType,
@Override
protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata) {
- try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
- metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
- } catch (Exception e) {
- throw new HoodieException("Failed to update metadata", e);
- }
- }
-
- /**
- * Initialize the table metadata writer, for e.g, bootstrap the metadata table
- * from the filesystem if it does not exist.
- */
- private HoodieBackedTableMetadataWriter initMetadataWriter() {
- return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
- FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+ tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
}
/**
* Initialized the metadata table on start up, should only be called once on driver.
*/
public void initMetadataTable() {
- HoodieFlinkTable<?> table = getHoodieTable();
- if (config.isMetadataTableEnabled()) {
- // initialize the metadata table path
- // guard the metadata writer with concurrent lock
- try {
- this.txnManager.getLockManager().lock();
- initMetadataWriter().close();
- } catch (Exception e) {
- throw new HoodieException("Failed to initialize metadata table", e);
- } finally {
- this.txnManager.getLockManager().unlock();
- }
- // clean the obsolete index stats
- table.deleteMetadataIndexIfNecessary();
- } else {
- // delete the metadata table if it was enabled but is now disabled
- table.maybeDeleteMetadataTable();
- }
+ ((HoodieFlinkTableServiceClient<T>) tableServiceClient).initMetadataTable();
Review Comment:
OK, I will open a follow-up task to normalize the initialization of the Flink MDT (metadata table).
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java:
##########
@@ -283,44 +270,14 @@ public void preWrite(String instantTime,
WriteOperationType writeOperationType,
@Override
protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata) {
- try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
- metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
- } catch (Exception e) {
- throw new HoodieException("Failed to update metadata", e);
- }
- }
-
- /**
- * Initialize the table metadata writer, for e.g, bootstrap the metadata table
- * from the filesystem if it does not exist.
- */
- private HoodieBackedTableMetadataWriter initMetadataWriter() {
- return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
- FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+ tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
Review Comment:
Ok.