platinumhamburg commented on code in PR #1452: URL: https://github.com/apache/fluss/pull/1452#discussion_r2663736266
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.Set; + +/** This is the interface of the optimization goals used for rebalance. */ +public interface Goal { + Logger LOG = LoggerFactory.getLogger(Goal.class); + + /** + * Optimize the given cluster model as needed for this goal. + * + * <p>The method will be given a cluster model. The goal can try to optimize the cluster model + * by performing some admin operations(e.g. move replicas or leader of tableBuckets). 
+ * + * <p>During the optimization, the implementation should make sure that all the previously + * optimized goals are still satisfied after this method completes its execution. The + * implementation can use {@link #actionAcceptance(ReBalancingAction, ClusterModel)} to check + * whether an admin operation is allowed by a previously optimized goals. + * + * <p>The implementation of a soft goal should return a boolean indicating whether the goal has + * been met after the optimization or not. + * + * <p>The implementation of a hard goal should throw an {@link RebalanceFailureException} when + * the goal cannot be met. This will then fail the entire optimization attempt. + */ + void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals); + + /** + * Check whether the given action is acceptable by this goal in the given state of the cluster. + * An action is (1) accepted by a goal if it satisfies requirements of the goal, or (2) rejected + * by a goal if it violates its requirements. The return value indicates whether the action is + * accepted or why it is rejected. + */ + ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel); + + /** + * Get an instance of {@link ClusterModelStatsComparator} for this goal. + * + * <p>The {@link ClusterModelStatsComparator#compare(ClusterModelStats, ClusterModelStats)} + * method should give a preference between two {@link ClusterModelStats}. + * + * <p>The returned value must not be null. + * + * @return An instance of {@link ClusterModelStatsComparator} for this goal. + */ + ClusterModelStatsComparator clusterModelStatsComparator(); + + /** + * Signal for finishing the process for rebalance. It is intended to mark the goal optimization + * as finished and perform the memory clean up after the goal optimization. + */ + void finish(); + + /** + * @return {@code true} if this is a hard goal, {@code false} otherwise. + */ + boolean isHardGoal(); Review Comment: What is a 'hard' goal? Please document what distinguishes a hard goal from a soft goal.
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.Set; + +/** This is the interface of the optimization goals used for rebalance. */ +public interface Goal { + Logger LOG = LoggerFactory.getLogger(Goal.class); + + /** + * Optimize the given cluster model as needed for this goal. + * + * <p>The method will be given a cluster model. The goal can try to optimize the cluster model + * by performing some admin operations(e.g. move replicas or leader of tableBuckets). 
+ * + * <p>During the optimization, the implementation should make sure that all the previously + * optimized goals are still satisfied after this method completes its execution. The + * implementation can use {@link #actionAcceptance(ReBalancingAction, ClusterModel)} to check + * whether an admin operation is allowed by a previously optimized goals. + * + * <p>The implementation of a soft goal should return a boolean indicating whether the goal has + * been met after the optimization or not. + * + * <p>The implementation of a hard goal should throw an {@link RebalanceFailureException} when + * the goal cannot be met. This will then fail the entire optimization attempt. + */ + void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals); + + /** + * Check whether the given action is acceptable by this goal in the given state of the cluster. + * An action is (1) accepted by a goal if it satisfies requirements of the goal, or (2) rejected + * by a goal if it violates its requirements. The return value indicates whether the action is + * accepted or why it is rejected. + */ + ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel); + + /** + * Get an instance of {@link ClusterModelStatsComparator} for this goal. + * + * <p>The {@link ClusterModelStatsComparator#compare(ClusterModelStats, ClusterModelStats)} + * method should give a preference between two {@link ClusterModelStats}. + * + * <p>The returned value must not be null. + * + * @return An instance of {@link ClusterModelStatsComparator} for this goal. + */ + ClusterModelStatsComparator clusterModelStatsComparator(); + + /** + * Signal for finishing the process for rebalance. It is intended to mark the goal optimization + * as finished and perform the memory clean up after the goal optimization. + */ + void finish(); + + /** + * @return {@code true} if this is a hard goal, {@code false} otherwise. + */ + boolean isHardGoal(); + + /** + * @return The name of this goal. 
Name of a goal provides an identification for the goal in + * human-readable format. + */ + String name(); + + /** + * A comparator that compares two cluster model stats. + * + * <p>Note: this comparator imposes orderings that are inconsistent with equals. + */ + interface ClusterModelStatsComparator extends Comparator<ClusterModelStats>, Serializable { Review Comment: Should be placed in ClusterModelStats ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.Set; + +/** This is the interface of the optimization goals used for rebalance. 
*/ +public interface Goal { + Logger LOG = LoggerFactory.getLogger(Goal.class); + + /** + * Optimize the given cluster model as needed for this goal. + * + * <p>The method will be given a cluster model. The goal can try to optimize the cluster model + * by performing some admin operations(e.g. move replicas or leader of tableBuckets). + * + * <p>During the optimization, the implementation should make sure that all the previously + * optimized goals are still satisfied after this method completes its execution. The + * implementation can use {@link #actionAcceptance(ReBalancingAction, ClusterModel)} to check + * whether an admin operation is allowed by a previously optimized goals. + * + * <p>The implementation of a soft goal should return a boolean indicating whether the goal has + * been met after the optimization or not. + * + * <p>The implementation of a hard goal should throw an {@link RebalanceFailureException} when + * the goal cannot be met. This will then fail the entire optimization attempt. + */ + void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals); Review Comment: The optimize method should be placed in the GoalOptimizer. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** An util class for {@link GoalOptimizer}. */ +public class GoalOptimizerUtils { + + public static final double EPSILON = 1E-5; + + /** Check whether the given proposal is acceptable for all the given optimized goals. */ + public static ActionAcceptance isProposalAcceptableForOptimizedGoals( + Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel cluster) { + for (Goal goal : optimizedGoals) { + ActionAcceptance acceptance = goal.actionAcceptance(action, cluster); + if (acceptance != ACCEPT) { + return acceptance; + } + } + return ACCEPT; + } + + /** + * Compare the given values. + * + * <pre> + * 1. Return 1 if first + * 2. -1 if first + * 3. 0 otherwise. 
+ * </pre> + */ + public static int compare(double d1, double d2, double epsilon) { + if (d2 - d1 > epsilon) { + // Second value is larger than the first value. + return -1; + } + if (d1 - d2 > epsilon) { + // First value is larger than the second value. + return 1; + } + // Given values are approximately equal. + return 0; + } + + /** + * Get whether there is any diff represented by a set of rebalance plan to move from the initial + * to final distribution. + */ + public static boolean hasDiff( + Map<TableBucket, List<Integer>> initialReplicaDistribution, + Map<TableBucket, Integer> initialLeaderDistribution, + ClusterModel optimizedCluster) { + Map<TableBucket, List<Integer>> finalReplicaDistribution = + optimizedCluster.getReplicaDistribution(); + sanityCheckReplicaDistribution(initialReplicaDistribution, finalReplicaDistribution); + + boolean hasDiff = false; + for (Map.Entry<TableBucket, List<Integer>> entry : initialReplicaDistribution.entrySet()) { + TableBucket tableBucket = entry.getKey(); + List<Integer> initialReplicas = entry.getValue(); + List<Integer> finalReplicas = finalReplicaDistribution.get(tableBucket); + + if (!finalReplicas.equals(initialReplicas)) { + hasDiff = true; + break; + } else { + BucketModel bucket = optimizedCluster.bucket(tableBucket); + checkNotNull(bucket, "Bucket is not in the cluster."); + ReplicaModel finalLeaderReplica = bucket.leader(); + checkNotNull(finalLeaderReplica, "Leader replica is not in the bucket."); + Integer finalLeader = finalLeaderReplica.server().id(); + if (!initialLeaderDistribution.get(tableBucket).equals(finalLeader)) { + hasDiff = true; + break; + } + // The bucket has no change. + } + } + return hasDiff; + } + + /** + * Get the diff represented by the set of rebalance plan for bucket to move from initial to + * final distribution. 
+ */ + public static List<RebalancePlanForBucket> getDiff( + Map<TableBucket, List<Integer>> initialReplicaDistribution, Review Comment: ditto ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** An util class for {@link GoalOptimizer}.
*/ +public class GoalOptimizerUtils { + + public static final double EPSILON = 1E-5; + + /** Check whether the given proposal is acceptable for all the given optimized goals. */ + public static ActionAcceptance isProposalAcceptableForOptimizedGoals( + Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel cluster) { + for (Goal goal : optimizedGoals) { + ActionAcceptance acceptance = goal.actionAcceptance(action, cluster); + if (acceptance != ACCEPT) { + return acceptance; + } + } + return ACCEPT; + } + + /** + * Compare the given values. + * + * <pre> + * 1. Return 1 if first + * 2. -1 if first + * 3. 0 otherwise. + * </pre> + */ + public static int compare(double d1, double d2, double epsilon) { + if (d2 - d1 > epsilon) { + // Second value is larger than the first value. + return -1; + } + if (d1 - d2 > epsilon) { + // First value is larger than the second value. + return 1; + } + // Given values are approximately equal. + return 0; + } + + /** + * Get whether there is any diff represented by a set of rebalance plan to move from the initial + * to final distribution. + */ + public static boolean hasDiff( + Map<TableBucket, List<Integer>> initialReplicaDistribution, Review Comment: As a best practice, hasDiff and getDiff should not be implemented as two separate interfaces to avoid redundant computations. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** An util class for {@link GoalOptimizer}. */ +public class GoalOptimizerUtils { + + public static final double EPSILON = 1E-5; + + /** Check whether the given proposal is acceptable for all the given optimized goals. */ + public static ActionAcceptance isProposalAcceptableForOptimizedGoals( + Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel cluster) { + for (Goal goal : optimizedGoals) { + ActionAcceptance acceptance = goal.actionAcceptance(action, cluster); + if (acceptance != ACCEPT) { + return acceptance; + } + } + return ACCEPT; + } + + /** + * Compare the given values. + * + * <pre> + * 1. Return 1 if first + * 2. -1 if first + * 3. 0 otherwise. 
+ * </pre> + */ + public static int compare(double d1, double d2, double epsilon) { Review Comment: This is a general-purpose comparison function and should be placed in a location that facilitates better reuse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
