wuchong commented on code in PR #2570:
URL: https://github.com/apache/fluss/pull/2570#discussion_r2763135329
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java:
##########
@@ -241,6 +241,7 @@ public void addStopReplicaRequestForTabletServers(
Set<Integer> tabletServers,
TableBucket tableBucket,
boolean isDelete,
+ boolean deleteRemote,
Review Comment:
Rename `delete` to `deleteLocal`, and add comments to explain the meaning of
the parameters.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java:
##########
@@ -255,7 +256,10 @@ public void addStopReplicaRequestForTabletServers(
&&
stopBucketReplica.get(tableBucket).isDelete();
PbStopReplicaReqForBucket
protoStopReplicaForBucket =
makeStopBucketReplica(
- tableBucket, alreadyDelete ||
isDelete, leaderEpoch);
+ tableBucket,
+ alreadyDelete || isDelete,
+ deleteRemote,
Review Comment:
Do we need to reduce `deleteRemote` the same way as `isDelete`, i.e., OR it
with the value of the stop-replica request already registered for this bucket?
##########
fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java:
##########
@@ -497,10 +504,78 @@ void testStopReplica(boolean partitionTable) throws
Exception {
assertThatThrownBy(() ->
remoteLogManager.relevantRemoteLogSegments(tb, 0L))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RemoteLogTablet can't be found for
table-bucket " + tb);
- FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf),
DATA1_PHYSICAL_TABLE_PATH, tb);
+ FsPath logTabletDir =
+ remoteLogTabletDir(
+ remoteLogDir(conf),
+ partitionTable
+ ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+ : DATA1_PHYSICAL_TABLE_PATH,
+ tb);
assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
}
+ @ParameterizedTest
+ @MethodSource("stopArgs")
+ void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean
deleteRemote)
+ throws Exception {
+ TableBucket tb = makeTableBucket(partitionTable);
+ // Need to make leader by ReplicaManager.
+ makeLogTableAsLeader(tb, partitionTable);
+ LogTablet logTablet =
replicaManager.getReplicaOrException(tb).getLogTablet();
+ addMultiSegmentsToLogTablet(logTablet, 5);
+ // trigger RLMTask copy local log segment to remote and update
metadata.
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+ RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
Review Comment:
This line can be removed.
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java:
##########
@@ -97,16 +118,141 @@ void testBuildClusterModel() throws Exception {
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient().addServerTag(request).get();
clusterModel = rebalanceManager.buildClusterModel();
- assertThat(clusterModel.servers().size()).isEqualTo(3);
- assertThat(clusterModel.aliveServers().size()).isEqualTo(2);
+ assertThat(clusterModel.servers().size()).isEqualTo(6);
+ assertThat(clusterModel.aliveServers().size()).isEqualTo(5);
assertThat(clusterModel.offlineServers().size()).isEqualTo(1);
assertThat(clusterModel.tables().size()).isEqualTo(2);
assertThat(clusterModel.tables()).contains(tableId1, tableId2);
}
+ @Test
+ void testRebalanceWithRemoteLog() throws Exception {
+ TableBucket tb = setupTableBucket();
+ long tableId = tb.getTableId();
+
+ int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+ TabletServerGateway leaderGateway =
+ FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
+
+ // produce test records
+ produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+ // test metadata updated: verify manifest in metadata
+ TabletServer tabletServer =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
+ RemoteLogManager remoteLogManager =
tabletServer.getReplicaManager().getRemoteLogManager();
+ RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
+
+ RemoteLogManifest manifest = remoteLogTablet.currentManifest();
+
assertThat(manifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+ assertThat(manifest.getTableBucket()).isEqualTo(tb);
+ assertThat(manifest.getRemoteLogSegmentList().size()).isGreaterThan(0);
+
+ // try to trigger rebalance. for example, if current assignment is [0,
1, 2], try to trigger
+ // rebalance to [3, 4, 5]
+ RebalancePlanForBucket planForBucket = buildRebalancePlanForBucket(tb);
+ rebalanceManager.registerRebalance(
+ "test-rebalance-dsds",
+ Collections.singletonMap(tb, planForBucket),
+ RebalanceStatus.NOT_STARTED);
+
+ retry(
+ Duration.ofMinutes(2),
+ () -> {
+ // assignment changed.
+ Set<Integer> newReplicaSet = new
HashSet<>(planForBucket.getNewReplicas());
+ BucketAssignment bucketAssignment =
+ zkClient.getTableAssignment(tableId)
+ .get()
+ .getBucketAssignment(tb.getBucket());
+ List<Integer> replicas = bucketAssignment.getReplicas();
+
assertThat(newReplicaSet.size()).isEqualTo(replicas.size());
+ assertThat(newReplicaSet.containsAll(replicas)).isTrue();
+
+ // leader changed.
+ int newLeader = planForBucket.getNewLeader();
+
assertThat(FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb)).isEqualTo(newLeader);
+
+ // origin replicas all set to offline.
+ List<Integer> originReplicas =
planForBucket.getOriginReplicas();
+ for (int originReplica : originReplicas) {
+ TabletServer ts =
+
FLUSS_CLUSTER_EXTENSION.getTabletServerById(originReplica);
+ ReplicaManager rm = ts.getReplicaManager();
+ assertThat(rm.getReplica(tb))
+
.isInstanceOf(ReplicaManager.NoneReplica.class);
+ }
+ });
+ // remote log not be deleted.
+ int newLeader = planForBucket.getNewLeader();
+ TabletServer leaderTs =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(newLeader);
+ RemoteLogManager leaderRm =
leaderTs.getReplicaManager().getRemoteLogManager();
+ RemoteLogTablet leaderRlt = leaderRm.remoteLogTablet(tb);
+
+ RemoteLogManifest newManifest = leaderRlt.currentManifest();
+
assertThat(newManifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+ assertThat(newManifest.getTableBucket()).isEqualTo(tb);
+
assertThat(newManifest.getRemoteLogSegmentList().size()).isGreaterThan(0);
Review Comment:
Should we assert that the size is the same as the previous size?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java:
##########
@@ -307,17 +308,36 @@ private void doHandleStateChanges(
replica -> doStateChange(replica,
ReplicaState.OfflineReplica));
break;
+ case ReplicaMigrateSuccessful:
+ validReplicas.forEach(
+ replica -> doStateChange(replica,
ReplicaState.ReplicaMigrateSuccessful));
+ validReplicas.forEach(
+ tableBucketReplica -> {
+ int replicaServer =
tableBucketReplica.getReplica();
+ // send stop replica request with delete = true
and deleteRemote = false
+ // indicates the replica is migrated.
+
coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
+ Collections.singleton(replicaServer),
+ tableBucketReplica.getTableBucket(),
+ true,
+ false,
+ coordinatorContext.getBucketLeaderEpoch(
+
tableBucketReplica.getTableBucket()));
+ });
+ break;
case ReplicaDeletionStarted:
validReplicas.forEach(
replica -> doStateChange(replica,
ReplicaState.ReplicaDeletionStarted));
- // send stop replica request with delete = true
+ // send stop replica request with delete = true and delete =
true indicates the
Review Comment:
```suggestion
// send stop replica request with deleteLocal = true and
deleteRemote = true indicates the
```
##########
fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java:
##########
@@ -387,4 +388,9 @@ private static void shutdownAndAwaitTermination(
public RemoteLogIndexCache getRemoteLogIndexCache() {
return remoteLogIndexCache;
}
+
+ @VisibleForTesting
+ public @Nullable TaskWithFuture getTaskWithFuture(TableBucket tableBucket)
{
Review Comment:
Package-private visibility is enough.
##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -716,11 +716,12 @@ public static List<NotifyLeaderAndIsrResultForBucket>
getNotifyLeaderAndIsrRespo
}
public static PbStopReplicaReqForBucket makeStopBucketReplica(
- TableBucket tableBucket, boolean isDelete, int leaderEpoch) {
+ TableBucket tableBucket, boolean isDelete, boolean deleteRemote,
int leaderEpoch) {
Review Comment:
Rename `delete` to `deleteLocal`, and add comments to explain the meaning of
the parameters.
##########
fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java:
##########
@@ -497,10 +504,78 @@ void testStopReplica(boolean partitionTable) throws
Exception {
assertThatThrownBy(() ->
remoteLogManager.relevantRemoteLogSegments(tb, 0L))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RemoteLogTablet can't be found for
table-bucket " + tb);
- FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf),
DATA1_PHYSICAL_TABLE_PATH, tb);
+ FsPath logTabletDir =
+ remoteLogTabletDir(
+ remoteLogDir(conf),
+ partitionTable
+ ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+ : DATA1_PHYSICAL_TABLE_PATH,
+ tb);
assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
}
+ @ParameterizedTest
+ @MethodSource("stopArgs")
+ void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean
deleteRemote)
+ throws Exception {
+ TableBucket tb = makeTableBucket(partitionTable);
+ // Need to make leader by ReplicaManager.
+ makeLogTableAsLeader(tb, partitionTable);
+ LogTablet logTablet =
replicaManager.getReplicaOrException(tb).getLogTablet();
+ addMultiSegmentsToLogTablet(logTablet, 5);
+ // trigger RLMTask copy local log segment to remote and update
metadata.
+ remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+ RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+ List<RemoteLogSegment> remoteLogSegmentList =
+ remoteLogManager.relevantRemoteLogSegments(tb, 0L);
+ assertThat(remoteLogSegmentList.size()).isEqualTo(4);
+ assertThat(listRemoteLogFiles(tb))
+ .isEqualTo(
+ remoteLogSegmentList.stream()
+ .map(s -> s.remoteLogSegmentId().toString())
+ .collect(Collectors.toSet()));
+ assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull();
+
+ FsPath logTabletDir =
+ remoteLogTabletDir(
+ remoteLogDir(conf),
+ partitionTable
+ ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+ : DATA1_PHYSICAL_TABLE_PATH,
+ tb);
+ assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isTrue();
+
+ // stop with delete = false, deleteRemote =false, local and remote log
should be kept,
+ // remote log task will be removed.
+ CompletableFuture<List<StopReplicaResultForBucket>> future1 = new
CompletableFuture<>();
+ replicaManager.stopReplicas(
+ 0,
+ Collections.singletonList(new StopReplicaData(tb, false,
false, 0, 0)),
+ future1::complete);
+ assertThat(future1.get()).containsOnly(new
StopReplicaResultForBucket(tb));
+ ReplicaManager.HostedReplica hostedReplica =
replicaManager.getReplica(tb);
+
assertThat(hostedReplica).isInstanceOf(ReplicaManager.OnlineReplica.class);
+ assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isTrue();
+ assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();
+
+ CompletableFuture<List<StopReplicaResultForBucket>> future2 = new
CompletableFuture<>();
+ replicaManager.stopReplicas(
+ 0,
+ Collections.singletonList(new StopReplicaData(tb, true,
deleteRemote, 0, 0)),
+ future2::complete);
+ assertThat(future2.get()).containsOnly(new
StopReplicaResultForBucket(tb));
+ hostedReplica = replicaManager.getReplica(tb);
+
assertThat(hostedReplica).isInstanceOf(ReplicaManager.NoneReplica.class);
+ if (!deleteRemote) {
Review Comment:
Should we also assert that `logTablet.getLogDir()` is deleted?
##########
fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java:
##########
@@ -24,13 +24,19 @@
public class StopReplicaData {
private final TableBucket tableBucket;
private final boolean delete;
+ private final boolean deleteRemote;
private final int coordinatorEpoch;
private final int leaderEpoch;
public StopReplicaData(
- TableBucket tableBucket, boolean delete, int coordinatorEpoch, int
leaderEpoch) {
+ TableBucket tableBucket,
+ boolean delete,
+ boolean deleteRemote,
Review Comment:
Rename `delete` to `deleteLocal`, and add comments to explain the meaning of
the parameters.
##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -739,6 +740,9 @@ public static List<StopReplicaData>
getStopReplicaData(StopReplicaRequest reques
new StopReplicaData(
toTableBucket(tableBucket),
reqForBucket.isDelete(),
+ reqForBucket.hasDeleteRemote()
+ ? reqForBucket.isDeleteRemote()
+ : reqForBucket.isDelete(),
Review Comment:
Add a comment explaining the backward-compatibility behavior.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -900,6 +900,7 @@ message PbStopReplicaReqForBucket {
required PbTableBucket table_bucket = 1;
required int32 leader_epoch = 2;
required bool delete = 3;
+ optional bool deleteRemote = 4;
Review Comment:
Add a comment to clarify the behavioral difference between `delete` and
`deleteRemote`:
- `delete` removes **local replica data** (i.e., data stored on the current
node).
- `deleteRemote` deletes **remote bucket data** (e.g., data in object
storage) and was introduced in **v0.9**.
For backward compatibility, if a request does not include the `deleteRemote`
flag, the system uses the value of `delete` for `deleteRemote` (i.e., remote
data is deleted whenever local data is deleted). This ensures that an older
CoordinatorServer continues to function correctly with newer TabletServers.
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -1715,6 +1716,7 @@ private void maybeShrinkIsr() {
private StopReplicaResultForBucket stopReplica(
TableBucket tb,
boolean delete,
+ boolean deleteRemote,
Review Comment:
Rename `delete` to `deleteLocal`, and add a comment to explain the parameters.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java:
##########
@@ -48,6 +48,14 @@ public Set<ReplicaState> getValidPreviousStates() {
return EnumSet.of(NewReplica, OnlineReplica, OfflineReplica);
}
},
+ ReplicaMigrateSuccessful {
Review Comment:
Rename the state to `ReplicaMigrationStarted`. This indicates that replica
migration has been initiated for the replica, but has not yet completed
successfully. Once the migration succeeds, the state should transition directly
to `NonExistentReplica`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]