wuchong commented on code in PR #1258:
URL: https://github.com/apache/fluss/pull/1258#discussion_r2276337849


##########
fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java:
##########
@@ -283,16 +283,19 @@ public ReadyCheckResult ready(Cluster cluster) {
      *     the requested maxSize.
      */
     public Map<Integer, List<ReadyWriteBatch>> drain(
-            Cluster cluster, Set<ServerNode> nodes, int maxSize) throws 
Exception {
+            Cluster cluster, Set<Integer> nodes, int maxSize) throws Exception 
{
         if (nodes.isEmpty()) {
             return Collections.emptyMap();
         }
 
         Map<Integer, List<ReadyWriteBatch>> batches = new HashMap<>();
-        for (ServerNode node : nodes) {
+        for (Integer node : nodes) {
             List<ReadyWriteBatch> ready = drainBatchesForOneNode(cluster, 
node, maxSize);
             if (!ready.isEmpty()) {
-                batches.put(node.id(), ready);
+                List<ReadyWriteBatch> previous = batches.put(node, ready);
+                if (previous != null) {
+                    throw new IllegalStateException("Duplicate node in nodes 
when drain: " + node);
+                }

Review Comment:
   nit: the checking is redundant because `nodes` is a set, and there shouldn't 
be duplicates in it.



##########
fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java:
##########
@@ -153,6 +152,19 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
                                 newPartitionIdByPath = 
newTableMetadata.partitionIdByPath;
                             }
 
+                            // update the leader id to check whether the 
leader is alive or not.
+                            newBucketLocations.forEach(
+                                    (physicalTablePath, bucketLocations) ->
+                                            bucketLocations.forEach(
+                                                    bucketLocation -> {
+                                                        Integer leader = 
bucketLocation.getLeader();
+                                                        if (leader != null
+                                                                && 
newAliveTabletServers.get(leader)
+                                                                        == 
null) {
+                                                            
bucketLocation.setLeader(null);

Review Comment:
   This changes `leader` into a mutable variable which is hack and may have 
concurrency problems in the future.  From my understanding, it's fine to have 
`leader` non-null when the corresponding TabletServer ServerNode is null. Could 
you change and verify it? 



##########
fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java:
##########
@@ -531,10 +534,10 @@ private long bucketReady(
 
     private long batchReady(
             boolean exhausted,
-            ServerNode leader,
+            Integer leader,
             long waitedTimeMs,
             boolean full,
-            Set<ServerNode> readyNodes,
+            Set<Integer> readyNodes,
             long nextReadyCheckDelayMs) {
         if (!readyNodes.contains(leader)) {

Review Comment:
   Add `leader != null` check before the `contains` check, or change the 
`leader` parameter into `int` type. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to