platinumhamburg commented on code in PR #1452: URL: https://github.com/apache/fluss/pull/1452#discussion_r2666893218
########## fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlan; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.ReplicaManager; +import 
org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for rebalance. */ +public class RebalanceITCase { + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setNumOfTabletServers(4) + .setClusterConf(initConfig()) + .build(); + + private Connection conn; + private Admin admin; + + @BeforeEach + protected void setup() throws Exception { + conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + admin = conn.getAdmin(); + } + + @AfterEach + protected void teardown() throws Exception { + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().deleteRebalancePlan(); + + if (admin != null) { + admin.close(); + admin = null; + } + + if (conn != null) { + conn.close(); + conn = null; + } + } + + // TODO add test for primary key table, trace by https://github.com/apache/fluss/issues/2315 + + @Test + void testRebalanceForLogTable() throws Exception { + String dbName = "db-balance"; + admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get(); + + // add server tag PERMANENT_OFFLINE for server 3, this will avoid to generate bucket + // assignment on server 3 when create table. + admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // create some none partitioned log table. 
+ for (int i = 0; i < 2; i++) { + long tableId = + createTable( + new TablePath(dbName, "test-rebalance_table-" + i), + DATA1_TABLE_DESCRIPTOR, + false); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + } + + // create one partitioned table with two partitions. + TableDescriptor partitionedDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(3) + .partitionedBy("b") + .build(); + TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table1"); + long tableId = createTable(tablePath, partitionedDescriptor, false); + for (int j = 0; j < 2; j++) { + PartitionSpec partitionSpec = + new PartitionSpec(Collections.singletonMap("b", String.valueOf(j))); + admin.createPartition(tablePath, partitionSpec, false).get(); + long partitionId = + admin.listPartitionInfos(tablePath, partitionSpec) + .get() + .get(0) + .getPartitionId(); + FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId); + } + + // verify before rebalance. As we use unbalance assignment, all replicas will not be + // location on servers 3. + for (int i = 0; i < 3; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + assertThat(replicaManager.onlineReplicas().count()).isGreaterThan(0); + assertThat(replicaManager.leaderCount()).isGreaterThan(0); + } + ReplicaManager replicaManager3 = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager(); + assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0); + assertThat(replicaManager3.leaderCount()).isEqualTo(0); + + // remove tag after crated table. 
+ admin.removeServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal] + RebalancePlan rebalancePlan = + admin.rebalance( + Arrays.asList( + GoalType.REPLICA_DISTRIBUTION, + GoalType.LEADER_DISTRIBUTION), + false) + .get(); + + retry( + Duration.ofMinutes(2), + () -> { + RebalanceProgress progress = admin.listRebalanceProgress(null).get(); + assertPlan(rebalancePlan, progress); + }); + for (int i = 0; i < 4; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + // average will be 9 + assertThat(replicaManager.onlineReplicas().count()).isBetween(6L, 12L); + long leaderCount = replicaManager.leaderCount(); + // average will be 3 + assertThat(leaderCount).isBetween(1L, 6L); Review Comment: The test assertions are too vague to effectively verify the rebalance behavior. As expected, we should assert that the cluster—including server 3—has achieved the predefined balance target. ########## fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlan; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.ReplicaManager; +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for rebalance. */ +public class RebalanceITCase { Review Comment: Missing test case: What happens when multiple rebalance tasks are created simultaneously? 
(Our API design currently has some semantic inconsistencies: the list endpoint implies that multiple rebalance tasks can coexist, and the existence of a rebalanceId further supports this, yet some other endpoints assume that only a single rebalance task can exist at a time, and the cancel endpoint allows a rebalance task to be specified ambiguously. We need to clarify the intended semantics and ensure the behavior is consistent across the whole design.) ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServersNotExcludeForReplicaMove; + +/** An abstract class for goals that are based on the distribution of replicas. */ +public abstract class ReplicaDistributionAbstractGoal extends AbstractGoal { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicaDistributionAbstractGoal.class); + private static final double BALANCE_MARGIN = 0.9; + protected final Set<Integer> serverIdsAboveRebalanceUpperLimit; + protected final Set<Integer> serverIdsBelowRebalanceLowerLimit; + protected double avgReplicasOnAliveServer; + protected int rebalanceUpperLimit; + protected int rebalanceLowerLimit; + // This is used to identify servers not excluded for replica moves. 
+ protected Set<Integer> serversAllowedReplicaRemove; + + public ReplicaDistributionAbstractGoal() { + serverIdsAboveRebalanceUpperLimit = new HashSet<>(); + serverIdsBelowRebalanceLowerLimit = new HashSet<>(); + } + + private int rebalanceUpperLimit(double balancePercentage) { + return (int) + Math.ceil( + avgReplicasOnAliveServer + * (1 + adjustedRebalancePercentage(balancePercentage))); + } + + private int rebalanceLowerLimit(double balancePercentage) { + return (int) + Math.floor( + avgReplicasOnAliveServer + * Math.max( + 0, (1 - adjustedRebalancePercentage(balancePercentage)))); + } + + private double adjustedRebalancePercentage(double rebalancePercentage) { + return (rebalancePercentage - 1) * BALANCE_MARGIN; + } + + boolean isReplicaCountUnderBalanceUpperLimitAfterChange( + ServerModel server, int currentReplicaCount, ChangeType changeType) { Review Comment: The changeType is always equal to ChangeType.ADD. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** An util class for {@link GoalOptimizer}. */ +public class GoalOptimizerUtils { + + public static final double EPSILON = 1E-5; Review Comment: should move to MathUtils. ########## fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlan; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.ReplicaManager; +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for rebalance. 
*/ +public class RebalanceITCase { + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setNumOfTabletServers(4) + .setClusterConf(initConfig()) + .build(); + + private Connection conn; + private Admin admin; + + @BeforeEach + protected void setup() throws Exception { + conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + admin = conn.getAdmin(); + } + + @AfterEach + protected void teardown() throws Exception { + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().deleteRebalancePlan(); + + if (admin != null) { + admin.close(); + admin = null; + } + + if (conn != null) { + conn.close(); + conn = null; + } + } + + // TODO add test for primary key table, trace by https://github.com/apache/fluss/issues/2315 + + @Test + void testRebalanceForLogTable() throws Exception { + String dbName = "db-balance"; + admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get(); + + // add server tag PERMANENT_OFFLINE for server 3, this will avoid to generate bucket + // assignment on server 3 when create table. + admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // create some none partitioned log table. + for (int i = 0; i < 2; i++) { + long tableId = + createTable( + new TablePath(dbName, "test-rebalance_table-" + i), + DATA1_TABLE_DESCRIPTOR, + false); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + } + + // create one partitioned table with two partitions. 
+ TableDescriptor partitionedDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(3) + .partitionedBy("b") + .build(); + TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table1"); + long tableId = createTable(tablePath, partitionedDescriptor, false); + for (int j = 0; j < 2; j++) { + PartitionSpec partitionSpec = + new PartitionSpec(Collections.singletonMap("b", String.valueOf(j))); + admin.createPartition(tablePath, partitionSpec, false).get(); + long partitionId = + admin.listPartitionInfos(tablePath, partitionSpec) + .get() + .get(0) + .getPartitionId(); + FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId); + } + + // verify before rebalance. As we use unbalance assignment, all replicas will not be + // location on servers 3. + for (int i = 0; i < 3; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + assertThat(replicaManager.onlineReplicas().count()).isGreaterThan(0); + assertThat(replicaManager.leaderCount()).isGreaterThan(0); + } + ReplicaManager replicaManager3 = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager(); + assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0); + assertThat(replicaManager3.leaderCount()).isEqualTo(0); + + // remove tag after crated table. 
+ admin.removeServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal] + RebalancePlan rebalancePlan = + admin.rebalance( + Arrays.asList( + GoalType.REPLICA_DISTRIBUTION, + GoalType.LEADER_DISTRIBUTION), + false) + .get(); + + retry( + Duration.ofMinutes(2), + () -> { + RebalanceProgress progress = admin.listRebalanceProgress(null).get(); + assertPlan(rebalancePlan, progress); + }); + for (int i = 0; i < 4; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + // average will be 9 + assertThat(replicaManager.onlineReplicas().count()).isBetween(6L, 12L); + long leaderCount = replicaManager.leaderCount(); + // average will be 3 + assertThat(leaderCount).isBetween(1L, 6L); Review Comment: We might want to create a generic assertion method that specifies which subset of servers is included and, based on which GoalType, what balance target has been achieved. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.model; + +import org.apache.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** A class that holds the information of the {@link TableBucket} for rebalance. */ +public class BucketModel { + private final TableBucket tableBucket; + private final List<ReplicaModel> replicas; + private @Nullable ReplicaModel leader; + // Set of server which are unable to host replica of this replica (such as: the server are + // offline). + private final Set<ServerModel> ineligibleServers; + + public BucketModel(TableBucket tableBucket, Set<ServerModel> ineligibleServers) { + this.tableBucket = tableBucket; + this.replicas = new ArrayList<>(); + this.leader = null; + this.ineligibleServers = ineligibleServers; + } + + public TableBucket tableBucket() { + return tableBucket; + } + + public @Nullable ReplicaModel leader() { + return leader; + } + + public List<ReplicaModel> replicas() { + return replicas; + } + + public Set<ServerModel> bucketServers() { + Set<ServerModel> bucketServers = new HashSet<>(); + replicas.forEach(replica -> bucketServers.add(replica.server())); + return bucketServers; + } + + public boolean canAssignReplicaToServer(ServerModel candidateServer) { + return !ineligibleServers.contains(candidateServer); + } + + public ReplicaModel replica(long serverId) { + for (ReplicaModel replica : replicas) { + if (replica.server().id() == serverId) { + return replica; + } + } + + throw new IllegalArgumentException( + "Requested replica " + serverId + " is not a replica of bucket " + tableBucket); + } + + public void addLeader(ReplicaModel leader, int index) { + checkArgument( + this.leader == null, + String.format( + "Bucket %s already has a leader replica %s. 
Cannot add a new leader replica %s.", + tableBucket, this.leader, leader)); + + checkArgument( + leader.isLeader(), + String.format( + "Inconsistent leadership information. Trying to set %s as the leader for bucket %s while " + + "the replica is not marked as a leader", + leader, tableBucket)); + + this.leader = leader; + replicas.add(index, leader); + } + + public void addFollower(ReplicaModel follower, int index) { + checkArgument( + !follower.isLeader(), + String.format( + "Inconsistent leadership information. Trying to set %s as the follower for bucket %s while " + + "the replica is marked as a leader", + follower, tableBucket)); + + checkArgument( + follower.tableBucket().equals(this.tableBucket), + String.format( + "Inconsistent table bucket. Trying to add follower replica %s to tableBucket %s", + follower, tableBucket)); + + // Add follower to list of followers + replicas.add(index, follower); + } + + void relocateLeadership(ReplicaModel prospectiveLeader) { + int leaderPos = replicas.indexOf(prospectiveLeader); + swapReplicaPositions(0, leaderPos); Review Comment: Is it safe to always use 0 here? ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServersNotExcludeForReplicaMove; + +/** An abstract class for goals that are based on the distribution of replicas. */ +public abstract class ReplicaDistributionAbstractGoal extends AbstractGoal { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicaDistributionAbstractGoal.class); + private static final double BALANCE_MARGIN = 0.9; + protected final Set<Integer> serverIdsAboveRebalanceUpperLimit; + protected final Set<Integer> serverIdsBelowRebalanceLowerLimit; + protected double avgReplicasOnAliveServer; + protected int rebalanceUpperLimit; + protected int rebalanceLowerLimit; + // This is used to identify servers not excluded for replica moves. 
+ protected Set<Integer> serversAllowedReplicaRemove; + + public ReplicaDistributionAbstractGoal() { + serverIdsAboveRebalanceUpperLimit = new HashSet<>(); + serverIdsBelowRebalanceLowerLimit = new HashSet<>(); + } + + private int rebalanceUpperLimit(double balancePercentage) { + return (int) + Math.ceil( + avgReplicasOnAliveServer + * (1 + adjustedRebalancePercentage(balancePercentage))); + } + + private int rebalanceLowerLimit(double balancePercentage) { + return (int) + Math.floor( + avgReplicasOnAliveServer + * Math.max( + 0, (1 - adjustedRebalancePercentage(balancePercentage)))); + } + + private double adjustedRebalancePercentage(double rebalancePercentage) { + return (rebalancePercentage - 1) * BALANCE_MARGIN; + } + + boolean isReplicaCountUnderBalanceUpperLimitAfterChange( + ServerModel server, int currentReplicaCount, ChangeType changeType) { + int serverBalanceUpperLimit = server.isAlive() ? rebalanceUpperLimit : 0; + + return changeType == ChangeType.ADD + ? currentReplicaCount + 1 <= serverBalanceUpperLimit + : currentReplicaCount - 1 <= serverBalanceUpperLimit; + } + + boolean isReplicaCountAboveBalanceLowerLimitAfterChange( Review Comment: The changeType is always equal to ChangeType.REMOVE. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Replica reassignment. */ +public class ReplicaReassignment { + private final List<Integer> replicas; + private final List<Integer> addingReplicas; + private final List<Integer> removingReplicas; + + private ReplicaReassignment( + List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) { + this.replicas = Collections.unmodifiableList(replicas); + this.addingReplicas = Collections.unmodifiableList(addingReplicas); + this.removingReplicas = Collections.unmodifiableList(removingReplicas); + } + + private static ReplicaReassignment build( + List<Integer> originReplicas, List<Integer> targetReplicas) { + // targetReplicas behind originReplicas in full set. 
+ List<Integer> fullReplicaSet = new ArrayList<>(targetReplicas); + fullReplicaSet.addAll(originReplicas); + fullReplicaSet = fullReplicaSet.stream().distinct().collect(Collectors.toList()); + + List<Integer> newAddingReplicas = new ArrayList<>(fullReplicaSet); + newAddingReplicas.removeAll(originReplicas); + + List<Integer> newRemovingReplicas = new ArrayList<>(originReplicas); + newRemovingReplicas.removeAll(targetReplicas); + + return new ReplicaReassignment(fullReplicaSet, newAddingReplicas, newRemovingReplicas); + } + + private List<Integer> getTargetReplicas() { + List<Integer> computed = new ArrayList<>(replicas); + computed.removeAll(removingReplicas); + return Collections.unmodifiableList(computed); + } + + private List<Integer> getOriginReplicas() { + List<Integer> computed = new ArrayList<>(replicas); + computed.removeAll(addingReplicas); + return Collections.unmodifiableList(computed); + } + + private boolean isBeingReassigned() { Review Comment: ditto ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Replica reassignment. */ +public class ReplicaReassignment { + private final List<Integer> replicas; + private final List<Integer> addingReplicas; + private final List<Integer> removingReplicas; + + private ReplicaReassignment( + List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) { + this.replicas = Collections.unmodifiableList(replicas); + this.addingReplicas = Collections.unmodifiableList(addingReplicas); + this.removingReplicas = Collections.unmodifiableList(removingReplicas); + } + + private static ReplicaReassignment build( Review Comment: dead code ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Replica reassignment. */ +public class ReplicaReassignment { + private final List<Integer> replicas; + private final List<Integer> addingReplicas; + private final List<Integer> removingReplicas; + + private ReplicaReassignment( + List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) { + this.replicas = Collections.unmodifiableList(replicas); + this.addingReplicas = Collections.unmodifiableList(addingReplicas); + this.removingReplicas = Collections.unmodifiableList(removingReplicas); + } + + private static ReplicaReassignment build( + List<Integer> originReplicas, List<Integer> targetReplicas) { + // targetReplicas behind originReplicas in full set. + List<Integer> fullReplicaSet = new ArrayList<>(targetReplicas); + fullReplicaSet.addAll(originReplicas); + fullReplicaSet = fullReplicaSet.stream().distinct().collect(Collectors.toList()); + + List<Integer> newAddingReplicas = new ArrayList<>(fullReplicaSet); + newAddingReplicas.removeAll(originReplicas); + + List<Integer> newRemovingReplicas = new ArrayList<>(originReplicas); + newRemovingReplicas.removeAll(targetReplicas); + + return new ReplicaReassignment(fullReplicaSet, newAddingReplicas, newRemovingReplicas); + } + + private List<Integer> getTargetReplicas() { Review Comment: ditto, dead code. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Replica reassignment. */ +public class ReplicaReassignment { + private final List<Integer> replicas; + private final List<Integer> addingReplicas; + private final List<Integer> removingReplicas; + + private ReplicaReassignment( + List<Integer> replicas, List<Integer> addingReplicas, List<Integer> removingReplicas) { + this.replicas = Collections.unmodifiableList(replicas); + this.addingReplicas = Collections.unmodifiableList(addingReplicas); + this.removingReplicas = Collections.unmodifiableList(removingReplicas); + } + + private static ReplicaReassignment build( + List<Integer> originReplicas, List<Integer> targetReplicas) { + // targetReplicas behind originReplicas in full set. 
+ List<Integer> fullReplicaSet = new ArrayList<>(targetReplicas); + fullReplicaSet.addAll(originReplicas); + fullReplicaSet = fullReplicaSet.stream().distinct().collect(Collectors.toList()); + + List<Integer> newAddingReplicas = new ArrayList<>(fullReplicaSet); + newAddingReplicas.removeAll(originReplicas); + + List<Integer> newRemovingReplicas = new ArrayList<>(originReplicas); + newRemovingReplicas.removeAll(targetReplicas); + + return new ReplicaReassignment(fullReplicaSet, newAddingReplicas, newRemovingReplicas); + } + + private List<Integer> getTargetReplicas() { + List<Integer> computed = new ArrayList<>(replicas); + computed.removeAll(removingReplicas); + return Collections.unmodifiableList(computed); + } + + private List<Integer> getOriginReplicas() { Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
