This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 90e3378207d5fcd4a0ad560e160b0ece06d096f0
Author: Danny Chan <[email protected]>
AuthorDate: Thu Aug 17 09:06:00 2023 +0800

    [HUDI-6704] Fix Flink metadata table update (#9456)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 11 +++-----
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 29 +++++++++-------------
 .../java/org/apache/hudi/table/HoodieTable.java    | 22 ----------------
 .../hudi/client/HoodieFlinkTableServiceClient.java | 13 ++--------
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  5 ----
 5 files changed, 18 insertions(+), 62 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 7e78bddd875..0af2ace25f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -86,7 +86,6 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
@@ -329,7 +328,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       this.txnManager.beginTransaction(Option.of(compactionInstant), 
Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, compactionCommitTime, COMPACTION_ACTION, 
metadata, context.emptyHoodieData());
+      writeTableMetadata(table, compactionCommitTime, metadata, 
context.emptyHoodieData());
       LOG.info("Committing Compaction " + compactionCommitTime + ". Finished 
with result " + metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
     } finally {
@@ -389,7 +388,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
       preCommit(metadata);
       finalizeWrite(table, logCompactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, logCompactionCommitTime, 
HoodieTimeline.LOG_COMPACTION_ACTION, metadata, context.emptyHoodieData());
+      writeTableMetadata(table, logCompactionCommitTime, metadata, 
context.emptyHoodieData());
       LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". 
Finished with result " + metadata);
       CompactHelpers.getInstance().completeInflightLogCompaction(table, 
logCompactionCommitTime, metadata);
     } finally {
@@ -496,7 +495,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
         preCommit(metadata);
       }
       // Update table's metadata (table)
-      writeTableMetadata(table, clusteringInstant.getTimestamp(), 
clusteringInstant.getAction(), metadata, 
writeStatuses.orElse(context.emptyHoodieData()));
+      writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, 
writeStatuses.orElse(context.emptyHoodieData()));
 
       LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished 
with result " + metadata);
 
@@ -692,12 +691,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
    *
    * @param table         {@link HoodieTable} of interest.
    * @param instantTime   instant time of the commit.
-   * @param actionType    action type of the commit.
    * @param metadata      instance of {@link HoodieCommitMetadata}.
    * @param writeStatuses Write statuses of the commit
    */
-  protected void writeTableMetadata(HoodieTable table, String instantTime, 
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> 
writeStatuses) {
-    checkArgument(table.isTableServiceAction(actionType, instantTime), 
String.format("Unsupported action: %s.%s is not table service.", actionType, 
instantTime));
+  protected void writeTableMetadata(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 6b03c5234f0..4840a0b5882 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -282,7 +282,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       saveInternalSchema(table, instantTime, metadata);
     }
     // update Metadata table
-    writeTableMetadata(table, instantTime, commitActionType, metadata, 
writeStatuses);
+    writeTableMetadata(table, instantTime, metadata, writeStatuses);
     activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
@@ -351,25 +351,20 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    *
    * @param table         {@link HoodieTable} of interest.
    * @param instantTime   instant time of the commit.
-   * @param actionType    action type of the commit.
    * @param metadata      instance of {@link HoodieCommitMetadata}.
    * @param writeStatuses WriteStatuses for the completed action.
    */
-  protected void writeTableMetadata(HoodieTable table, String instantTime, 
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> 
writeStatuses) {
-    if (table.isTableServiceAction(actionType, instantTime)) {
-      tableServiceClient.writeTableMetadata(table, instantTime, actionType, 
metadata, writeStatuses);
-    } else {
-      context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
-      Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
-      if (metadataWriterOpt.isPresent()) {
-        try (HoodieTableMetadataWriter metadataWriter = 
metadataWriterOpt.get()) {
-          metadataWriter.update(metadata, writeStatuses, instantTime);
-        } catch (Exception e) {
-          if (e instanceof HoodieException) {
-            throw (HoodieException) e;
-          } else {
-            throw new HoodieException("Failed to update metadata", e);
-          }
+  protected void writeTableMetadata(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing to 
metadata table: " + config.getTableName());
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
+    if (metadataWriterOpt.isPresent()) {
+      try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        metadataWriter.update(metadata, writeStatuses, instantTime);
+      } catch (Exception e) {
+        if (e instanceof HoodieException) {
+          throw (HoodieException) e;
+        } else {
+          throw new HoodieException("Failed to update metadata", e);
         }
       }
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 12584be55a4..59fa69de2e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -45,7 +45,6 @@ import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -59,7 +58,6 @@ import 
org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -903,26 +901,6 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     return getMetadataWriter(triggeringInstantTimestamp, EAGER);
   }
 
-  /**
-   * Check if action type is a table service.
-   * @param actionType action type of the instant
-   * @param instantTime instant time of the instant.
-   * @return true if action represents a table service. false otherwise.
-   */
-  public boolean isTableServiceAction(String actionType, String instantTime) {
-    if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = 
ClusteringUtils.getClusteringPlan(metaClient, new 
HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
-      // only clustering is table service with replace commit action
-      return instantPlan.isPresent();
-    } else {
-      if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-      } else {
-        return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-      }
-    }
-  }
-
   /**
    * Gets the metadata writer for async indexer.
    *
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 72f266fae55..68c32acca24 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -85,7 +85,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, compactionCommitTime, 
compactionInstant.getAction(), metadata, context.emptyHoodieData());
+      writeTableMetadata(table, compactionCommitTime, metadata, 
context.emptyHoodieData());
       LOG.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
     } finally {
@@ -132,7 +132,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // commit to data table after committing to metadata table.
       // We take the lock here to ensure all writes to metadata table happens 
within a single lock (single writer).
       // Because more than one write to metadata table will result in 
conflicts since all of them updates the same partition.
-      writeTableMetadata(table, clusteringCommitTime, 
clusteringInstant.getAction(), metadata, 
writeStatuses.orElse(context.emptyHoodieData()));
+      writeTableMetadata(table, clusteringCommitTime, metadata, 
writeStatuses.orElse(context.emptyHoodieData()));
 
       LOG.info("Committing Clustering {} finished with result {}.", 
clusteringCommitTime, metadata);
       table.getActiveTimeline().transitionReplaceInflightToComplete(
@@ -189,15 +189,6 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
     return HoodieFlinkTable.create(config, context);
   }
 
-  @Override
-  public void writeTableMetadata(HoodieTable table, String instantTime, String 
actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> 
writeStatuses) {
-    try (HoodieBackedTableMetadataWriter metadataWriter = 
initMetadataWriter(Option.empty())) {
-      metadataWriter.update(metadata, writeStatuses, instantTime);
-    } catch (Exception e) {
-      throw new HoodieException("Failed to update metadata", e);
-    }
-  }
-
   /**
    * Initialize the table metadata writer, for e.g, bootstrap the metadata 
table
    * from the filesystem if it does not exist.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index b4763d4eef4..ed1a3408f67 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -311,11 +311,6 @@ public class HoodieFlinkWriteClient<T> extends
     }
   }
 
-  @Override
-  protected void writeTableMetadata(HoodieTable table, String instantTime, 
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> 
writeStatuses) {
-    tableServiceClient.writeTableMetadata(table, instantTime, actionType, 
metadata, writeStatuses);
-  }
-
   /**
    * Initialized the metadata table on start up, should only be called once on 
driver.
    */

Reply via email to