Copilot commented on code in PR #2286: URL: https://github.com/apache/fluss/pull/2286#discussion_r2659618044
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import 
org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A rebalance manager to generate rebalance plan, and execution rebalance plan. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. 
+ */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Lock lock = new ReentrantLock(); + private final ZooKeeperClient zkClient; + private final CoordinatorEventProcessor eventProcessor; + + @GuardedBy("lock") + private final Queue<TableBucket> ongoingRebalanceTasksQueue = new ArrayDeque<>(); + + /** A mapping from table bucket to rebalance status of pending and running tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + /** A mapping from table bucket to rebalance status of failed or completed tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + @GuardedBy("lock") + private final GoalOptimizer goalOptimizer; + + @GuardedBy("lock") + private long registerTime; + + @GuardedBy("lock") + private volatile RebalanceStatus rebalanceStatus = NO_TASK; + + @GuardedBy("lock") + private volatile @Nullable String currentRebalanceId; + + public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { + this.eventProcessor = eventProcessor; + this.zkClient = zkClient; + this.goalOptimizer = new GoalOptimizer(); + } + + public void startup() { + LOG.info("Start up rebalance manager."); + initialize(); + } + + private void initialize() { + try { + zkClient.getRebalancePlan() + .ifPresent( + rebalancePlan -> + registerRebalance( + rebalancePlan.getRebalanceId(), + rebalancePlan.getExecutePlan())); + } catch (Exception e) { + LOG.error( + "Failed to get rebalance plan from zookeeper, it will be treated as no" + + "rebalance tasks.", + e); + } + } + + public void registerRebalance( + String rebalanceId, Map<TableBucket, RebalancePlanForBucket> rebalancePlan) { + checkNotClosed(); + 
registerTime = System.currentTimeMillis(); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; 
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A rebalance manager to generate rebalance plan, and execution rebalance plan. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. + */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Lock lock = new ReentrantLock(); + private final ZooKeeperClient zkClient; + private final CoordinatorEventProcessor eventProcessor; + + @GuardedBy("lock") + private final Queue<TableBucket> ongoingRebalanceTasksQueue = new ArrayDeque<>(); + + /** A mapping from table bucket to rebalance status of pending and running tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + /** A mapping from table bucket to rebalance status of failed or completed tasks. 
*/ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + @GuardedBy("lock") + private final GoalOptimizer goalOptimizer; + + @GuardedBy("lock") + private long registerTime; + + @GuardedBy("lock") + private volatile RebalanceStatus rebalanceStatus = NO_TASK; + + @GuardedBy("lock") + private volatile @Nullable String currentRebalanceId; + + public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { + this.eventProcessor = eventProcessor; + this.zkClient = zkClient; + this.goalOptimizer = new GoalOptimizer(); + } + + public void startup() { + LOG.info("Start up rebalance manager."); + initialize(); + } + + private void initialize() { + try { + zkClient.getRebalancePlan() + .ifPresent( + rebalancePlan -> + registerRebalance( + rebalancePlan.getRebalanceId(), + rebalancePlan.getExecutePlan())); + } catch (Exception e) { + LOG.error( + "Failed to get rebalance plan from zookeeper, it will be treated as no" + + "rebalance tasks.", Review Comment: This string appears to be missing a space after 'no'. ```suggestion + " rebalance tasks.", ``` ########## fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.SortedSet; +import java.util.TreeSet; + +/** Test for {@link GoalOptimizer}. */ +public class GoalOptimizerTest { + + private SortedSet<ServerModel> servers; Review Comment: The contents of this container are never accessed. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; 
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A rebalance manager to generate rebalance plan, and execution rebalance plan. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. + */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Lock lock = new ReentrantLock(); + private final ZooKeeperClient zkClient; + private final CoordinatorEventProcessor eventProcessor; + + @GuardedBy("lock") + private final Queue<TableBucket> ongoingRebalanceTasksQueue = new ArrayDeque<>(); + + /** A mapping from table bucket to rebalance status of pending and running tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + /** A mapping from table bucket to rebalance status of failed or completed tasks. 
*/ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + @GuardedBy("lock") + private final GoalOptimizer goalOptimizer; + + @GuardedBy("lock") + private long registerTime; + + @GuardedBy("lock") + private volatile RebalanceStatus rebalanceStatus = NO_TASK; + + @GuardedBy("lock") + private volatile @Nullable String currentRebalanceId; + + public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { + this.eventProcessor = eventProcessor; + this.zkClient = zkClient; + this.goalOptimizer = new GoalOptimizer(); + } + + public void startup() { + LOG.info("Start up rebalance manager."); + initialize(); + } + + private void initialize() { + try { + zkClient.getRebalancePlan() + .ifPresent( + rebalancePlan -> + registerRebalance( + rebalancePlan.getRebalanceId(), + rebalancePlan.getExecutePlan())); + } catch (Exception e) { + LOG.error( + "Failed to get rebalance plan from zookeeper, it will be treated as no" + + "rebalance tasks.", + e); + } + } + + public void registerRebalance( + String rebalanceId, Map<TableBucket, RebalancePlanForBucket> rebalancePlan) { + checkNotClosed(); + registerTime = System.currentTimeMillis(); + // Register to zookeeper first. + try { + // first clear all exists tasks. + ongoingRebalanceTasks.clear(); + ongoingRebalanceTasksQueue.clear(); + finishedRebalanceTasks.clear(); + + Optional<RebalancePlan> existPlanOpt = zkClient.getRebalancePlan(); + if (!existPlanOpt.isPresent()) { + zkClient.registerRebalancePlan( + new RebalancePlan(rebalanceId, NOT_STARTED, rebalancePlan)); + } else { + RebalancePlan existPlan = existPlanOpt.get(); + if (FINAL_STATUSES.contains(existPlan.getRebalanceStatus())) { + zkClient.updateRebalancePlan( + new RebalancePlan( + rebalanceId, NOT_STARTED, existPlanOpt.get().getExecutePlan())); + } else { + throw new RebalanceFailureException( + "Rebalance task already exists. 
Please wait for it to finish or cancel it first."); + } + } + + currentRebalanceId = rebalanceId; + rebalanceStatus = NOT_STARTED; + } catch (Exception e) { + LOG.error("Error when register rebalance plan to zookeeper.", e); + throw new RebalanceFailureException( + "Error when register rebalance plan to zookeeper.", e); Review Comment: Access of [element](1) annotated with VisibleForTesting found in production code. ```suggestion "Error when register rebalance plan to zookeeper."); ``` ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; 
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A rebalance manager to generate rebalance plan, and execution rebalance plan. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. + */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Lock lock = new ReentrantLock(); + private final ZooKeeperClient zkClient; + private final CoordinatorEventProcessor eventProcessor; + + @GuardedBy("lock") + private final Queue<TableBucket> ongoingRebalanceTasksQueue = new ArrayDeque<>(); + + /** A mapping from table bucket to rebalance status of pending and running tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + /** A mapping from table bucket to rebalance status of failed or completed tasks. 
*/ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + @GuardedBy("lock") + private final GoalOptimizer goalOptimizer; + + @GuardedBy("lock") + private long registerTime; + + @GuardedBy("lock") + private volatile RebalanceStatus rebalanceStatus = NO_TASK; + + @GuardedBy("lock") + private volatile @Nullable String currentRebalanceId; + + public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { + this.eventProcessor = eventProcessor; + this.zkClient = zkClient; + this.goalOptimizer = new GoalOptimizer(); + } + + public void startup() { + LOG.info("Start up rebalance manager."); + initialize(); + } + + private void initialize() { + try { + zkClient.getRebalancePlan() + .ifPresent( + rebalancePlan -> + registerRebalance( + rebalancePlan.getRebalanceId(), + rebalancePlan.getExecutePlan())); + } catch (Exception e) { + LOG.error( + "Failed to get rebalance plan from zookeeper, it will be treated as no" + + "rebalance tasks.", + e); + } + } + + public void registerRebalance( + String rebalanceId, Map<TableBucket, RebalancePlanForBucket> rebalancePlan) { + checkNotClosed(); + registerTime = System.currentTimeMillis(); + // Register to zookeeper first. + try { + // first clear all exists tasks. + ongoingRebalanceTasks.clear(); + ongoingRebalanceTasksQueue.clear(); + finishedRebalanceTasks.clear(); + + Optional<RebalancePlan> existPlanOpt = zkClient.getRebalancePlan(); + if (!existPlanOpt.isPresent()) { + zkClient.registerRebalancePlan( + new RebalancePlan(rebalanceId, NOT_STARTED, rebalancePlan)); + } else { + RebalancePlan existPlan = existPlanOpt.get(); + if (FINAL_STATUSES.contains(existPlan.getRebalanceStatus())) { + zkClient.updateRebalancePlan( + new RebalancePlan( + rebalanceId, NOT_STARTED, existPlanOpt.get().getExecutePlan())); + } else { + throw new RebalanceFailureException( + "Rebalance task already exists. 
Please wait for it to finish or cancel it first."); Review Comment: Access of [element](1) annotated with VisibleForTesting found in production code. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java: ########## @@ -1092,10 +1137,335 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even // Then update coordinatorContext. serverIds.forEach(coordinatorContext::removeServerTag); + LOG.info("Server tag {} removed for servers {}.", serverTag, serverIds); return removeServerTagResponse; } + private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) { + boolean isDryRun = rebalanceEvent.isDryRun(); + RebalancePlan rebalancePlan; + try { + rebalancePlan = + rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority()); + } catch (Exception e) { + throw new RebalanceFailureException("Failed to generate rebalance plan.", e); Review Comment: Access of [element](1) annotated with VisibleForTesting found in production code. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServersNotExcludeForReplicaMove; + +/** An abstract class for goals that are based on the distribution of replicas. */ +public abstract class ReplicaDistributionAbstractGoal extends AbstractGoal { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicaDistributionAbstractGoal.class); + private static final double BALANCE_MARGIN = 0.9; + protected final Set<Integer> serverIdsAboveRebalanceUpperLimit; + protected final Set<Integer> serverIdsBelowRebalanceLowerLimit; + protected double avgReplicasOnAliveServer; + protected int rebalanceUpperLimit; + protected int rebalanceLowerLimit; + // This is used to identify servers not excluded for replica moves. 
+ protected Set<Integer> serversAllowedReplicaRemove; + + public ReplicaDistributionAbstractGoal() { + serverIdsAboveRebalanceUpperLimit = new HashSet<>(); + serverIdsBelowRebalanceLowerLimit = new HashSet<>(); + } + + private int rebalanceUpperLimit(double balancePercentage) { + return (int) + Math.ceil( + avgReplicasOnAliveServer + * (1 + adjustedRebalancePercentage(balancePercentage))); + } + + private int rebalanceLowerLimit(double balancePercentage) { + return (int) + Math.floor( + avgReplicasOnAliveServer + * Math.max( + 0, (1 - adjustedRebalancePercentage(balancePercentage)))); + } + + private double adjustedRebalancePercentage(double rebalancePercentage) { + return (rebalancePercentage - 1) * BALANCE_MARGIN; + } + + boolean isReplicaCountUnderBalanceUpperLimitAfterChange( + ServerModel server, int currentReplicaCount, ChangeType changeType) { + int serverBalanceUpperLimit = server.isAlive() ? rebalanceUpperLimit : 0; + + return changeType == ChangeType.ADD + ? currentReplicaCount + 1 <= serverBalanceUpperLimit + : currentReplicaCount - 1 <= serverBalanceUpperLimit; + } + + boolean isReplicaCountAboveBalanceLowerLimitAfterChange( + ServerModel server, int currentReplicaCount, ChangeType changeType) { + int serverBalanceLowerLimit = server.isAlive() ? rebalanceLowerLimit : 0; + + return changeType == ChangeType.ADD + ? currentReplicaCount + 1 >= serverBalanceLowerLimit + : currentReplicaCount - 1 >= serverBalanceLowerLimit; + } + + @Override + public boolean isHardGoal() { + return false; + } + + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + serversAllowedReplicaRemove = aliveServersNotExcludeForReplicaMove(clusterModel); + if (serversAllowedReplicaRemove.isEmpty()) { + throw new RebalanceFailureException( + String.format( + "[%s] All alive tabletServers are excluded from replica moves.", + name())); Review Comment: Access of [element](1) annotated with VisibleForTesting found in production code. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/AbstractGoal.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ActionType; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static 
org.apache.fluss.server.coordinator.rebalance.ActionType.LEADERSHIP_MOVEMENT; +import static org.apache.fluss.server.coordinator.rebalance.ActionType.REPLICA_MOVEMENT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.isProposalAcceptableForOptimizedGoals; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.legitMove; + +/** An abstract class for goals. */ +public abstract class AbstractGoal implements Goal { + private static final Logger LOG = LoggerFactory.getLogger(AbstractGoal.class); + protected boolean finished; + protected boolean succeeded; + + public AbstractGoal() { + finished = false; + succeeded = true; + } + + @Override + public void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals) { + LOG.debug("Starting Optimizing for goal {}", name()); + // Initialize pre-optimized stats. + ClusterModelStats statsBeforeOptimization = clusterModel.getClusterStats(); + LOG.trace("[PRE - {}] {}", name(), statsBeforeOptimization); + finished = false; + long goalStartTime = System.currentTimeMillis(); + initGoalState(clusterModel); + SortedSet<ServerModel> offlineServers = clusterModel.offlineServers(); + + while (!finished) { + for (ServerModel server : serversToBalance(clusterModel)) { + rebalanceForServer(server, clusterModel, optimizedGoals); + } + updateGoalState(clusterModel); + } + + ClusterModelStats statsAfterOptimization = clusterModel.getClusterStats(); + LOG.trace("[POST - {}] {}", name(), statsAfterOptimization); Review Comment: Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/AbstractGoal.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ActionType; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.ActionType.LEADERSHIP_MOVEMENT; +import static org.apache.fluss.server.coordinator.rebalance.ActionType.REPLICA_MOVEMENT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.isProposalAcceptableForOptimizedGoals; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.legitMove; + +/** An abstract class for goals. 
*/ +public abstract class AbstractGoal implements Goal { + private static final Logger LOG = LoggerFactory.getLogger(AbstractGoal.class); + protected boolean finished; + protected boolean succeeded; + + public AbstractGoal() { + finished = false; + succeeded = true; + } + + @Override + public void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals) { + LOG.debug("Starting Optimizing for goal {}", name()); + // Initialize pre-optimized stats. + ClusterModelStats statsBeforeOptimization = clusterModel.getClusterStats(); + LOG.trace("[PRE - {}] {}", name(), statsBeforeOptimization); Review Comment: Default toString(): ClusterModelStats inherits toString() from Object, and so is not suitable for printing. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java: ########## @@ -1092,10 +1137,335 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even // Then update coordinatorContext. serverIds.forEach(coordinatorContext::removeServerTag); + LOG.info("Server tag {} removed for servers {}.", serverTag, serverIds); return removeServerTagResponse; } + private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) { + boolean isDryRun = rebalanceEvent.isDryRun(); + RebalancePlan rebalancePlan; + try { + rebalancePlan = + rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority()); + } catch (Exception e) { + throw new RebalanceFailureException("Failed to generate rebalance plan.", e); + } + + if (!isDryRun) { + if (rebalanceManager.hasOngoingRebalance()) { + throw new RebalanceFailureException( + "Rebalance task already exists. Please wait for it to finish or cancel it first."); Review Comment: Access of [element](1) annotated with VisibleForTesting found in production code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
