This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 90e3378207d5fcd4a0ad560e160b0ece06d096f0 Author: Danny Chan <[email protected]> AuthorDate: Thu Aug 17 09:06:00 2023 +0800 [HUDI-6704] Fix Flink metadata table update (#9456) --- .../hudi/client/BaseHoodieTableServiceClient.java | 11 +++----- .../apache/hudi/client/BaseHoodieWriteClient.java | 29 +++++++++------------- .../java/org/apache/hudi/table/HoodieTable.java | 22 ---------------- .../hudi/client/HoodieFlinkTableServiceClient.java | 13 ++-------- .../apache/hudi/client/HoodieFlinkWriteClient.java | 5 ---- 5 files changed, 18 insertions(+), 62 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 7e78bddd875..0af2ace25f0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -86,7 +86,6 @@ import java.util.stream.Stream; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; -import static org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit; @@ -329,7 +328,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); // commit to data table after committing to metadata table. - writeTableMetadata(table, compactionCommitTime, COMPACTION_ACTION, metadata, context.emptyHoodieData()); + writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData()); LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { @@ -389,7 +388,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl preCommit(metadata); finalizeWrite(table, logCompactionCommitTime, writeStats); // commit to data table after committing to metadata table. - writeTableMetadata(table, logCompactionCommitTime, HoodieTimeline.LOG_COMPACTION_ACTION, metadata, context.emptyHoodieData()); + writeTableMetadata(table, logCompactionCommitTime, metadata, context.emptyHoodieData()); LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata); CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata); } finally { @@ -496,7 +495,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl preCommit(metadata); } // Update table's metadata (table) - writeTableMetadata(table, clusteringInstant.getTimestamp(), clusteringInstant.getAction(), metadata, writeStatuses.orElse(context.emptyHoodieData())); + writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, writeStatuses.orElse(context.emptyHoodieData())); LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); @@ -692,12 +691,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl * * @param table {@link HoodieTable} of interest. * @param instantTime instant time of the commit. - * @param actionType action type of the commit. * @param metadata instance of {@link HoodieCommitMetadata}. * @param writeStatuses Write statuses of the commit */ - protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { - checkArgument(table.isTableServiceAction(actionType, instantTime), String.format("Unsupported action: %s.%s is not table service.", actionType, instantTime)); + protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime); if (metadataWriterOpt.isPresent()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 6b03c5234f0..4840a0b5882 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -282,7 +282,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient saveInternalSchema(table, instantTime, metadata); } // update Metadata table - writeTableMetadata(table, instantTime, commitActionType, metadata, writeStatuses); + writeTableMetadata(table, instantTime, metadata, writeStatuses); activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } @@ -351,25 +351,20 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient * * @param table {@link HoodieTable} of interest. * @param instantTime instant time of the commit. - * @param actionType action type of the commit. * @param metadata instance of {@link HoodieCommitMetadata}. * @param writeStatuses WriteStatuses for the completed action. */ - protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { - if (table.isTableServiceAction(actionType, instantTime)) { - tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata, writeStatuses); - } else { - context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); - Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime); - if (metadataWriterOpt.isPresent()) { - try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) { - metadataWriter.update(metadata, writeStatuses, instantTime); - } catch (Exception e) { - if (e instanceof HoodieException) { - throw (HoodieException) e; - } else { - throw new HoodieException("Failed to update metadata", e); - } + protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { + context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); + Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime); + if (metadataWriterOpt.isPresent()) { + try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) { + metadataWriter.update(metadata, writeStatuses, instantTime); + } catch (Exception e) { + if (e instanceof HoodieException) { + throw (HoodieException) e; + } else { + throw new HoodieException("Failed to update metadata", e); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 12584be55a4..59fa69de2e6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -45,7 +45,6 @@ import org.apache.hudi.common.fs.OptimisticConsistencyGuard; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -59,7 +58,6 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; -import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -903,26 +901,6 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { return getMetadataWriter(triggeringInstantTimestamp, EAGER); } - /** - * Check if action type is a table service. - * @param actionType action type of the instant - * @param instantTime instant time of the instant. - * @return true if action represents a table service. false otherwise. - */ - public boolean isTableServiceAction(String actionType, String instantTime) { - if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime)); - // only clustering is table service with replace commit action - return instantPlan.isPresent(); - } else { - if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } else { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - } - } - /** * Gets the metadata writer for async indexer. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java index 72f266fae55..68c32acca24 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java @@ -85,7 +85,7 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie // commit to data table after committing to metadata table. // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata, context.emptyHoodieData()); + writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData()); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { @@ -132,7 +132,7 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie // commit to data table after committing to metadata table. // We take the lock here to ensure all writes to metadata table happens within a single lock (single writer). // Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata, writeStatuses.orElse(context.emptyHoodieData())); + writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElse(context.emptyHoodieData())); LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata); table.getActiveTimeline().transitionReplaceInflightToComplete( @@ -189,15 +189,6 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie return HoodieFlinkTable.create(config, context); } - @Override - public void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { - try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter(Option.empty())) { - metadataWriter.update(metadata, writeStatuses, instantTime); - } catch (Exception e) { - throw new HoodieException("Failed to update metadata", e); - } - } - /** * Initialize the table metadata writer, for e.g, bootstrap the metadata table * from the filesystem if it does not exist. diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index b4763d4eef4..ed1a3408f67 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -311,11 +311,6 @@ public class HoodieFlinkWriteClient<T> extends } } - @Override - protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) { - tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata, writeStatuses); - } - /** * Initialized the metadata table on start up, should only be called once on driver. */
