wuchong commented on code in PR #2570:
URL: https://github.com/apache/fluss/pull/2570#discussion_r2763135329


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java:
##########
@@ -241,6 +241,7 @@ public void addStopReplicaRequestForTabletServers(
             Set<Integer> tabletServers,
             TableBucket tableBucket,
             boolean isDelete,
+            boolean deleteRemote,

Review Comment:
   Rename `delete` to `deleteLocal`, and add comments explaining the meaning of each parameter.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java:
##########
@@ -255,7 +256,10 @@ public void addStopReplicaRequestForTabletServers(
                                             && stopBucketReplica.get(tableBucket).isDelete();
                             PbStopReplicaReqForBucket protoStopReplicaForBucket =
                                     makeStopBucketReplica(
-                                            tableBucket, alreadyDelete || isDelete, leaderEpoch);
+                                            tableBucket,
+                                            alreadyDelete || isDelete,
+                                            deleteRemote,

Review Comment:
   Do we need to merge `deleteRemote` with an already-queued request for the same bucket, the same way `isDelete` is merged (`alreadyDelete || isDelete`)?
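A minimal sketch of what merging both flags could look like, assuming the batch keeps at most one pending stop request per bucket and that OR is the right merge for `deleteRemote` as well (so a later migration request never weakens an earlier deletion). `StopRequestBatchSketch` and `StopFlags` are hypothetical stand-ins, not Fluss classes, and a `String` stands in for `TableBucket`.

```java
import java.util.HashMap;
import java.util.Map;

public class StopRequestBatchSketch {

    /** Hypothetical pair of flags carried by a pending stop-replica request. */
    static final class StopFlags {
        final boolean deleteLocal;
        final boolean deleteRemote;

        StopFlags(boolean deleteLocal, boolean deleteRemote) {
            this.deleteLocal = deleteLocal;
            this.deleteRemote = deleteRemote;
        }
    }

    // Pending requests keyed by bucket (a String stands in for TableBucket).
    private final Map<String, StopFlags> pending = new HashMap<>();

    /**
     * Adds a stop request, merging it with any request already queued for the bucket.
     * Both flags are OR-ed, mirroring `alreadyDelete || isDelete` (assumption: deletion
     * wins over migration when both are queued).
     */
    void add(String bucket, boolean deleteLocal, boolean deleteRemote) {
        pending.merge(
                bucket,
                new StopFlags(deleteLocal, deleteRemote),
                (prev, next) ->
                        new StopFlags(
                                prev.deleteLocal || next.deleteLocal,
                                prev.deleteRemote || next.deleteRemote));
    }

    StopFlags get(String bucket) {
        return pending.get(bucket);
    }

    public static void main(String[] args) {
        StopRequestBatchSketch batch = new StopRequestBatchSketch();
        // A migration request (keep remote data) followed by a drop (delete everything).
        batch.add("t1-b0", true, false);
        batch.add("t1-b0", true, true);
        StopFlags merged = batch.get("t1-b0");
        System.out.println(merged.deleteLocal + " " + merged.deleteRemote); // true true
    }
}
```

If OR turns out to be wrong for some state combination, the merge function is the single place to change.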



##########
fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java:
##########
@@ -497,10 +504,78 @@ void testStopReplica(boolean partitionTable) throws Exception {
         assertThatThrownBy(() -> remoteLogManager.relevantRemoteLogSegments(tb, 0L))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb);
-        FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf), DATA1_PHYSICAL_TABLE_PATH, tb);
+        FsPath logTabletDir =
+                remoteLogTabletDir(
+                        remoteLogDir(conf),
+                        partitionTable
+                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+                                : DATA1_PHYSICAL_TABLE_PATH,
+                        tb);
         assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
     }
 
+    @ParameterizedTest
+    @MethodSource("stopArgs")
+    void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean deleteRemote)
+            throws Exception {
+        TableBucket tb = makeTableBucket(partitionTable);
+        // Need to make leader by ReplicaManager.
+        makeLogTableAsLeader(tb, partitionTable);
+        LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
+        addMultiSegmentsToLogTablet(logTablet, 5);
+        // trigger RLMTask copy local log segment to remote and update metadata.
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);

Review Comment:
   This variable can be removed.



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java:
##########
@@ -97,16 +118,141 @@ void testBuildClusterModel() throws Exception {
         FLUSS_CLUSTER_EXTENSION.newCoordinatorClient().addServerTag(request).get();
 
         clusterModel = rebalanceManager.buildClusterModel();
-        assertThat(clusterModel.servers().size()).isEqualTo(3);
-        assertThat(clusterModel.aliveServers().size()).isEqualTo(2);
+        assertThat(clusterModel.servers().size()).isEqualTo(6);
+        assertThat(clusterModel.aliveServers().size()).isEqualTo(5);
         assertThat(clusterModel.offlineServers().size()).isEqualTo(1);
         assertThat(clusterModel.tables().size()).isEqualTo(2);
         assertThat(clusterModel.tables()).contains(tableId1, tableId2);
     }
 
+    @Test
+    void testRebalanceWithRemoteLog() throws Exception {
+        TableBucket tb = setupTableBucket();
+        long tableId = tb.getTableId();
+
+        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+        TabletServerGateway leaderGateway =
+                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
+
+        // produce test records
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+        // test metadata updated: verify manifest in metadata
+        TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
+        RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
+        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
+
+        RemoteLogManifest manifest = remoteLogTablet.currentManifest();
+        assertThat(manifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+        assertThat(manifest.getTableBucket()).isEqualTo(tb);
+        assertThat(manifest.getRemoteLogSegmentList().size()).isGreaterThan(0);
+
+        // try to trigger rebalance. for example, if current assignment is [0, 1, 2], try to trigger
+        // rebalance to [3, 4, 5]
+        RebalancePlanForBucket planForBucket = buildRebalancePlanForBucket(tb);
+        rebalanceManager.registerRebalance(
+                "test-rebalance-dsds",
+                Collections.singletonMap(tb, planForBucket),
+                RebalanceStatus.NOT_STARTED);
+
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    // assignment changed.
+                    Set<Integer> newReplicaSet = new HashSet<>(planForBucket.getNewReplicas());
+                    BucketAssignment bucketAssignment =
+                            zkClient.getTableAssignment(tableId)
+                                    .get()
+                                    .getBucketAssignment(tb.getBucket());
+                    List<Integer> replicas = bucketAssignment.getReplicas();
+                    assertThat(newReplicaSet.size()).isEqualTo(replicas.size());
+                    assertThat(newReplicaSet.containsAll(replicas)).isTrue();
+
+                    // leader changed.
+                    int newLeader = planForBucket.getNewLeader();
+                    assertThat(FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb)).isEqualTo(newLeader);
+
+                    // origin replicas all set to offline.
+                    List<Integer> originReplicas = planForBucket.getOriginReplicas();
+                    for (int originReplica : originReplicas) {
+                        TabletServer ts =
+                                FLUSS_CLUSTER_EXTENSION.getTabletServerById(originReplica);
+                        ReplicaManager rm = ts.getReplicaManager();
+                        assertThat(rm.getReplica(tb))
+                                .isInstanceOf(ReplicaManager.NoneReplica.class);
+                    }
+                });
+        // remote log not be deleted.
+        int newLeader = planForBucket.getNewLeader();
+        TabletServer leaderTs = FLUSS_CLUSTER_EXTENSION.getTabletServerById(newLeader);
+        RemoteLogManager leaderRm = leaderTs.getReplicaManager().getRemoteLogManager();
+        RemoteLogTablet leaderRlt = leaderRm.remoteLogTablet(tb);
+
+        RemoteLogManifest newManifest = leaderRlt.currentManifest();
+        assertThat(newManifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+        assertThat(newManifest.getTableBucket()).isEqualTo(tb);
+        assertThat(newManifest.getRemoteLogSegmentList().size()).isGreaterThan(0);

Review Comment:
   Should we assert that the size is the same as the previous size (before the rebalance)?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java:
##########
@@ -307,17 +308,36 @@ private void doHandleStateChanges(
                         replica -> doStateChange(replica, ReplicaState.OfflineReplica));
 
                 break;
+            case ReplicaMigrateSuccessful:
+                validReplicas.forEach(
+                        replica -> doStateChange(replica, ReplicaState.ReplicaMigrateSuccessful));
+                validReplicas.forEach(
+                        tableBucketReplica -> {
+                            int replicaServer = tableBucketReplica.getReplica();
+                            // send stop replica request with delete = true and deleteRemote = false
+                            // indicates the replica is migrated.
+                            coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
+                                    Collections.singleton(replicaServer),
+                                    tableBucketReplica.getTableBucket(),
+                                    true,
+                                    false,
+                                    coordinatorContext.getBucketLeaderEpoch(
+                                            tableBucketReplica.getTableBucket()));
+                        });
+                break;
             case ReplicaDeletionStarted:
                 validReplicas.forEach(
                         replica -> doStateChange(replica, ReplicaState.ReplicaDeletionStarted));
-                // send stop replica request with delete = true
+                // send stop replica request with delete = true and delete = true indicates the

Review Comment:
   ```suggestion
                   // send stop replica request with deleteLocal = true and deleteRemote = true indicates the
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java:
##########
@@ -387,4 +388,9 @@ private static void shutdownAndAwaitTermination(
     public RemoteLogIndexCache getRemoteLogIndexCache() {
         return remoteLogIndexCache;
     }
+
+    @VisibleForTesting
+    public @Nullable TaskWithFuture getTaskWithFuture(TableBucket tableBucket) {

Review Comment:
   Package-private visibility is enough.



##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -716,11 +716,12 @@ public static List<NotifyLeaderAndIsrResultForBucket> getNotifyLeaderAndIsrRespo
     }
 
     public static PbStopReplicaReqForBucket makeStopBucketReplica(
-            TableBucket tableBucket, boolean isDelete, int leaderEpoch) {
+            TableBucket tableBucket, boolean isDelete, boolean deleteRemote, int leaderEpoch) {

Review Comment:
   Rename `delete` to `deleteLocal`, and add comments explaining the meaning of each parameter.



##########
fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java:
##########
@@ -497,10 +504,78 @@ void testStopReplica(boolean partitionTable) throws Exception {
         assertThatThrownBy(() -> remoteLogManager.relevantRemoteLogSegments(tb, 0L))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb);
-        FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf), DATA1_PHYSICAL_TABLE_PATH, tb);
+        FsPath logTabletDir =
+                remoteLogTabletDir(
+                        remoteLogDir(conf),
+                        partitionTable
+                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+                                : DATA1_PHYSICAL_TABLE_PATH,
+                        tb);
         assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
     }
 
+    @ParameterizedTest
+    @MethodSource("stopArgs")
+    void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean deleteRemote)
+            throws Exception {
+        TableBucket tb = makeTableBucket(partitionTable);
+        // Need to make leader by ReplicaManager.
+        makeLogTableAsLeader(tb, partitionTable);
+        LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
+        addMultiSegmentsToLogTablet(logTablet, 5);
+        // trigger RLMTask copy local log segment to remote and update metadata.
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+        List<RemoteLogSegment> remoteLogSegmentList =
+                remoteLogManager.relevantRemoteLogSegments(tb, 0L);
+        assertThat(remoteLogSegmentList.size()).isEqualTo(4);
+        assertThat(listRemoteLogFiles(tb))
+                .isEqualTo(
+                        remoteLogSegmentList.stream()
+                                .map(s -> s.remoteLogSegmentId().toString())
+                                .collect(Collectors.toSet()));
+        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull();
+
+        FsPath logTabletDir =
+                remoteLogTabletDir(
+                        remoteLogDir(conf),
+                        partitionTable
+                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+                                : DATA1_PHYSICAL_TABLE_PATH,
+                        tb);
+        assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isTrue();
+
+        // stop with delete = false, deleteRemote =false, local and remote log should be kept,
+        // remote log task will be removed.
+        CompletableFuture<List<StopReplicaResultForBucket>> future1 = new CompletableFuture<>();
+        replicaManager.stopReplicas(
+                0,
+                Collections.singletonList(new StopReplicaData(tb, false, false, 0, 0)),
+                future1::complete);
+        assertThat(future1.get()).containsOnly(new StopReplicaResultForBucket(tb));
+        ReplicaManager.HostedReplica hostedReplica = replicaManager.getReplica(tb);
+        assertThat(hostedReplica).isInstanceOf(ReplicaManager.OnlineReplica.class);
+        assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isTrue();
+        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();
+
+        CompletableFuture<List<StopReplicaResultForBucket>> future2 = new CompletableFuture<>();
+        replicaManager.stopReplicas(
+                0,
+                Collections.singletonList(new StopReplicaData(tb, true, deleteRemote, 0, 0)),
+                future2::complete);
+        assertThat(future2.get()).containsOnly(new StopReplicaResultForBucket(tb));
+        hostedReplica = replicaManager.getReplica(tb);
+        assertThat(hostedReplica).isInstanceOf(ReplicaManager.NoneReplica.class);
+        if (!deleteRemote) {

Review Comment:
   Should we also assert that `logTablet.getLogDir()` is deleted?



##########
fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java:
##########
@@ -24,13 +24,19 @@
 public class StopReplicaData {
     private final TableBucket tableBucket;
     private final boolean delete;
+    private final boolean deleteRemote;
     private final int coordinatorEpoch;
     private final int leaderEpoch;
 
     public StopReplicaData(
-            TableBucket tableBucket, boolean delete, int coordinatorEpoch, int leaderEpoch) {
+            TableBucket tableBucket,
+            boolean delete,
+            boolean deleteRemote,

Review Comment:
   Rename `delete` to `deleteLocal`, and add comments explaining the meaning of each parameter.
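A sketch of how the renamed fields and their comments might read, using the semantics described elsewhere in this review (`delete` removes data on the current node, `deleteRemote` removes remote bucket data such as object storage, and migration sends `true`/`false`). The class name `StopReplicaDataSketch`, its getter names, and the `String` standing in for `TableBucket` are illustrative assumptions; the constructor keeps the parameter order used by `new StopReplicaData(tb, false, false, 0, 0)` in the test.

```java
import java.util.Objects;

/** Sketch of StopReplicaData after the suggested rename (String stands in for TableBucket). */
public final class StopReplicaDataSketch {
    private final String tableBucket;

    /** Whether to delete the replica's local data, i.e. the data stored on the current node. */
    private final boolean deleteLocal;

    /**
     * Whether to also delete the bucket's remote data (e.g. remote log segments in object
     * storage). A migrated replica is stopped with deleteLocal = true and deleteRemote = false,
     * because the bucket's new replicas still rely on the remote data.
     */
    private final boolean deleteRemote;

    private final int coordinatorEpoch;
    private final int leaderEpoch;

    public StopReplicaDataSketch(
            String tableBucket,
            boolean deleteLocal,
            boolean deleteRemote,
            int coordinatorEpoch,
            int leaderEpoch) {
        this.tableBucket = Objects.requireNonNull(tableBucket, "tableBucket");
        this.deleteLocal = deleteLocal;
        this.deleteRemote = deleteRemote;
        this.coordinatorEpoch = coordinatorEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    public String getTableBucket() {
        return tableBucket;
    }

    public boolean isDeleteLocal() {
        return deleteLocal;
    }

    public boolean isDeleteRemote() {
        return deleteRemote;
    }

    public int getCoordinatorEpoch() {
        return coordinatorEpoch;
    }

    public int getLeaderEpoch() {
        return leaderEpoch;
    }
}
```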



##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -739,6 +740,9 @@ public static List<StopReplicaData> getStopReplicaData(StopReplicaRequest reques
                     new StopReplicaData(
                             toTableBucket(tableBucket),
                             reqForBucket.isDelete(),
+                            reqForBucket.hasDeleteRemote()
+                                    ? reqForBucket.isDeleteRemote()
+                                    : reqForBucket.isDelete(),

Review Comment:
   Add a comment explaining the backward-compatibility fallback.
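A self-contained sketch of the fallback with the kind of comment being asked for. `StopReplicaReq` is a hypothetical stand-in for the generated `PbStopReplicaReqForBucket` (a `null` `Boolean` models the unset optional field); the `hasDeleteRemote()` / `isDeleteRemote()` / `isDelete()` names mirror the calls in the hunk above, and the v0.9 detail comes from the proto review comment.

```java
public class StopReplicaCompatSketch {

    /** Hypothetical stand-in for PbStopReplicaReqForBucket: delete is required, deleteRemote optional. */
    static final class StopReplicaReq {
        private final boolean delete;
        private final Boolean deleteRemote; // null models "optional field not set"

        StopReplicaReq(boolean delete, Boolean deleteRemote) {
            this.delete = delete;
            this.deleteRemote = deleteRemote;
        }

        boolean isDelete() {
            return delete;
        }

        boolean hasDeleteRemote() {
            return deleteRemote != null;
        }

        boolean isDeleteRemote() {
            return hasDeleteRemote() && deleteRemote;
        }
    }

    /**
     * Resolves the effective deleteRemote flag.
     *
     * <p>Backward compatibility: {@code deleteRemote} was introduced in v0.9, so an older
     * CoordinatorServer never sets it. In that case fall back to {@code delete}, i.e. a
     * deleting stop request also deletes the remote data, as before.
     */
    static boolean resolveDeleteRemote(StopReplicaReq req) {
        return req.hasDeleteRemote() ? req.isDeleteRemote() : req.isDelete();
    }

    public static void main(String[] args) {
        // Old coordinator: deleteRemote unset, so it follows delete.
        System.out.println(resolveDeleteRemote(new StopReplicaReq(true, null))); // true
        // New coordinator migrating a replica: delete local data only.
        System.out.println(resolveDeleteRemote(new StopReplicaReq(true, false))); // false
    }
}
```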



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -900,6 +900,7 @@ message PbStopReplicaReqForBucket {
   required PbTableBucket table_bucket = 1;
   required int32 leader_epoch = 2;
   required bool delete = 3;
+  optional bool deleteRemote = 4;

Review Comment:
   Add a comment to clarify the behavioral difference between `delete` and `deleteRemote`:
   
   - `delete` removes **local replica data** (i.e., data stored on the current node).
   - `deleteRemote` deletes **remote bucket data** (e.g., data in object storage) and was introduced in **v0.9**.
   
   For backward compatibility, if a request does not include the `deleteRemote` flag, the system treats `delete` as `deleteRemote` (i.e., it falls back to remote deletion). This ensures that older CoordinatorServers continue to work correctly with newer TabletServers.



##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -1715,6 +1716,7 @@ private void maybeShrinkIsr() {
     private StopReplicaResultForBucket stopReplica(
             TableBucket tb,
             boolean delete,
+            boolean deleteRemote,

Review Comment:
   Rename `delete` to `deleteLocal`, and add comments explaining the parameters.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java:
##########
@@ -48,6 +48,14 @@ public Set<ReplicaState> getValidPreviousStates() {
             return EnumSet.of(NewReplica, OnlineReplica, OfflineReplica);
         }
     },
+    ReplicaMigrateSuccessful {

Review Comment:
   Rename the state to `ReplicaMigrationStarted`. This indicates that migration has been initiated for the replica but has not yet completed successfully. Once the migration succeeds, the state should transition directly to `NonExistentReplica`.
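A minimal sketch of how the renamed state could slot into the enum, modeled on the `getValidPreviousStates()` pattern visible in the `ReplicaState` hunk. Only a subset of states is shown, and the previous-state sets (in particular `OfflineReplica` as the only entry into `ReplicaMigrationStarted`) are assumptions for illustration, not taken from the PR.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, simplified subset of the replica states with the proposed rename. */
public enum ReplicaStateSketch {
    OnlineReplica {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(OnlineReplica, OfflineReplica);
        }
    },
    OfflineReplica {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(OnlineReplica, OfflineReplica);
        }
    },
    // Migration has been initiated for the replica but has not completed yet.
    ReplicaMigrationStarted {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(OfflineReplica); // assumption
        }
    },
    ReplicaDeletionStarted {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(OfflineReplica);
        }
    },
    ReplicaDeletionSuccessful {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(ReplicaDeletionStarted);
        }
    },
    // A successful migration moves straight here; there is no "migration successful" state.
    NonExistentReplica {
        @Override
        public Set<ReplicaStateSketch> getValidPreviousStates() {
            return EnumSet.of(ReplicaDeletionSuccessful, ReplicaMigrationStarted);
        }
    };

    public abstract Set<ReplicaStateSketch> getValidPreviousStates();

    /** Returns true if a replica in state {@code from} may move to this state. */
    public boolean canTransitionFrom(ReplicaStateSketch from) {
        return getValidPreviousStates().contains(from);
    }
}
```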



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.