swuferhong commented on code in PR #1452:
URL: https://github.com/apache/fluss/pull/1452#discussion_r2664527095


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * A class that holds the information of the cluster for rebalance.The 
information including live
+ * tabletServers, bucket distribution, tabletServer tag etc.
+ *
+ * <p>Currently, the clusterModel can only be created by a rebalance request. 
It's used as the input
+ * of the GoalOptimizer to generate the rebalance plan for load rebalance.
+ */
+public class ClusterModel {
+    // TODO ClusterModel can be implemented in incremental mode, dynamically 
modified when there are
+    // events such as table create, table delete, server offline, etc. 
Currently designed to read
+    // coordinatorContext and generate it directly
+
+    private final Map<String, RackModel> racksById;
+    private final Map<Integer, RackModel> serverIdToRack;
+    private final Set<ServerModel> aliveServers;
+    private final SortedSet<ServerModel> offlineServers;
+    private final SortedSet<ServerModel> servers;
+    private final Map<TableBucket, BucketModel> bucketsByTableBucket;
+
+    public ClusterModel(SortedSet<ServerModel> servers) {
+        this.servers = servers;
+        this.bucketsByTableBucket = new HashMap<>();
+
+        this.aliveServers = new HashSet<>();
+        this.offlineServers = new TreeSet<>();
+        for (ServerModel serverModel : servers) {
+            if (serverModel.isAlive()) {
+                aliveServers.add(serverModel);
+            } else {
+                offlineServers.add(serverModel);
+            }
+        }
+
+        this.racksById = new HashMap<>();
+        this.serverIdToRack = new HashMap<>();
+        for (ServerModel serverModel : servers) {
+            RackModel rackModel = 
racksById.computeIfAbsent(serverModel.rack(), RackModel::new);
+            rackModel.addServer(serverModel);
+            serverIdToRack.put(serverModel.id(), rackModel);
+        }
+    }
+
+    public SortedSet<ServerModel> offlineServers() {
+        return offlineServers;
+    }
+
+    public SortedSet<ServerModel> servers() {
+        return servers;
+    }
+
+    public Set<ServerModel> aliveServers() {
+        return Collections.unmodifiableSet(aliveServers);
+    }
+
+    public @Nullable BucketModel bucket(TableBucket tableBucket) {
+        return bucketsByTableBucket.get(tableBucket);
+    }
+
+    public RackModel rack(String rack) {
+        return racksById.get(rack);
+    }
+
+    public @Nullable ServerModel server(int serverId) {
+        RackModel rack = serverIdToRack.get(serverId);
+        return rack == null ? null : rack.server(serverId);
+    }
+
+    /** Populate the analysis stats with this cluster. */
+    public ClusterModelStats getClusterStats() {
+        return (new ClusterModelStats()).populate(this);
+    }
+
+    public int numReplicas() {
+        return bucketsByTableBucket.values().stream().mapToInt(p -> 
p.replicas().size()).sum();
+    }
+
+    public int numLeaderReplicas() {
+        return bucketsByTableBucket.size();

Review Comment:
   change to find these buckets whose leader not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to