danny0405 commented on code in PR #6733:
URL: https://github.com/apache/hudi/pull/6733#discussion_r1009173949


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java:
##########
@@ -129,9 +128,6 @@ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) th
       List<CompactionOperation> operations = compactionPlan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
       LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
-      WriteMarkersFactory
-          .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
-          .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());

Review Comment:
   Did you notice that only `REQUESTED` instants are scheduled here? That 
means this is not a failed compaction (otherwise the state would be 
`INFLIGHT`). We also already have rollback logic in 
`CompactionPlanOperator#open` and `CompactionCommitSink#commitIfNecessary`.
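
To make the state argument concrete, here is a minimal sketch. The `State` enum, the `Instant` class, and both helper methods are hypothetical stand-ins written for illustration; they are not Hudi's timeline API. The sketch only shows that an instant picked up for scheduling (`REQUESTED`) and an instant that is a rollback candidate (`INFLIGHT`) are disjoint sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompactionInstantSketch {
  // Hypothetical model of timeline states, not Hudi's own type.
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // Hypothetical instant: an instant time plus its current state.
  static final class Instant {
    final String time;
    final State state;

    Instant(String time, State state) {
      this.time = time;
      this.state = state;
    }
  }

  // Instant times that have the given state, in timeline order.
  static List<String> withState(List<Instant> timeline, State wanted) {
    List<String> result = new ArrayList<>();
    for (Instant instant : timeline) {
      if (instant.state == wanted) {
        result.add(instant.time);
      }
    }
    return result;
  }

  // Only REQUESTED instants are eligible for scheduling.
  static List<String> schedulable(List<Instant> timeline) {
    return withState(timeline, State.REQUESTED);
  }

  // An instant that started executing but never completed stays INFLIGHT.
  static List<String> rollbackCandidates(List<Instant> timeline) {
    return withState(timeline, State.INFLIGHT);
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("001", State.COMPLETED),
        new Instant("002", State.INFLIGHT),
        new Instant("003", State.REQUESTED));
    System.out.println("schedulable: " + schedulable(timeline));
    System.out.println("rollback candidates: " + rollbackCandidates(timeline));
  }
}
```

Under this model, an instant that reaches the scheduling path has never been executed, so a marker dir left over from a failed run would not be expected there.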
   
   We first need to figure out how a marker dir can exist for a `REQUESTED` 
instant on the timeline :)
   
   There is also a question to answer here: a duplicate marker file implies a 
duplicate data file as well (either complete or corrupted). That leftover data 
file would make creation of the new data file fail with a file-exists 
exception. How would we resolve that?
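
The failure mode in question can be reproduced in isolation. The sketch below is an assumption-laden illustration: it uses `java.nio.file` on the local filesystem as a stand-in for the table's storage layer, and the class name, the `tryCreate` helper, and the file name are hypothetical. It shows only that a leftover file blocks a create-if-absent call; it does not propose a fix.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DuplicateDataFileSketch {
  // Returns true if the file was created, false if an existing copy blocked it.
  static boolean tryCreate(Path dataFile) throws IOException {
    try {
      // createFile is atomic create-if-absent; it fails if the path exists.
      Files.createFile(dataFile);
      return true;
    } catch (FileAlreadyExistsException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("compaction-sketch");
    // Hypothetical data file name, chosen only for the example.
    Path dataFile = dir.resolve("filegroup-1.parquet");

    // First attempt: stands in for the run that left the file behind.
    System.out.println("first attempt created file: " + tryCreate(dataFile));
    // Retry of the same instant: the leftover file is still there.
    System.out.println("retry created file: " + tryCreate(dataFile));

    // Clean up the temporary directory.
    Files.delete(dataFile);
    Files.delete(dir);
  }
}
```

Deleting only the marker dir leaves the data file in place, so the retry in this sketch would still hit the existing file.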


