LiebingYu commented on code in PR #3131:
URL: https://github.com/apache/fluss/pull/3131#discussion_r3279504735


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java:
##########
@@ -70,8 +70,6 @@ public void startup() {
         LOG.info("Start up table manager.");
         replicaStateMachine.startup();
         tableBucketStateMachine.startup();
-        // try to resume one deletion after start up
-        resumeDeletions();

Review Comment:
   I think we should keep this.



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -430,6 +430,15 @@ public class ConfigOptions {
                             "The interval for cleaning up expired producer offsets "
                                     + "and orphan files in remote storage. Default is 1 hour.");
 
+    /** The interval at which the coordinator resumes in-progress table and partition deletions. */
+    public static final ConfigOption<Duration> COORDINATOR_RESUME_DELETION_INTERVAL =
+            key("coordinator.resume-deletion.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))

Review Comment:
   5 seconds is too aggressive. This task only handles exceptional cases; under
   normal circumstances, deletion proceeds as expected. An interval on the order
   of hours should be sufficient, for example 6 hours.
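
   To make the difference concrete, here is a rough sketch of how many `ResumeDeletionEvent`s each interval would enqueue per day at most. The class and method names are hypothetical and not part of the PR, and the figure is an upper bound because `scheduleWithFixedDelay` also waits for each run to finish.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of the PR.
public final class ResumeIntervalSketch {

    /** Upper bound on scheduled runs per day for a fixed delay, ignoring task run time. */
    public static long runsPerDay(Duration interval) {
        return Duration.ofDays(1).toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        // Default proposed in the PR.
        System.out.println(runsPerDay(Duration.ofSeconds(5))); // 17280
        // Interval suggested in this comment.
        System.out.println(runsPerDay(Duration.ofHours(6))); // 4
    }
}
```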



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java:
##########
@@ -217,54 +216,6 @@ void testDeleteTable() throws Exception {
         assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty();
     }
 
-    @Test
-    void testResumeDeletionAfterRestart() throws Exception {

Review Comment:
   Keep this too.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -302,9 +314,16 @@ public void startup() {
 
         // start rebalance manager.
         rebalanceManager.startup();
+
+        resumeDeletionScheduledExecutor.scheduleWithFixedDelay(
+                () -> coordinatorEventManager.put(new ResumeDeletionEvent()),
+                resumeDeletionIntervalMs,
+                resumeDeletionIntervalMs,
+                TimeUnit.MILLISECONDS);
     }
 
     public void shutdown() {
+        resumeDeletionScheduledExecutor.shutdownNow();

Review Comment:
   This needs a graceful shutdown instead of `shutdownNow()`.
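
   A minimal sketch of what a graceful shutdown could look like with the plain JDK API: stop accepting new runs, wait for an in-flight run to finish, and only then fall back to `shutdownNow()`. The helper name and the timeout are assumptions for illustration; the project may already have a utility that should be used instead.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the PR.
public final class GracefulShutdownSketch {

    /**
     * Stops scheduling new runs, waits up to the timeout for a running task, and falls back
     * to shutdownNow() only if the wait times out or is interrupted.
     *
     * @return true if the executor terminated within the timeout
     */
    public static boolean shutdownGracefully(
            ScheduledExecutorService executor, long timeout, TimeUnit unit) {
        // By default, periodic tasks are cancelled once shutdown() is called.
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: interrupt whatever is still running.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {}, 0, 50, TimeUnit.MILLISECONDS);
        boolean clean = shutdownGracefully(executor, 5, TimeUnit.SECONDS);
        System.out.println("terminated cleanly: " + clean);
    }
}
```

   In `shutdown()` this would replace the direct `shutdownNow()` call, with a timeout chosen to fit the coordinator's shutdown budget.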



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
