nsivabalan commented on code in PR #9371:
URL: https://github.com/apache/hudi/pull/9371#discussion_r1285121130
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -75,10 +83,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
-public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient
implements RunsTableService {
+public abstract class BaseHoodieTableServiceClient<I, T, O> extends
BaseHoodieClient implements RunsTableService {
Review Comment:
Can we add Javadoc for the class, also calling out what the type parameters I, T, and O generally
refer to?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -285,7 +373,29 @@ public HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime) {
* Commit Log Compaction and track metrics.
*/
protected void completeLogCompaction(HoodieCommitMetadata metadata,
HoodieTable table, String logCompactionCommitTime) {
- throw new UnsupportedOperationException("Log compaction is not supported
yet.");
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log
compaction write status and commit compaction");
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant logCompactionInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
Review Comment:
Should this also call `handleWriteErrors(writeStats, TableServiceType.LOG_COMPACT);`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -146,7 +157,10 @@ protected void
setPendingInflightAndRequestedInstants(Set<String> pendingInfligh
* @param metadata commit metadata for which pre commit is being invoked.
*/
protected void preCommit(HoodieCommitMetadata metadata) {
- // To be overridden by specific engines to perform conflict resolution if
any.
+ // Create a Hoodie table after startTxn which encapsulated the commits and
files visible.
Review Comment:
Do we even need to keep this `protected` now that no engine is overriding it?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -244,12 +305,39 @@ public Option<String>
scheduleCompaction(Option<Map<String, String>> extraMetada
* @param metadata All the metadata that gets stored along with
a commit
* @param extraMetadata Extra Metadata to be stored
*/
- public abstract void commitCompaction(String compactionInstantTime,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata);
+ public void commitCompaction(String compactionInstantTime,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+ completeCompaction(metadata, createTable(config,
context.getHadoopConf().get()), compactionInstantTime);
+ }
/**
* Commit Compaction and track metrics.
*/
- protected abstract void completeCompaction(HoodieCommitMetadata metadata,
HoodieTable table, String compactionCommitTime);
+ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable
table, String compactionCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ handleWriteErrors(writeStats, TableServiceType.COMPACT);
+ final HoodieInstant compactionInstant =
HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(compactionInstant),
Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ updateTableMetadata(table, metadata, compactionInstant,
context.emptyHoodieData());
Review Comment:
Again, in Flink this was named writeTableMetadata, while in Spark it's
updateTableMetadata, and maybe there are reasons for that. Can we ensure we don't
change anything for Flink?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,4 +1159,20 @@ public void close() {
// Stop timeline-server if running
super.close();
}
+
+ protected void updateTableMetadata(HoodieTable table, HoodieCommitMetadata
commitMetadata,
+ HoodieInstant hoodieInstant,
+ HoodieData<WriteStatus> writeStatuses) {
+ // Do not do any conflict resolution here as we do with regular writes. We
take the lock here to ensure all writes to metadata table happens within a
+ // single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
+ table.getMetadataWriter(hoodieInstant.getTimestamp())
+ .ifPresent(writer -> ((HoodieTableMetadataWriter)
writer).update(commitMetadata, writeStatuses, hoodieInstant.getTimestamp()));
+ }
+
+ protected void handleWriteErrors(List<HoodieWriteStat> writeStats,
TableServiceType tableServiceType) {
Review Comment:
This was never invoked in FlinkTableServiceClient.
Let's try to keep the functionality as is:
at the least, let's override the method there and make it a no-op. Let's not change any
functionality unintentionally.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -243,7 +169,31 @@ public HoodieWriteMetadata<List<WriteStatus>>
cluster(String clusteringInstant,
}
@Override
- protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config,
Configuration hadoopConf) {
+ protected void
validateClusteringCommit(HoodieWriteMetadata<List<WriteStatus>>
clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
Review Comment:
Clustering is not supported in Flink, so we should just throw an exception here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -174,11 +188,43 @@ private void inlineCompaction(HoodieTable table,
Option<Map<String, String>> ext
/**
* Ensures compaction instant is in expected state and performs Log
Compaction for the workload stored in instant-time.s
*
- * @param compactionInstantTime Compaction Instant Time
+ * @param logCompactionInstantTime Compaction Instant Time
* @return Collection of Write Status
*/
- protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime,
boolean shouldComplete) {
- throw new UnsupportedOperationException("Log compaction is not supported
yet.");
+ protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime,
boolean shouldComplete) {
Review Comment:
Can we revisit all access specifiers and remove `protected` where it is not
required? Later, if we want to override a method in any of the engines, we can make it
`protected` then.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -285,7 +373,29 @@ public HoodieWriteMetadata<O> logCompact(String
logCompactionInstantTime) {
* Commit Log Compaction and track metrics.
*/
protected void completeLogCompaction(HoodieCommitMetadata metadata,
HoodieTable table, String logCompactionCommitTime) {
- throw new UnsupportedOperationException("Log compaction is not supported
yet.");
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log
compaction write status and commit compaction");
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant logCompactionInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(logCompactionInstant),
Option.empty());
+ preCommit(metadata);
+ finalizeWrite(table, logCompactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ writeTableMetadata(table, logCompactionCommitTime,
HoodieTimeline.LOG_COMPACTION_ACTION, metadata, context.emptyHoodieData());
Review Comment:
Same here — there are minor differences between Spark and Flink.
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java:
##########
@@ -42,27 +48,31 @@ protected HoodieJavaTableServiceClient(HoodieEngineContext
context,
}
@Override
- protected HoodieWriteMetadata<List<WriteStatus>> compact(String
compactionInstantTime, boolean shouldComplete) {
- throw new HoodieNotSupportedException("Compact is not supported in
HoodieJavaTableServiceClient");
+ protected void
validateClusteringCommit(HoodieWriteMetadata<List<WriteStatus>>
clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
Review Comment:
We can probably keep this in BaseHoodieTableServiceClient only and leave it
as `protected`, so that any engine that implements clustering can use this method to
validate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]