platinumhamburg commented on code in PR #1452:
URL: https://github.com/apache/fluss/pull/1452#discussion_r2663736266


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Set;
+
+/** This is the interface of the optimization goals used for rebalance. */
+public interface Goal {
+    Logger LOG = LoggerFactory.getLogger(Goal.class);
+
+    /**
+     * Optimize the given cluster model as needed for this goal.
+     *
+     * <p>The method will be given a cluster model. The goal can try to 
optimize the cluster model
+     * by performing some admin operations(e.g. move replicas or leader of 
tableBuckets).
+     *
+     * <p>During the optimization, the implementation should make sure that 
all the previously
+     * optimized goals are still satisfied after this method completes its 
execution. The
+     * implementation can use {@link #actionAcceptance(ReBalancingAction, 
ClusterModel)} to check
+     * whether an admin operation is allowed by a previously optimized goals.
+     *
+     * <p>The implementation of a soft goal should return a boolean indicating 
whether the goal has
+     * been met after the optimization or not.
+     *
+     * <p>The implementation of a hard goal should throw an {@link 
RebalanceFailureException} when
+     * the goal cannot be met. This will then fail the entire optimization 
attempt.
+     */
+    void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals);
+
+    /**
+     * Check whether the given action is acceptable by this goal in the given 
state of the cluster.
+     * An action is (1) accepted by a goal if it satisfies requirements of the 
goal, or (2) rejected
+     * by a goal if it violates its requirements. The return value indicates 
whether the action is
+     * accepted or why it is rejected.
+     */
+    ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel 
clusterModel);
+
+    /**
+     * Get an instance of {@link ClusterModelStatsComparator} for this goal.
+     *
+     * <p>The {@link ClusterModelStatsComparator#compare(ClusterModelStats, 
ClusterModelStats)}
+     * method should give a preference between two {@link ClusterModelStats}.
+     *
+     * <p>The returned value must not be null.
+     *
+     * @return An instance of {@link ClusterModelStatsComparator} for this 
goal.
+     */
+    ClusterModelStatsComparator clusterModelStatsComparator();
+
+    /**
+     * Signal for finishing the process for rebalance. It is intended to mark 
the goal optimization
+     * as finished and perform the memory clean up after the goal optimization.
+     */
+    void finish();
+
+    /**
+     * @return {@code true} if this is a hard goal, {@code false} otherwise.
+     */
+    boolean isHardGoal();

Review Comment:
   What is a 'hard' goal?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Set;
