This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new cb48944b3 [CELEBORN-507][0.2] Don't set up worker endpoint when update meta
cb48944b3 is described below
commit cb48944b3ac865ef4ee1fc9bfc6d7cc525e0fae0
Author: Shuang <[email protected]>
AuthorDate: Wed Jun 7 22:26:34 2023 +0800
[CELEBORN-507][0.2] Don't set up worker endpoint when update meta
### What changes were proposed in this pull request?
See https://github.com/apache/incubator-celeborn/pull/1412 for details of the change.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1558 from RexXiong/branch-0.2-CELEBORN-507.
Authored-by: Shuang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/celeborn/common/meta/WorkerInfo.scala | 1 +
.../celeborn/common/meta/WorkerInfoSuite.scala | 8 +++----
.../master/clustermeta/AbstractMetaManager.java | 26 +++++++---------------
.../celeborn/service/deploy/master/Master.scala | 17 +++-----------
4 files changed, 16 insertions(+), 36 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index f1b605ba7..8f9801308 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -254,6 +254,7 @@ class WorkerInfo(
|ReplicatePort: $replicatePort
|SlotsUsed: $slots
|LastHeartbeat: $lastHeartbeat
+ |HeartBeatElapsedSeconds: ${(System.currentTimeMillis() -
lastHeartbeat) / 1000}
|Disks: $diskInfosString
|UserResourceConsumption: $userResourceConsumptionString
|WorkerRef: $endpoint
diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 74d814bef..6ad26dc1a 100644
--- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -310,9 +310,9 @@ class WorkerInfoSuite extends RssFunSuite {
println(worker3)
println(worker4)
- assertEquals(exp1, worker1.toString)
- assertEquals(exp2, worker2.toString)
- assertEquals(exp3, worker3.toString)
- assertEquals(exp4, worker4.toString)
+ assertEquals(exp1,
worker1.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp2,
worker2.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp3,
worker3.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp4,
worker4.toString.replaceAll("HeartBeatElapsedSeconds:.*\n", ""))
}
}
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 19c3ce674..61ba9bcb8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -17,9 +17,10 @@
package org.apache.celeborn.service.deploy.master.clustermeta;
-import static org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP;
-
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
@@ -37,10 +38,12 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
-import org.apache.celeborn.common.meta.*;
+import org.apache.celeborn.common.meta.AppDiskUsageMetric;
+import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
+import org.apache.celeborn.common.meta.DiskInfo;
+import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
-import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.Utils;
@@ -223,19 +226,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
userResourceConsumption,
null);
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
-
- try {
- workerInfo.setupEndpoint(rpcEnv.setupEndpointRef(RpcAddress.apply(host,
rpcPort), WORKER_EP));
- } catch (Exception e) {
- LOG.warn("Worker register setupEndpoint failed {}, will retry", e);
- try {
- workerInfo.setupEndpoint(
- rpcEnv.setupEndpointRef(RpcAddress.apply(host, rpcPort),
WORKER_EP));
- } catch (Exception e1) {
- workerInfo.setupEndpoint(null);
- }
- }
-
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
synchronized (workers) {
if (!workers.contains(workerInfo)) {
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index c027aca83..301703f5b 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -650,22 +650,11 @@ private[celeborn] class Master(
override def getWorkerInfo: String = {
val sb = new StringBuilder
+ sb.append("====================== Workers Info in Master
===========================")
+
workersSnapShot.asScala.foreach { w =>
- sb.append("==========WorkerInfos in Master==========\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ").mkString}\n")
sb.append(w).append("\n")
-
- val workerInfo = requestGetWorkerInfos(w.endpoint)
- .workerInfos.asJava
- .get(0)
-
- sb.append("==========WorkerInfos in Workers==========\n")
- sb.append(workerInfo).append("\n")
-
- if (w.hasSameInfoWith(workerInfo)) {
- sb.append("Consist!").append("\n")
- } else {
- sb.append("[ERROR] Inconsistent!").append("\n")
- }
}
sb.toString()