This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.2 by this push:
     new cb48944b3 [CELEBORN-507][0.2] Don't set up worker endpoint when update meta
cb48944b3 is described below

commit cb48944b3ac865ef4ee1fc9bfc6d7cc525e0fae0
Author: Shuang <[email protected]>
AuthorDate: Wed Jun 7 22:26:34 2023 +0800

    [CELEBORN-507][0.2] Don't set up worker endpoint when update meta
    
    ### What changes were proposed in this pull request?
    see https://github.com/apache/incubator-celeborn/pull/1412
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1558 from RexXiong/branch-0.2-CELEBORN-507.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/celeborn/common/meta/WorkerInfo.scala   |  1 +
 .../celeborn/common/meta/WorkerInfoSuite.scala     |  8 +++----
 .../master/clustermeta/AbstractMetaManager.java    | 26 +++++++---------------
 .../celeborn/service/deploy/master/Master.scala    | 17 +++-----------
 4 files changed, 16 insertions(+), 36 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index f1b605ba7..8f9801308 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -254,6 +254,7 @@ class WorkerInfo(
        |ReplicatePort: $replicatePort
        |SlotsUsed: $slots
        |LastHeartbeat: $lastHeartbeat
+       |HeartBeatElapsedSeconds: ${(System.currentTimeMillis() - lastHeartbeat) / 1000}
        |Disks: $diskInfosString
        |UserResourceConsumption: $userResourceConsumptionString
        |WorkerRef: $endpoint
diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 74d814bef..6ad26dc1a 100644
--- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -310,9 +310,9 @@ class WorkerInfoSuite extends RssFunSuite {
     println(worker3)
     println(worker4)
 
-    assertEquals(exp1, worker1.toString)
-    assertEquals(exp2, worker2.toString)
-    assertEquals(exp3, worker3.toString)
-    assertEquals(exp4, worker4.toString)
+    assertEquals(exp1, worker1.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+    assertEquals(exp2, worker2.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+    assertEquals(exp3, worker3.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+    assertEquals(exp4, worker4.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
   }
 }
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 19c3ce674..61ba9bcb8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -17,9 +17,10 @@
 
 package org.apache.celeborn.service.deploy.master.clustermeta;
 
-import static org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP;
-
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,10 +38,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
-import org.apache.celeborn.common.meta.*;
+import org.apache.celeborn.common.meta.AppDiskUsageMetric;
+import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
+import org.apache.celeborn.common.meta.DiskInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
-import org.apache.celeborn.common.rpc.RpcAddress;
 import org.apache.celeborn.common.rpc.RpcEnv;
 import org.apache.celeborn.common.util.PbSerDeUtils;
 import org.apache.celeborn.common.util.Utils;
@@ -223,19 +226,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
             userResourceConsumption,
             null);
     workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
-
-    try {
-      workerInfo.setupEndpoint(rpcEnv.setupEndpointRef(RpcAddress.apply(host, rpcPort), WORKER_EP));
-    } catch (Exception e) {
-      LOG.warn("Worker register setupEndpoint failed {}, will retry", e);
-      try {
-        workerInfo.setupEndpoint(
-            rpcEnv.setupEndpointRef(RpcAddress.apply(host, rpcPort), WORKER_EP));
-      } catch (Exception e1) {
-        workerInfo.setupEndpoint(null);
-      }
-    }
-
     workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
     synchronized (workers) {
       if (!workers.contains(workerInfo)) {
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index c027aca83..301703f5b 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -650,22 +650,11 @@ private[celeborn] class Master(
 
   override def getWorkerInfo: String = {
     val sb = new StringBuilder
+    sb.append("====================== Workers Info in Master ===========================")
+
     workersSnapShot.asScala.foreach { w =>
-      sb.append("==========WorkerInfos in Master==========\n")
+      sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}\n")
       sb.append(w).append("\n")
-
-      val workerInfo = requestGetWorkerInfos(w.endpoint)
-        .workerInfos.asJava
-        .get(0)
-
-      sb.append("==========WorkerInfos in Workers==========\n")
-      sb.append(workerInfo).append("\n")
-
-      if (w.hasSameInfoWith(workerInfo)) {
-        sb.append("Consist!").append("\n")
-      } else {
-        sb.append("[ERROR] Inconsistent!").append("\n")
-      }
     }
 
     sb.toString()

Reply via email to