xianjingfeng commented on code in PR #1897:
URL: 
https://github.com/apache/incubator-uniffle/pull/1897#discussion_r1697793765


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -216,10 +224,54 @@ private void parseExcludeNodesFile(DataInputStream 
fsDataInputStream) throws IOE
         }
       }
     }
-    // update exclude nodes and last modify time
-    excludeNodes = nodes;
-    LOG.info(
-        "Updated exclude nodes and {} nodes were marked as exclude nodes", 
excludeNodes.size());
+    return nodes;
+  }
+
+  private void writeExcludedNodes2File(List<String> excludedNodes) throws 
IOException {
+    if (hadoopFileSystem != null) {
+      Path hadoopPath = new Path(excludedNodesPath);
+      FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+      if (fileStatus != null && fileStatus.isFile()) {
+        String tempExcludedNodesPath = excludedNodesPath.concat("_tmp");
+        Path tmpHadoopPath = new Path(tempExcludedNodesPath);
+        if (!hadoopFileSystem.exists(tmpHadoopPath)) {
+          hadoopFileSystem.create(tmpHadoopPath);
+        }
+        try (BufferedWriter bufferedWriter =
+            new BufferedWriter(
+                new OutputStreamWriter(
+                    hadoopFileSystem.create(tmpHadoopPath, true), 
StandardCharsets.UTF_8))) {
+          for (String excludedNode : excludedNodes) {
+            bufferedWriter.write(excludedNode);
+            bufferedWriter.newLine();
+          }
+        }
+        hadoopFileSystem.delete(hadoopPath);
+        hadoopFileSystem.rename(tmpHadoopPath, hadoopPath);
+      }
+    }
+  }
+
+  private synchronized boolean putInExcludedNodesFile(List<String> 
excludedNodes)
+      throws IOException {
+    if (hadoopFileSystem != null) {
+      Path hadoopPath = new Path(excludedNodesPath);
+      FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+      if (fileStatus != null && fileStatus.isFile()) {
+        // Obtains the existing excluded node.
+        Set<String> alreadyExistExcludedNodes =
+            parseExcludedNodesFile(hadoopFileSystem.open(hadoopPath));
+        List<String> newAddExcludedNodes =
+            excludedNodes.stream()
+                .filter(node -> !alreadyExistExcludedNodes.contains(node))
+                .collect(Collectors.toList());
+        newAddExcludedNodes.addAll(alreadyExistExcludedNodes);
+        // Writes to the new excluded node.
+        writeExcludedNodes2File(newAddExcludedNodes);
+        return true;
+      }
+    }
+    return false;

Review Comment:
   ```suggestion
       if (hadoopFileSystem == null) {
         return false;
       }
       Set<String> alreadyExistExcludedNodes =
           parseExcludedNodesFile(new Path(excludedNodesPath));
       List<String> newExcludedNodes = 
           excludedNodes.stream()
               .filter(node -> !alreadyExistExcludedNodes.contains(node))
               .collect(Collectors.toList());
       newExcludedNodes.addAll(alreadyExistExcludedNodes);
       // Writes to the new excluded node.
        writeExcludedNodes2File(newExcludedNodes);
       return true;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to