+
+/** This is the interface of the optimization goals used for rebalance. */
+public interface Goal {
+    Logger LOG = LoggerFactory.getLogger(Goal.class);
+
+    /**
+     * Optimize the given cluster model as needed for this goal.
+     *
+     * <p>The method will be given a cluster model. The goal can try to 
optimize the cluster model
+     * by performing some admin operations(e.g. move replicas or leader of 
tableBuckets).
+     *
+     * <p>During the optimization, the implementation should make sure that 
all the previously
+     * optimized goals are still satisfied after this method completes its 
execution. The
+     * implementation can use {@link #actionAcceptance(ReBalancingAction, 
ClusterModel)} to check
+     * whether an admin operation is allowed by a previously optimized goals.
+     *
+     * <p>The implementation of a soft goal should return a boolean indicating 
whether the goal has
+     * been met after the optimization or not.
+     *
+     * <p>The implementation of a hard goal should throw an {@link 
RebalanceFailureException} when
+     * the goal cannot be met. This will then fail the entire optimization 
attempt.
+     */
+    void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals);
+
+    /**
+     * Check whether the given action is acceptable by this goal in the given 
state of the cluster.
+     * An action is (1) accepted by a goal if it satisfies requirements of the 
goal, or (2) rejected
+     * by a goal if it violates its requirements. The return value indicates 
whether the action is
+     * accepted or why it is rejected.
+     */
+    ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel 
clusterModel);
+
+    /**
+     * Get an instance of {@link ClusterModelStatsComparator} for this goal.
+     *
+     * <p>The {@link ClusterModelStatsComparator#compare(ClusterModelStats, 
ClusterModelStats)}
+     * method should give a preference between two {@link ClusterModelStats}.
+     *
+     * <p>The returned value must not be null.
+     *
+     * @return An instance of {@link ClusterModelStatsComparator} for this 
goal.
+     */
+    ClusterModelStatsComparator clusterModelStatsComparator();
+
+    /**
+     * Signal for finishing the process for rebalance. It is intended to mark 
the goal optimization
+     * as finished and perform the memory clean up after the goal optimization.
+     */
+    void finish();
+
+    /**
+     * @return {@code true} if this is a hard goal, {@code false} otherwise.
+     */
+    boolean isHardGoal();
+
+    /**
+     * @return The name of this goal. Name of a goal provides an 
identification for the goal in
+     *     human-readable format.
+     */
+    String name();
+
+    /**
+     * A comparator that compares two cluster model stats.
+     *
+     * <p>Note: this comparator imposes orderings that are inconsistent with 
equals.
+     */
+    interface ClusterModelStatsComparator extends 
Comparator<ClusterModelStats>, Serializable {

Review Comment:
   Should be placed in ClusterModelStats



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Set;
+
+/** This is the interface of the optimization goals used for rebalance. */
+public interface Goal {
+    Logger LOG = LoggerFactory.getLogger(Goal.class);
+
+    /**
+     * Optimize the given cluster model as needed for this goal.
+     *
+     * <p>The method will be given a cluster model. The goal can try to 
optimize the cluster model
+     * by performing some admin operations(e.g. move replicas or leader of 
tableBuckets).
+     *
+     * <p>During the optimization, the implementation should make sure that 
all the previously
+     * optimized goals are still satisfied after this method completes its 
execution. The
+     * implementation can use {@link #actionAcceptance(ReBalancingAction, 
ClusterModel)} to check
+     * whether an admin operation is allowed by a previously optimized goals.
+     *
+     * <p>The implementation of a soft goal should return a boolean indicating 
whether the goal has
+     * been met after the optimization or not.
+     *
+     * <p>The implementation of a hard goal should throw an {@link 
RebalanceFailureException} when
+     * the goal cannot be met. This will then fail the entire optimization 
attempt.
+     */
+    void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals);

Review Comment:
   The optimize method should be placed in the GoalOptimizer.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** An util class for {@link GoalOptimizer}. */
+public class GoalOptimizerUtils {
+
+    public static final double EPSILON = 1E-5;
+
+    /** Check whether the given proposal is acceptable for all the given 
optimized goals. */
+    public static ActionAcceptance isProposalAcceptableForOptimizedGoals(
+            Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel 
cluster) {
+        for (Goal goal : optimizedGoals) {
+            ActionAcceptance acceptance = goal.actionAcceptance(action, 
cluster);
+            if (acceptance != ACCEPT) {
+                return acceptance;
+            }
+        }
+        return ACCEPT;
+    }
+
+    /**
+     * Compare the given values.
+     *
+     * <pre>
+     *     1. Return 1 if first is greater than second by more than epsilon
+     *     2. -1 if first is less than second by more than epsilon
+     *     3. 0 otherwise.
+     * </pre>
+     */
+    public static int compare(double d1, double d2, double epsilon) {
+        if (d2 - d1 > epsilon) {
+            // Second value is larger than the first value.
+            return -1;
+        }
+        if (d1 - d2 > epsilon) {
+            // First value is larger than the second value.
+            return 1;
+        }
+        // Given values are approximately equal.
+        return 0;
+    }
+
+    /**
+     * Get whether there is any diff represented by a set of rebalance plan to 
move from the initial
+     * to final distribution.
+     */
+    public static boolean hasDiff(
+            Map<TableBucket, List<Integer>> initialReplicaDistribution,
+            Map<TableBucket, Integer> initialLeaderDistribution,
+            ClusterModel optimizedCluster) {
+        Map<TableBucket, List<Integer>> finalReplicaDistribution =
+                optimizedCluster.getReplicaDistribution();
+        sanityCheckReplicaDistribution(initialReplicaDistribution, 
finalReplicaDistribution);
+
+        boolean hasDiff = false;
+        for (Map.Entry<TableBucket, List<Integer>> entry : 
initialReplicaDistribution.entrySet()) {
+            TableBucket tableBucket = entry.getKey();
+            List<Integer> initialReplicas = entry.getValue();
+            List<Integer> finalReplicas = 
finalReplicaDistribution.get(tableBucket);
+
+            if (!finalReplicas.equals(initialReplicas)) {
+                hasDiff = true;
+                break;
+            } else {
+                BucketModel bucket = optimizedCluster.bucket(tableBucket);
+                checkNotNull(bucket, "Bucket is not in the cluster.");
+                ReplicaModel finalLeaderReplica = bucket.leader();
+                checkNotNull(finalLeaderReplica, "Leader replica is not in the 
bucket.");
+                Integer finalLeader = finalLeaderReplica.server().id();
+                if 
(!initialLeaderDistribution.get(tableBucket).equals(finalLeader)) {
+                    hasDiff = true;
+                    break;
+                }
+                // The bucket has no change.
+            }
+        }
+        return hasDiff;
+    }
+
+    /**
+     * Get the diff represented by the set of rebalance plan for bucket to 
move from initial to
+     * final distribution.
+     */
+    public static List<RebalancePlanForBucket> getDiff(
+            Map<TableBucket, List<Integer>> initialReplicaDistribution,

Review Comment:
   Ditto.
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** An util class for {@link GoalOptimizer}. */
+public class GoalOptimizerUtils {
+
+    public static final double EPSILON = 1E-5;
+
+    /** Check whether the given proposal is acceptable for all the given 
optimized goals. */
+    public static ActionAcceptance isProposalAcceptableForOptimizedGoals(
+            Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel 
cluster) {
+        for (Goal goal : optimizedGoals) {
+            ActionAcceptance acceptance = goal.actionAcceptance(action, 
cluster);
+            if (acceptance != ACCEPT) {
+                return acceptance;
+            }
+        }
+        return ACCEPT;
+    }
+
+    /**
+     * Compare the given values.
+     *
+     * <pre>
+     *     1. Return 1 if first is greater than second by more than epsilon
+     *     2. -1 if first is less than second by more than epsilon
+     *     3. 0 otherwise.
+     * </pre>
+     */
+    public static int compare(double d1, double d2, double epsilon) {
+        if (d2 - d1 > epsilon) {
+            // Second value is larger than the first value.
+            return -1;
+        }
+        if (d1 - d2 > epsilon) {
+            // First value is larger than the second value.
+            return 1;
+        }
+        // Given values are approximately equal.
+        return 0;
+    }
+
+    /**
+     * Get whether there is any diff represented by a set of rebalance plan to 
move from the initial
+     * to final distribution.
+     */
+    public static boolean hasDiff(
+            Map<TableBucket, List<Integer>> initialReplicaDistribution,

Review Comment:
   As a best practice, hasDiff and getDiff should not be implemented as two 
separate interfaces to avoid redundant computations.
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** An util class for {@link GoalOptimizer}. */
+public class GoalOptimizerUtils {
+
+    public static final double EPSILON = 1E-5;
+
+    /** Check whether the given proposal is acceptable for all the given 
optimized goals. */
+    public static ActionAcceptance isProposalAcceptableForOptimizedGoals(
+            Set<Goal> optimizedGoals, ReBalancingAction action, ClusterModel 
cluster) {
+        for (Goal goal : optimizedGoals) {
+            ActionAcceptance acceptance = goal.actionAcceptance(action, 
cluster);
+            if (acceptance != ACCEPT) {
+                return acceptance;
+            }
+        }
+        return ACCEPT;
+    }
+
+    /**
+     * Compare the given values.
+     *
+     * <pre>
+     *     1. Return 1 if first is greater than second by more than epsilon
+     *     2. -1 if first is less than second by more than epsilon
+     *     3. 0 otherwise.
+     * </pre>
+     */
+    public static int compare(double d1, double d2, double epsilon) {

Review Comment:
   This is a general-purpose comparison function and should be placed in a 
location that facilitates better reuse.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to