nsivabalan commented on code in PR #9371:
URL: https://github.com/apache/hudi/pull/9371#discussion_r1285121130


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -75,10 +83,13 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;
 
-public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieClient implements RunsTableService {

Review Comment:
   Can we add Javadoc for the class, also calling out what `I`, `T`, and `O` generally refer to?
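   A rough sketch of what that Javadoc could look like (the descriptions of `I`, `T`, and `O` below are my assumption, mirroring how `BaseHoodieWriteClient` uses its type parameters, and should be verified against the actual engine clients):

   ```java
   /**
    * Base client for table services (compaction, log compaction, clustering, clean, ...)
    * shared across engines.
    *
    * @param <I> Type of the input records (engine-specific collection of HoodieRecord).
    * @param <T> Type of the record payload.
    * @param <O> Type of the write outputs (engine-specific collection of WriteStatus).
    */
   public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieClient implements RunsTableService {
     // ...
   }
   ```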



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -285,7 +373,29 @@ public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
    * Commit Log Compaction and track metrics.
    */
   protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
-    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);

Review Comment:
    Should we call `handleWriteErrors(writeStats, TableServiceType.LOG_COMPACT);` here, as `completeCompaction` does?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -146,7 +157,10 @@ protected void setPendingInflightAndRequestedInstants(Set<String> pendingInfligh
    * @param metadata commit metadata for which pre commit is being invoked.
    */
   protected void preCommit(HoodieCommitMetadata metadata) {
-    // To be overridden by specific engines to perform conflict resolution if any.
+    // Create a Hoodie table after startTxn which encapsulated the commits and files visible.

Review Comment:
   Do we even need to keep this `protected` now that no engine overrides it?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -244,12 +305,39 @@ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetada
    * @param metadata              All the metadata that gets stored along with a commit
    * @param extraMetadata         Extra Metadata to be stored
    */
-  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata);
+  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, createTable(config, context.getHadoopConf().get()), compactionInstantTime);
+  }
 
   /**
    * Commit Compaction and track metrics.
    */
-  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    handleWriteErrors(writeStats, TableServiceType.COMPACT);
+    final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      updateTableMetadata(table, metadata, compactionInstant, context.emptyHoodieData());

Review Comment:
   Again, in Flink this was named `writeTableMetadata`, while in Spark it is `updateTableMetadata`, and there may be reasons for that. Can we ensure we don't change anything for Flink?
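   For example, the Flink client could keep its pre-refactor behavior by routing the common hook to its existing method (a sketch only; the exact `writeTableMetadata` signature and arguments here are my assumption and would need to be checked against `HoodieFlinkTableServiceClient`):

   ```java
   @Override
   protected void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                      HoodieInstant hoodieInstant,
                                      HoodieData<WriteStatus> writeStatuses) {
     // Preserve the pre-refactor Flink code path (previously named writeTableMetadata)
     // instead of inheriting the Spark-style implementation from the base class.
     writeTableMetadata(table, hoodieInstant.getTimestamp(), hoodieInstant.getAction(), commitMetadata, writeStatuses);
   }
   ```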
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,4 +1159,20 @@ public void close() {
     // Stop timeline-server if running
     super.close();
   }
+
+  protected void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
+                                   HoodieInstant hoodieInstant,
+                                   HoodieData<WriteStatus> writeStatuses) {
+    // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+    // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+    table.getMetadataWriter(hoodieInstant.getTimestamp())
+        .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, writeStatuses, hoodieInstant.getTimestamp()));
+  }
+
+  protected void handleWriteErrors(List<HoodieWriteStat> writeStats, TableServiceType tableServiceType) {

Review Comment:
   This was never invoked in `FlinkTableServiceClient`. Let's try to keep the functionality as is: at least override the method in the Flink client and make it a no-op, so we don't change any functionality unintentionally.
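   i.e., something along these lines in `HoodieFlinkTableServiceClient` (a sketch, not tested):

   ```java
   @Override
   protected void handleWriteErrors(List<HoodieWriteStat> writeStats, TableServiceType tableServiceType) {
     // No-op: the Flink client never performed this write-error check before the
     // refactor, so skip it here to keep the Flink behavior unchanged.
   }
   ```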



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -243,7 +169,31 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant,
   }
 
   @Override
-  protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+  protected void validateClusteringCommit(HoodieWriteMetadata<List<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {

Review Comment:
   Clustering is not supported in Flink; we should just throw an exception here.
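   i.e., roughly (a sketch; the exception message wording is mine):

   ```java
   @Override
   protected void validateClusteringCommit(HoodieWriteMetadata<List<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
     // Clustering is not implemented for the Flink engine, so fail fast instead of validating.
     throw new HoodieNotSupportedException("Clustering is not supported in HoodieFlinkTableServiceClient");
   }
   ```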



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -174,11 +188,43 @@ private void inlineCompaction(HoodieTable table, Option<Map<String, String>> ext
   /**
    * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
    *
-   * @param compactionInstantTime Compaction Instant Time
+   * @param logCompactionInstantTime Compaction Instant Time
    * @return Collection of Write Status
    */
-  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
-    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boolean shouldComplete) {

Review Comment:
   Can we revisit all access specifiers and remove `protected` where it is not required? Later, if we want to override a method in any of the engines, we can make it protected again.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -285,7 +373,29 @@ public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
    * Commit Log Compaction and track metrics.
    */
   protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
-    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
+    try {
+      this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
+      preCommit(metadata);
+      finalizeWrite(table, logCompactionCommitTime, writeStats);
+      // commit to data table after committing to metadata table.
+      writeTableMetadata(table, logCompactionCommitTime, HoodieTimeline.LOG_COMPACTION_ACTION, metadata, context.emptyHoodieData());

Review Comment:
   Same here: there are minor differences between Spark and Flink.



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java:
##########
@@ -42,27 +48,31 @@ protected HoodieJavaTableServiceClient(HoodieEngineContext context,
   }
 
   @Override
-  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaTableServiceClient");
+  protected void validateClusteringCommit(HoodieWriteMetadata<List<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {

Review Comment:
   We can probably keep this in `BaseHoodieTableServiceClient` only and leave it as protected, so that any engine that implements clustering can use this method to validate.
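   i.e., roughly (a sketch showing only the shape of the shared hook; the actual validation body would be whatever this PR currently duplicates in the Java/Spark clients):

   ```java
   // In BaseHoodieTableServiceClient: a single shared, overridable validation hook.
   protected void validateClusteringCommit(HoodieWriteMetadata<O> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
     // Common sanity checks on the clustering result (e.g. non-empty write statuses),
     // reusable by any engine that implements clustering.
   }
   ```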



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
