This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b3ef321ec8e [HUDI-6704] Fix Flink metadata table update (#9456)
b3ef321ec8e is described below
commit b3ef321ec8ee7ca07a4e83aa14c0c8b0af0eb561
Author: Danny Chan <[email protected]>
AuthorDate: Thu Aug 17 09:06:00 2023 +0800
[HUDI-6704] Fix Flink metadata table update (#9456)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 11 +++-----
.../apache/hudi/client/BaseHoodieWriteClient.java | 29 +++++++++-------------
.../java/org/apache/hudi/table/HoodieTable.java | 22 ----------------
.../hudi/client/HoodieFlinkTableServiceClient.java | 13 ++--------
.../apache/hudi/client/HoodieFlinkWriteClient.java | 5 ----
5 files changed, 18 insertions(+), 62 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 7e78bddd875..0af2ace25f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -86,7 +86,6 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
@@ -329,7 +328,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
this.txnManager.beginTransaction(Option.of(compactionInstant),
Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
- writeTableMetadata(table, compactionCommitTime, COMPACTION_ACTION,
metadata, context.emptyHoodieData());
+ writeTableMetadata(table, compactionCommitTime, metadata,
context.emptyHoodieData());
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished
with result " + metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
@@ -389,7 +388,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
preCommit(metadata);
finalizeWrite(table, logCompactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
- writeTableMetadata(table, logCompactionCommitTime,
HoodieTimeline.LOG_COMPACTION_ACTION, metadata, context.emptyHoodieData());
+ writeTableMetadata(table, logCompactionCommitTime, metadata,
context.emptyHoodieData());
LOG.info("Committing Log Compaction " + logCompactionCommitTime + ".
Finished with result " + metadata);
CompactHelpers.getInstance().completeInflightLogCompaction(table,
logCompactionCommitTime, metadata);
} finally {
@@ -496,7 +495,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
preCommit(metadata);
}
// Update table's metadata (table)
- writeTableMetadata(table, clusteringInstant.getTimestamp(),
clusteringInstant.getAction(), metadata,
writeStatuses.orElse(context.emptyHoodieData()));
+ writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata,
writeStatuses.orElse(context.emptyHoodieData()));
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished
with result " + metadata);
@@ -692,12 +691,10 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
*
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
- * @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
* @param writeStatuses Write statuses of the commit
*/
- protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus>
writeStatuses) {
- checkArgument(table.isTableServiceAction(actionType, instantTime),
String.format("Unsupported action: %s.%s is not table service.", actionType,
instantTime));
+ protected void writeTableMetadata(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 6b03c5234f0..4840a0b5882 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -282,7 +282,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
saveInternalSchema(table, instantTime, metadata);
}
// update Metadata table
- writeTableMetadata(table, instantTime, commitActionType, metadata,
writeStatuses);
+ writeTableMetadata(table, instantTime, metadata, writeStatuses);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
@@ -351,25 +351,20 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
*
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
- * @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
* @param writeStatuses WriteStatuses for the completed action.
*/
- protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus>
writeStatuses) {
- if (table.isTableServiceAction(actionType, instantTime)) {
- tableServiceClient.writeTableMetadata(table, instantTime, actionType,
metadata, writeStatuses);
- } else {
- context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
- Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
- if (metadataWriterOpt.isPresent()) {
- try (HoodieTableMetadataWriter metadataWriter =
metadataWriterOpt.get()) {
- metadataWriter.update(metadata, writeStatuses, instantTime);
- } catch (Exception e) {
- if (e instanceof HoodieException) {
- throw (HoodieException) e;
- } else {
- throw new HoodieException("Failed to update metadata", e);
- }
+ protected void writeTableMetadata(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
+ if (metadataWriterOpt.isPresent()) {
+ try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ metadataWriter.update(metadata, writeStatuses, instantTime);
+ } catch (Exception e) {
+ if (e instanceof HoodieException) {
+ throw (HoodieException) e;
+ } else {
+ throw new HoodieException("Failed to update metadata", e);
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 12584be55a4..59fa69de2e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -45,7 +45,6 @@ import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -59,7 +58,6 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -903,26 +901,6 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
return getMetadataWriter(triggeringInstantTimestamp, EAGER);
}
- /**
- * Check if action type is a table service.
- * @param actionType action type of the instant
- * @param instantTime instant time of the instant.
- * @return true if action represents a table service. false otherwise.
- */
- public boolean isTableServiceAction(String actionType, String instantTime) {
- if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan =
ClusteringUtils.getClusteringPlan(metaClient, new
HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
- // only clustering is table service with replace commit action
- return instantPlan.isPresent();
- } else {
- if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
- return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
- } else {
- return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
- }
- }
- }
-
/**
* Gets the metadata writer for async indexer.
*
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 72f266fae55..68c32acca24 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -85,7 +85,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, compactionCommitTime,
compactionInstant.getAction(), metadata, context.emptyHoodieData());
+ writeTableMetadata(table, compactionCommitTime, metadata,
context.emptyHoodieData());
LOG.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
@@ -132,7 +132,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// commit to data table after committing to metadata table.
// We take the lock here to ensure all writes to metadata table happens
within a single lock (single writer).
// Because more than one write to metadata table will result in
conflicts since all of them updates the same partition.
- writeTableMetadata(table, clusteringCommitTime,
clusteringInstant.getAction(), metadata,
writeStatuses.orElse(context.emptyHoodieData()));
+ writeTableMetadata(table, clusteringCommitTime, metadata,
writeStatuses.orElse(context.emptyHoodieData()));
LOG.info("Committing Clustering {} finished with result {}.",
clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
@@ -189,15 +189,6 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
return HoodieFlinkTable.create(config, context);
}
- @Override
- public void writeTableMetadata(HoodieTable table, String instantTime, String
actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus>
writeStatuses) {
- try (HoodieBackedTableMetadataWriter metadataWriter =
initMetadataWriter(Option.empty())) {
- metadataWriter.update(metadata, writeStatuses, instantTime);
- } catch (Exception e) {
- throw new HoodieException("Failed to update metadata", e);
- }
- }
-
/**
* Initialize the table metadata writer, for e.g, bootstrap the metadata
table
* from the filesystem if it does not exist.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index b4763d4eef4..ed1a3408f67 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -311,11 +311,6 @@ public class HoodieFlinkWriteClient<T> extends
}
}
- @Override
- protected void writeTableMetadata(HoodieTable table, String instantTime,
String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus>
writeStatuses) {
- tableServiceClient.writeTableMetadata(table, instantTime, actionType,
metadata, writeStatuses);
- }
-
/**
* Initialized the metadata table on start up, should only be called once on
driver.
*/