nsivabalan commented on code in PR #10965:
URL: https://github.com/apache/hudi/pull/10965#discussion_r1566489716


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());

Review Comment:
   1. Yeah. After reading Kishan's response, I feel we should fail the execution if compaction is currently being attempted by another concurrent writer.
   
   2. Even on REQUESTED: since we are taking a lock and checking the heartbeat client, wouldn't that ensure only one writer can proceed and the other writer will fail?
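A minimal sketch of the lock-plus-heartbeat check described in point 2, assuming a single JVM: a ReentrantLock stands in for txnManager's transaction, and an in-memory map of last-heartbeat timestamps stands in for the heartbeat files. CompactionGuard, start and expiryMs are hypothetical names, not Hudi's API. Because the expiry check and the heartbeat start happen under the same lock, a second writer for the same instant sees the live heartbeat and fails.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the lock + heartbeat check discussed above; not Hudi's API.
class CompactionGuard {
  // Plays the role of the table-level lock taken by txnManager.beginTransaction(...)
  private final ReentrantLock tableLock = new ReentrantLock();
  // Plays the role of the heartbeat files: instant -> last heartbeat time in ms
  private final Map<String, Long> lastHeartbeatMs = new ConcurrentHashMap<>();
  private final long expiryMs;

  CompactionGuard(long expiryMs) {
    this.expiryMs = expiryMs;
  }

  boolean isHeartbeatExpired(String instant, long nowMs) {
    Long last = lastHeartbeatMs.get(instant);
    return last == null || nowMs - last > expiryMs;
  }

  // Check-then-start is atomic under the lock, so at most one writer can claim the instant.
  void start(String instant, long nowMs) {
    tableLock.lock();
    try {
      if (!isHeartbeatExpired(instant, nowMs)) {
        throw new IllegalStateException(
            "Cannot compact instant " + instant + " due to heartbeat by existing job");
      }
      lastHeartbeatMs.put(instant, nowMs);
    } finally {
      tableLock.unlock();
    }
  }
}
```

Timestamps are passed in explicitly to keep the sketch deterministic; a real heartbeat would be refreshed periodically and read from storage.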



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e);
+      }
+      this.heartbeatClient.start(compactionInstantTime);
+    } finally {
+      txnManager.endTransaction(txnManager.getCurrentTransactionOwner());
+    }
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    this.heartbeatClient.stop(compactionInstantTime, true);

Review Comment:
   Yeah, I see your point. Probably every caller, when calling stop, should also remove the instant from the map.
   It looks like we never remove any entry from instantToHeartbeatMap on master (except when shutting down the entire HeartbeatClient).
   @n3nash: any pointers in this regard?
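A minimal sketch of a heartbeat client whose stop() removes the entry as well as cancelling it, so instantToHeartbeatMap does not grow for the lifetime of the client. SimpleHeartbeatClient and its methods are hypothetical, and a ScheduledExecutorService stands in for Hudi's heartbeat timer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Hudi's HoodieHeartbeatClient: stop() cancels AND removes the map entry.
class SimpleHeartbeatClient implements AutoCloseable {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, ScheduledFuture<?>> instantToHeartbeatMap = new ConcurrentHashMap<>();

  void start(String instant, Runnable beat, long intervalMs) {
    // computeIfAbsent avoids scheduling two timers for the same instant
    instantToHeartbeatMap.computeIfAbsent(instant,
        k -> scheduler.scheduleAtFixedRate(beat, 0, intervalMs, TimeUnit.MILLISECONDS));
  }

  void stop(String instant) {
    // remove (not just get) so finished instants do not accumulate in the map
    ScheduledFuture<?> task = instantToHeartbeatMap.remove(instant);
    if (task != null) {
      task.cancel(false);
    }
  }

  int trackedInstants() {
    return instantToHeartbeatMap.size();
  }

  @Override
  public void close() {
    // Shutting down the whole client stops every remaining heartbeat
    instantToHeartbeatMap.keySet().forEach(this::stop);
    scheduler.shutdownNow();
  }
}
```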
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1137,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e);
+      }
+      this.heartbeatClient.start(compactionInstantTime);
+    } finally {
+      txnManager.endTransaction(txnManager.getCurrentTransactionOwner());
+    }
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    this.heartbeatClient.stop(compactionInstantTime, true);

Review Comment:
   Should you move heartbeatClient.stop to a finally block?
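A minimal sketch of the suggestion, assuming the heartbeat can be modeled as a start/stop pair: with stop in a finally block, the heartbeat is released even when compaction throws, instead of staying alive until it expires. HeartbeatScope, Heartbeat and runWithHeartbeat are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical sketch: the heartbeat is stopped in finally, so it is released even if the work throws.
final class HeartbeatScope {
  interface Heartbeat {
    void start(String instant);

    void stop(String instant);
  }

  private HeartbeatScope() {}

  static <T> T runWithHeartbeat(Heartbeat heartbeat, String instant, Supplier<T> work) {
    heartbeat.start(instant);
    try {
      return work.get();
    } finally {
      // Runs on both the success path and the exception path
      heartbeat.stop(instant);
    }
  }
}
```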



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1156,8 +1184,31 @@ protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> ex
    */
   protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    try {
+      // Transaction serves to ensure only one logcompact job for this instant will start heartbeat, and any other concurrent
+      // logcompact job will abort if they attempt to execute logcompact before heartbeat expires

Review Comment:
   There is a lot of duplicated code between regular compaction and log compaction.
   Maybe in a follow-up PR we should try to dedup it.
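One possible shape for the dedup, sketched under the assumption that compact and logCompact differ only in the action name and the work they run: a single hypothetical runGuarded helper performs the lock, heartbeat check, heartbeat start and finally-stop for both. TableServiceGuard, ServiceType, Heartbeat and runGuarded are illustrative names, not existing Hudi classes, and a synchronized block stands in for the transaction manager.

```java
import java.util.function.Supplier;

// Hypothetical dedup sketch: compact and logCompact would both delegate here.
final class TableServiceGuard {
  enum ServiceType { COMPACT, LOG_COMPACT }

  // Minimal heartbeat abstraction assumed for this sketch.
  interface Heartbeat {
    boolean isExpired(String instant);

    void start(String instant);

    void stop(String instant);
  }

  private TableServiceGuard() {}

  static <T> T runGuarded(Object lock, Heartbeat heartbeat, ServiceType type, String instant,
      Supplier<T> work) {
    // synchronized stands in for txnManager.beginTransaction / endTransaction
    synchronized (lock) {
      if (!heartbeat.isExpired(instant)) {
        // Thrown before the try below, so another job's live heartbeat is never stopped here
        throw new IllegalStateException(
            "Cannot run " + type + " on instant " + instant + " due to heartbeat by existing job");
      }
      heartbeat.start(instant);
    }
    try {
      return work.get();
    } finally {
      heartbeat.stop(instant);
    }
  }
}
```

With this shape, compact would call runGuarded with ServiceType.COMPACT and logCompact with ServiceType.LOG_COMPACT, each passing its own work as the Supplier.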



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());

Review Comment:
   Hey Kishan, in your example, why would (3) not see the heartbeat? We are taking a lock and starting to emit heartbeats, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.