This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9b2b8a01e [CELEBORN-507] don't set up worker endpoint when update meta and remove compare worker meta with workers (#1412)
9b2b8a01e is described below
commit 9b2b8a01ec3e6c348a82b4121e4ae58076fbd8ce
Author: Shuang <[email protected]>
AuthorDate: Fri Apr 7 11:46:24 2023 +0800
[CELEBORN-507] don't set up worker endpoint when update meta and remove compare worker meta with workers (#1412)
---
.../apache/celeborn/common/meta/WorkerInfo.scala | 1 +
.../celeborn/common/meta/WorkerInfoSuite.scala | 8 +++----
.../master/clustermeta/AbstractMetaManager.java | 26 +++++++---------------
.../celeborn/service/deploy/master/Master.scala | 19 ++--------------
4 files changed, 15 insertions(+), 39 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 720f12b12..4ab40373b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -255,6 +255,7 @@ class WorkerInfo(
|ReplicatePort: $replicatePort
|SlotsUsed: $slots
|LastHeartbeat: $lastHeartbeat
+ |HeartBeatElapsedSeconds: ${(System.currentTimeMillis() -
lastHeartbeat) / 1000}
|Disks: $diskInfosString
|UserResourceConsumption: $userResourceConsumptionString
|WorkerRef: $endpoint
diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index ad75696eb..23f1a36c2 100644
--- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -312,9 +312,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
println(worker3)
println(worker4)
- assertEquals(exp1, worker1.toString)
- assertEquals(exp2, worker2.toString)
- assertEquals(exp3, worker3.toString)
- assertEquals(exp4, worker4.toString)
+ assertEquals(exp1,
worker1.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp2,
worker2.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp3,
worker3.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp4,
worker4.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
}
}
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index bbf0c3204..11411c218 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -17,9 +17,10 @@
package org.apache.celeborn.service.deploy.master.clustermeta;
-import static org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP;
-
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
@@ -37,10 +38,12 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
-import org.apache.celeborn.common.meta.*;
+import org.apache.celeborn.common.meta.AppDiskUsageMetric;
+import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
+import org.apache.celeborn.common.meta.DiskInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
-import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.PbSerDeUtils;
@@ -230,19 +233,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
userResourceConsumption,
null);
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
-
- try {
- workerInfo.setupEndpoint(rpcEnv.setupEndpointRef(RpcAddress.apply(host,
rpcPort), WORKER_EP));
- } catch (Exception e) {
- LOG.warn("Worker register setupEndpoint failed {}, will retry", e);
- try {
- workerInfo.setupEndpoint(
- rpcEnv.setupEndpointRef(RpcAddress.apply(host, rpcPort),
WORKER_EP));
- } catch (Exception e1) {
- workerInfo.setupEndpoint(null);
- }
- }
-
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
synchronized (workers) {
if (!workers.contains(workerInfo)) {
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 223f65d68..c8a81d11b 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -702,25 +702,10 @@ private[celeborn] class Master(
override def getWorkerInfo: String = {
val sb = new StringBuilder
sb.append("====================== Workers Info in Master
===========================")
+
workersSnapShot.asScala.foreach { w =>
- sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}in Master\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}\n")
sb.append(w).append("\n")
-
- sb.append("\n")
- val workerInfo = requestGetWorkerInfos(w.endpoint)
- .workerInfos.asJava
- .get(0)
-
- sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}in Worker\n")
- sb.append(workerInfo).append("\n")
- sb.append("\n")
-
- if (w.hasSameInfoWith(workerInfo)) {
- sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}status
consist!\n")
- } else {
- sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}status not
consist!\n")
- }
-
sb.append("=====================================================================\n")
}
sb.toString()