This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d5a1bcdb6 [CELEBORN-1256] Added internal port and auth support to
Celeborn worker
d5a1bcdb6 is described below
commit d5a1bcdb6def098efd9e2f13549c9e2319046021
Author: Chandni Singh <[email protected]>
AuthorDate: Thu Feb 29 10:09:22 2024 +0800
[CELEBORN-1256] Added internal port and auth support to Celeborn worker
### What changes were proposed in this pull request?
This adds an internal port and authentication support to Celeborn Workers.
1. The internal port is used by a Worker to receive messages from the Celeborn
Master.
2. Authentication support for secure communication with clients. This
change doesn't add support in clients for communicating with Workers
securely; that will come in a future change.
This change only adds the port and auth support to the Worker. The
following items from the proposal are still pending:
- Persisting the app secrets in Ratis.
- Forwarding secrets to Workers, and giving Workers the ability to pull
registration info from the Master.
- Secured communication between workers and clients.
### Why are the changes needed?
It is needed for adding authentication support to Celeborn
([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011))
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
This is part of a larger change; for this change, only existing unit tests were modified.
Closes #2292 from otterc/CELEBORN-1256.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: waitinfuture <[email protected]>
---
.../celeborn/common/network/TransportContext.java | 4 +
.../celeborn/common/protocol/RpcNameConstants.java | 2 +
common/src/main/proto/TransportMessages.proto | 2 +
.../org/apache/celeborn/common/CelebornConf.scala | 19 +++--
.../apache/celeborn/common/meta/WorkerInfo.scala | 30 ++++++-
.../common/protocol/message/ControlMessages.scala | 2 +
.../celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +-
.../apache/celeborn/common/util/PbSerDeUtils.scala | 2 +
.../celeborn/common/meta/WorkerInfoSuite.scala | 92 ++++++++++++++++-----
.../celeborn/common/util/PbSerDeUtilsTest.scala | 20 ++++-
docs/configuration/worker.md | 1 +
.../master/clustermeta/AbstractMetaManager.java | 12 ++-
.../master/clustermeta/IMetadataHandler.java | 1 +
.../deploy/master/clustermeta/MetaUtil.java | 4 +-
.../clustermeta/SingleMasterMetaManager.java | 10 ++-
.../master/clustermeta/ha/HAMasterMetaManager.java | 2 +
.../deploy/master/clustermeta/ha/MetaHandler.java | 5 +-
master/src/main/proto/Resource.proto | 2 +
.../celeborn/service/deploy/master/Master.scala | 8 ++
.../master/SlotsAllocatorRackAwareSuiteJ.java | 14 ++--
.../deploy/master/SlotsAllocatorSuiteJ.java | 6 +-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 39 +++++++++
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 12 +--
.../ha/RatisMasterStatusSystemSuiteJ.java | 44 ++++++++++
.../deploy/master/AppDiskUsageMetricSuite.scala | 6 +-
.../deploy/worker/InternalRpcEndpoint.scala | 43 ++++++++++
.../celeborn/service/deploy/worker/Worker.scala | 96 +++++++++++++++++++---
.../service/deploy/worker/WorkerArguments.scala | 9 ++
28 files changed, 422 insertions(+), 67 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index f1319d81e..488e0fd04 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -123,6 +123,10 @@ public class TransportContext {
return createServer(null, port, Collections.emptyList());
}
+ public TransportServer createServer(int port, List<TransportServerBootstrap>
bootstraps) {
+ return createServer(null, port, bootstraps);
+ }
+
/** For Suite only */
public TransportServer createServer() {
return createServer(null, 0, Collections.emptyList());
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
index 7be02b160..1d10c50ca 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
@@ -28,8 +28,10 @@ public class RpcNameConstants {
// For Worker
public static String WORKER_SYS = "Worker";
+ public static String WORKER_INTERNAL_SYS = "WorkerInternal";
// Worker Endpoint Name
public static String WORKER_EP = "WorkerEndpoint";
+ public static String WORKER_INTERNAL_EP = "WorkerInternalEndpoint";
// For LifecycleManager
public static String LIFECYCLE_MANAGER_SYS = "LifecycleManager";
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 8d7f2d83b..9547e08f0 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -164,6 +164,7 @@ message PbWorkerInfo {
int32 replicatePort = 5;
repeated PbDiskInfo disks = 6;
map<string, PbResourceConsumption> userResourceConsumption = 7;
+ int32 internalPort = 8;
}
message PbFileGroup {
@@ -179,6 +180,7 @@ message PbRegisterWorker {
repeated PbDiskInfo disks = 6;
string requestId = 9;
map<string, PbResourceConsumption> userResourceConsumption = 8;
+ int32 internalPort = 10;
}
message PbHeartbeatFromWorker {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 583a63571..475ac0cd2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1155,17 +1155,19 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
}
}
- // //////////////////////////////////////////////////////
- // Rack Resolver //
- // //////////////////////////////////////////////////////
- def rackResolverRefreshInterval = get(RACKRESOLVER_REFRESH_INTERVAL)
-
def haMasterNodeInternalPort(nodeId: String): Int = {
val key = HA_MASTER_NODE_INTERNAL_PORT.key.replace("<id>", nodeId)
getInt(key, HA_MASTER_NODE_INTERNAL_PORT.defaultValue.get)
}
def masterInternalPort: Int = get(MASTER_INTERNAL_PORT)
+
+ def workerInternalPort: Int = get(WORKER_INTERNAL_PORT)
+
+ // //////////////////////////////////////////////////////
+ // Rack Resolver //
+ // //////////////////////////////////////////////////////
+ def rackResolverRefreshInterval = get(RACKRESOLVER_REFRESH_INTERVAL)
}
object CelebornConf extends Logging {
@@ -4609,4 +4611,11 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
+ val WORKER_INTERNAL_PORT: ConfigEntry[Int] =
+ buildConf("celeborn.worker.internal.port")
+ .categories("worker")
+ .doc("Internal server port on the Worker where the master nodes
connect.")
+ .version("0.5.0")
+ .intConf
+ .createWithDefault(0)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 220663e59..629417d25 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -36,6 +36,7 @@ class WorkerInfo(
val pushPort: Int,
val fetchPort: Int,
val replicatePort: Int,
+ val internalPort: Int,
_diskInfos: util.Map[String, DiskInfo],
_userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption])
extends Serializable
with Logging {
@@ -50,13 +51,37 @@ class WorkerInfo(
else null
var endpoint: RpcEndpointRef = null
- def this(host: String, rpcPort: Int, pushPort: Int, fetchPort: Int,
replicatePort: Int) {
+ def this(
+ host: String,
+ rpcPort: Int,
+ pushPort: Int,
+ fetchPort: Int,
+ replicatePort: Int) = {
this(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
+ -1,
+ new util.HashMap[String, DiskInfo](),
+ new util.HashMap[UserIdentifier, ResourceConsumption]())
+ }
+
+ def this(
+ host: String,
+ rpcPort: Int,
+ pushPort: Int,
+ fetchPort: Int,
+ replicatePort: Int,
+ internalPort: Int) = {
+ this(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ internalPort,
new util.HashMap[String, DiskInfo](),
new util.HashMap[UserIdentifier, ResourceConsumption]())
}
@@ -128,7 +153,7 @@ class WorkerInfo(
def readableAddress(): String = {
s"Host:$host:RpcPort:$rpcPort:PushPort:$pushPort:" +
- s"FetchPort:$fetchPort:ReplicatePort:$replicatePort"
+ s"FetchPort:$fetchPort:ReplicatePort:$replicatePort:InternalPort:$internalPort" // label it like every other port
}
def toUniqueId(): String = {
@@ -232,6 +257,7 @@ class WorkerInfo(
|PushPort: $pushPort
|FetchPort: $fetchPort
|ReplicatePort: $replicatePort
+ |InternalPort: $internalPort
|SlotsUsed: $slots
|LastHeartbeat: $lastHeartbeat
|HeartbeatElapsedSeconds: ${TimeUnit.MILLISECONDS.toSeconds(
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index fed0cd95d..07573e648 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -86,6 +86,7 @@ object ControlMessages extends Logging {
pushPort: Int,
fetchPort: Int,
replicatePort: Int,
+ internalPort: Int,
disks: Map[String, DiskInfo],
userResourceConsumption: Map[UserIdentifier, ResourceConsumption],
requestId: String): PbRegisterWorker = {
@@ -98,6 +99,7 @@ object ControlMessages extends Logging {
.setPushPort(pushPort)
.setFetchPort(fetchPort)
.setReplicatePort(replicatePort)
+ .setInternalPort(internalPort)
.addAllDisks(pbDisks)
.putAllUserResourceConsumption(pbUserResourceConsumption)
.setRequestId(requestId)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index c2f5bcc35..824935051 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -143,7 +143,7 @@ class NettyRpcEnv(
}
override def setupEndpoint(name: String, endpoint: RpcEndpoint):
RpcEndpointRef = {
- if (name == RpcNameConstants.WORKER_EP) {
+ if (name == RpcNameConstants.WORKER_EP || name ==
RpcNameConstants.WORKER_INTERNAL_EP) {
worker = endpoint
}
dispatcher.registerRpcEndpoint(name, endpoint)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 6c4892c12..533238095 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -230,6 +230,7 @@ object PbSerDeUtils {
pbWorkerInfo.getPushPort,
pbWorkerInfo.getFetchPort,
pbWorkerInfo.getReplicatePort,
+ pbWorkerInfo.getInternalPort,
disks,
userResourceConsumption)
}
@@ -246,6 +247,7 @@ object PbSerDeUtils {
.setFetchPort(workerInfo.fetchPort)
.setPushPort(workerInfo.pushPort)
.setReplicatePort(workerInfo.replicatePort)
+ .setInternalPort(workerInfo.internalPort)
.addAllDisks(pbDiskInfos)
if (!eliminateUserResourceConsumption) {
builder.putAllUserResourceConsumption(
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index b74c8e6e7..d8c737244 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -53,9 +53,17 @@ class WorkerInfoSuite extends CelebornFunSuite {
pushPort: Int,
fetchPort: Int,
replicatePort: Int,
+ internalPort: Int,
workerInfos: jMap[WorkerInfo, util.Map[String, Integer]],
allocationMap: util.Map[String, Integer]): Unit = {
- val worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
+ val worker =
+ new WorkerInfo(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ internalPort)
val realWorker = workerInfos.get(worker)
assertNotNull(s"Worker $worker didn't exist.", realWorker)
}
@@ -70,7 +78,15 @@ class WorkerInfoSuite extends CelebornFunSuite {
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]()
userResourceConsumption.put(UserIdentifier("tenant1", "name1"),
ResourceConsumption(1, 1, 1, 1))
val worker =
- new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks,
userResourceConsumption)
+ new WorkerInfo(
+ "localhost",
+ 10000,
+ 10001,
+ 10002,
+ 10003,
+ 10004,
+ disks,
+ userResourceConsumption)
val allocatedSlots = new AtomicInteger(0)
val shuffleKey = "appId-1"
@@ -136,32 +152,42 @@ class WorkerInfoSuite extends CelebornFunSuite {
}
test("WorkerInfo not equals when host different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h2", 10001, 10002, 10003, 1000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h2", 10001, 10002, 10003, 1000, 10004, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when rpc port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h1", 20001, 10002, 10003, 1000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 20001, 10002, 10003, 1000, 10004, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when push port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h1", 10001, 20002, 10003, 1000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 20002, 10003, 1000, 10004, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when fetch port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 20003, 1000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 10002, 20003, 1000, 10004, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when replicate port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 2000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 2000, 10004, null, null)
assertNotEquals(worker1, worker2)
}
@@ -172,9 +198,11 @@ class WorkerInfoSuite extends CelebornFunSuite {
10002,
10003,
1000,
+ 10004,
new util.HashMap[String, DiskInfo](),
null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
assertEquals(worker1, worker2)
}
@@ -185,21 +213,26 @@ class WorkerInfoSuite extends CelebornFunSuite {
10002,
10003,
1000,
+ 10004,
null,
new util.HashMap[UserIdentifier, ResourceConsumption]())
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
assertEquals(worker1, worker2)
}
test("WorkerInfo equals when endpoint different") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker1 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
+ val worker2 =
+ new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004, null, null)
assertEquals(worker1, worker2)
}
test("WorkerInfo toString output") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000)
- val worker2 = new WorkerInfo("h2", 20001, 20002, 20003, 2000, null, null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, 10004)
+ val worker2 =
+ new WorkerInfo("h2", 20001, 20002, 20003, 2000, 20004, null, null)
val worker3 = new WorkerInfo(
"h3",
@@ -207,6 +240,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
30002,
30003,
3000,
+ 30004,
new util.HashMap[String, DiskInfo](),
null)
@@ -240,6 +274,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
40002,
40003,
4000,
+ 40004,
disks,
userResourceConsumption)
@@ -251,6 +286,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
|PushPort: 10002
|FetchPort: 10003
|ReplicatePort: 1000
+ |InternalPort: 10004
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
@@ -265,6 +301,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
|PushPort: 20002
|FetchPort: 20003
|ReplicatePort: 2000
+ |InternalPort: 20004
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
@@ -278,6 +315,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
|PushPort: 30002
|FetchPort: 30003
|ReplicatePort: 3000
+ |InternalPort: 30004
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
@@ -291,6 +329,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
|PushPort: 40002
|FetchPort: 40003
|ReplicatePort: 4000
+ |InternalPort: 40004
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
@@ -336,10 +375,23 @@ class WorkerInfoSuite extends CelebornFunSuite {
val pushPort = Random.nextInt(65536)
val fetchPort = Random.nextInt(65536)
val replicatePort = Random.nextInt(65536)
- val workerInfo = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
+ val internalPort = Random.nextInt(65536)
+ val workerInfo =
+ new WorkerInfo(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ internalPort)
// origin hashCode() logic
- val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
+ val state = Seq(
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort)
val originHash = state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
val hashCode1 = workerInfo.hashCode()
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 15b15ce7f..e5d18e071 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -86,9 +86,25 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
userResourceConsumption.put(userIdentifier2, resourceConsumption2)
val workerInfo1 =
- new WorkerInfo("localhost", 1001, 1002, 1003, 1004, diskInfos,
userResourceConsumption)
+ new WorkerInfo(
+ "localhost",
+ 1001,
+ 1002,
+ 1003,
+ 1004,
+ 1005,
+ diskInfos,
+ userResourceConsumption)
val workerInfo2 =
- new WorkerInfo("localhost", 2001, 2002, 2003, 2004, diskInfos,
userResourceConsumption)
+ new WorkerInfo(
+ "localhost",
+ 2001,
+ 2002,
+ 2003,
+ 2004,
+ 2005,
+ diskInfos,
+ userResourceConsumption)
val partitionLocation1 =
new PartitionLocation(0, 0, "host1", 10, 9, 8, 14,
PartitionLocation.Mode.REPLICA)
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index c4fb02bd8..fc9f6b713 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -83,6 +83,7 @@ license: |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's
graceful shutdown timeout time. | 0.2.0 | |
| celeborn.worker.http.host | <localhost> | false | Worker's http host.
| 0.4.0 |
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
|
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 |
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
|
+| celeborn.worker.internal.port | 0 | false | Internal server port on the
Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmQuake.check.interval | 1s | false | Interval of gc
behavior checking for worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.enabled | true | false | Whether to heap dump
for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.path | <tmp>/jvm-quake/dump/<pid>
| false | The path of heap dump for the maximum GC 'deficit' during worker jvm
quake. | 0.4.0 | |
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index abe19ec13..86b34b258 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -175,7 +175,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
boolean highWorkload) {
WorkerInfo worker =
new WorkerInfo(
- host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
+ host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks,
userResourceConsumption);
AtomicLong availableSlots = new AtomicLong();
LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
synchronized (workers) {
@@ -219,11 +219,19 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
int pushPort,
int fetchPort,
int replicatePort,
+ int internalPort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption) {
WorkerInfo workerInfo =
new WorkerInfo(
- host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ internalPort,
+ disks,
+ userResourceConsumption);
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 04fd294f4..fd87768b4 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -71,6 +71,7 @@ public interface IMetadataHandler {
int pushPort,
int fetchPort,
int replicatePort,
+ int internalPort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index c44fb0107..6878e047c 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -40,7 +40,8 @@ public class MetaUtil {
address.getRpcPort(),
address.getPushPort(),
address.getFetchPort(),
- address.getReplicatePort());
+ address.getReplicatePort(),
+ address.getInternalPort());
}
public static ResourceProtos.WorkerAddress infoToAddr(WorkerInfo info) {
@@ -50,6 +51,7 @@ public class MetaUtil {
.setPushPort(info.pushPort())
.setFetchPort(info.fetchPort())
.setReplicatePort(info.replicatePort())
+ .setInternalPort(info.internalPort())
.build();
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 4cd589d0b..eff63a634 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -134,11 +134,19 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
int pushPort,
int fetchPort,
int replicatePort,
+ int internalPort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId) {
updateRegisterWorkerMeta(
- host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
+ host,
+ rpcPort,
+ pushPort,
+ fetchPort,
+ replicatePort,
+ internalPort,
+ disks,
+ userResourceConsumption);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index d772d6b1b..77c31807a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -290,6 +290,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
int pushPort,
int fetchPort,
int replicatePort,
+ int internalPort,
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
String requestId) {
@@ -305,6 +306,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
.setPushPort(pushPort)
.setFetchPort(fetchPort)
.setReplicatePort(replicatePort)
+ .setInternalPort(internalPort)
.putAllDisks(MetaUtil.toPbDiskInfos(disks))
.putAllUserResourceConsumption(
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index d9905dcd0..37d7797a0 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -213,17 +213,19 @@ public class MetaHandler {
pushPort = request.getRegisterWorkerRequest().getPushPort();
fetchPort = request.getRegisterWorkerRequest().getFetchPort();
replicatePort =
request.getRegisterWorkerRequest().getReplicatePort();
+ int internalPort =
request.getRegisterWorkerRequest().getInternalPort();
diskInfos =
MetaUtil.fromPbDiskInfos(request.getRegisterWorkerRequest().getDisksMap());
userResourceConsumption =
MetaUtil.fromPbUserResourceConsumption(
request.getRegisterWorkerRequest().getUserResourceConsumptionMap());
LOG.debug(
- "Handle worker register for {} {} {} {} {} {} {}",
+ "Handle worker register for {} {} {} {} {} {} {} {}",
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
+ internalPort,
diskInfos,
userResourceConsumption);
metaSystem.updateRegisterWorkerMeta(
@@ -232,6 +234,7 @@ public class MetaHandler {
pushPort,
fetchPort,
replicatePort,
+ internalPort,
diskInfos,
userResourceConsumption);
break;
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 01635c187..91a69f536 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -170,6 +170,7 @@ message RegisterWorkerRequest {
required int32 replicatePort = 5;
map<string, DiskInfo> disks = 6;
map<string, ResourceConsumption> userResourceConsumption = 7;
+ optional int32 internalPort = 8 [default = -1]; // not required: messages serialized before this field existed must still parse
}
message ReportWorkerUnavailableRequest {
@@ -191,6 +192,7 @@ message WorkerAddress {
required int32 pushPort = 3;
required int32 fetchPort = 4;
required int32 replicatePort = 5;
+ optional int32 internalPort = 6 [default = -1]; // not required: older serialized addresses lack it; -1 = unknown
}
message UserIdentifier {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index d61a4ee51..33754a673 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -394,6 +394,7 @@ private[celeborn] class Master(
val pushPort = pbRegisterWorker.getPushPort
val fetchPort = pbRegisterWorker.getFetchPort
val replicatePort = pbRegisterWorker.getReplicatePort
+ val internalPort = pbRegisterWorker.getInternalPort
val disks = pbRegisterWorker.getDisksList.asScala
.map { pbDiskInfo => pbDiskInfo.getMountPoint ->
PbSerDeUtils.fromPbDiskInfo(pbDiskInfo) }
.toMap.asJava
@@ -411,6 +412,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
disks,
userResourceConsumption,
requestId))
@@ -667,6 +669,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ -1,
new util.HashMap[String, DiskInfo](),
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
val worker: WorkerInfo = workersSnapShot
@@ -691,6 +694,7 @@ private[celeborn] class Master(
pushPort: Int,
fetchPort: Int,
replicatePort: Int,
+ internalPort: Int,
disks: util.Map[String, DiskInfo],
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
requestId: String): Unit = {
@@ -701,6 +705,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
disks,
userResourceConsumption)
if (workersSnapShot.contains(workerToRegister)) {
@@ -715,6 +720,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
disks,
userResourceConsumption,
newRequestId)
@@ -729,6 +735,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
disks,
userResourceConsumption,
requestId)
@@ -740,6 +747,7 @@ private[celeborn] class Master(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
disks,
userResourceConsumption,
requestId)
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 7140cc6c6..7af8953f2 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -136,12 +136,12 @@ public class SlotsAllocatorRackAwareSuiteJ {
private List<WorkerInfo> prepareWorkers(CelebornRackResolver resolver) {
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host4", 9, 10, 110, 113, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host5", 9, 11, 111, 114, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host6", 9, 12, 112, 115, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 212, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 212, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 212, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host4", 9, 10, 110, 113, 212, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host5", 9, 11, 111, 114, 212, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host6", 9, 12, 112, 115, 212, new HashMap<>(),
null));
workers.forEach(
new Consumer<WorkerInfo>() {
@@ -391,7 +391,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
diskInfoMap.put(diskInfo.mountPoint(), diskInfo);
WorkerInfo workerInfo =
- new WorkerInfo(host, 1, 2, 3, 4,
Collections.unmodifiableMap(diskInfoMap), null);
+ new WorkerInfo(host, 1, 2, 3, 4, 5,
Collections.unmodifiableMap(diskInfoMap), null);
workerInfo.networkLocation_$eq(rack);
return workerInfo;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 511046922..4bd04eeb1 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -133,9 +133,9 @@ public class SlotsAllocatorSuiteJ {
}
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, disks1, null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, disks2, null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, disks3, null));
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 116, disks1, null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 118, disks2, null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 120, disks3, null));
return workers;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index b1ec73d3e..25094ea13 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -57,6 +57,7 @@ public class DefaultMetaSystemSuiteJ {
private static final int PUSHPORT1 = 1112;
private static final int FETCHPORT1 = 1113;
private static final int REPLICATEPORT1 = 1114;
+ private static final int INTERNALPORT1 = 1115;
private static final Map<String, DiskInfo> disks1 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
new HashMap<>();
@@ -66,6 +67,7 @@ public class DefaultMetaSystemSuiteJ {
private static final int PUSHPORT2 = 2112;
private static final int FETCHPORT2 = 2113;
private static final int REPLICATEPORT2 = 2114;
+ private static final int INTERNALPORT2 = 2115;
private static final Map<String, DiskInfo> disks2 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
new HashMap<>();
@@ -75,6 +77,7 @@ public class DefaultMetaSystemSuiteJ {
private static final int PUSHPORT3 = 3112;
private static final int FETCHPORT3 = 3113;
private static final int REPLICATEPORT3 = 3114;
+ private static final int INTERNALPORT3 = 3115;
private static final Map<String, DiskInfo> disks3 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
new HashMap<>();
@@ -121,6 +124,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -130,6 +134,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -139,6 +144,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -155,6 +161,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -164,6 +171,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
@@ -173,6 +181,7 @@ public class DefaultMetaSystemSuiteJ {
workerInfo1.pushPort(),
workerInfo1.fetchPort(),
workerInfo1.replicatePort(),
+ workerInfo1.internalPort(),
workerInfo1.diskInfos(),
workerInfo1.userResourceConsumption(),
getNewReqeustId());
@@ -182,6 +191,7 @@ public class DefaultMetaSystemSuiteJ {
workerInfo2.pushPort(),
workerInfo2.fetchPort(),
workerInfo2.replicatePort(),
+ workerInfo2.internalPort(),
workerInfo2.diskInfos(),
workerInfo2.userResourceConsumption(),
getNewReqeustId());
@@ -203,6 +213,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -212,6 +223,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -221,6 +233,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -242,6 +255,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -251,6 +265,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -260,6 +275,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -271,6 +287,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -280,6 +297,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
WorkerInfo workerInfo3 =
@@ -289,6 +307,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3);
@@ -313,6 +332,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -322,6 +342,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -331,6 +352,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -373,6 +395,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -382,6 +405,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -391,6 +415,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -402,6 +427,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -411,6 +437,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
@@ -437,6 +464,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -446,6 +474,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -455,6 +484,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -466,6 +496,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -475,6 +506,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
@@ -514,6 +546,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
new HashMap<>(),
userResourceConsumption1,
getNewReqeustId());
@@ -523,6 +556,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
new HashMap<>(),
userResourceConsumption2,
getNewReqeustId());
@@ -532,6 +566,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
new HashMap<>(),
userResourceConsumption3,
getNewReqeustId());
@@ -609,6 +644,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -618,6 +654,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -627,6 +664,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -639,6 +677,7 @@ public class DefaultMetaSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1));
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 79fb047c5..89d2ccbaa 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -185,9 +185,9 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
3,
Collections.singletonMap("appId3", new ResourceConsumption(2000,
2, 2000, 2, null))));
- WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, disks1,
userResourceConsumption1);
- WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, disks2,
userResourceConsumption2);
- WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, disks3,
userResourceConsumption3);
+ WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, 13, disks1,
userResourceConsumption1);
+ WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, 15, disks2,
userResourceConsumption2);
+ WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, 17, disks3,
userResourceConsumption3);
String host1 = "host1";
String host2 = "host2";
@@ -220,9 +220,9 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
AppDiskUsageSnapShot originCurrentSnapshot =
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
- masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093,
9092));
- masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093,
9092));
- masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093,
9092));
+ masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093,
9092, 9091));
+ masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093,
9092, 9091));
+ masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093,
9092, 9091));
masterStatusSystem.writeMetaInfoToFile(tmpFile);
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index f192e91f9..e42d3aac9 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -186,6 +186,7 @@ public class RatisMasterStatusSystemSuiteJ {
private static final int PUSHPORT1 = 1112;
private static final int FETCHPORT1 = 1113;
private static final int REPLICATEPORT1 = 1114;
+ private static final int INTERNALPORT1 = 1115;
private static final Map<String, DiskInfo> disks1 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
new HashMap<>();
@@ -195,6 +196,7 @@ public class RatisMasterStatusSystemSuiteJ {
private static final int PUSHPORT2 = 2112;
private static final int FETCHPORT2 = 2113;
private static final int REPLICATEPORT2 = 2114;
+ private static final int INTERNALPORT2 = 2115;
private static final Map<String, DiskInfo> disks2 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
new HashMap<>();
@@ -204,6 +206,7 @@ public class RatisMasterStatusSystemSuiteJ {
private static final int PUSHPORT3 = 3112;
private static final int FETCHPORT3 = 3113;
private static final int REPLICATEPORT3 = 3114;
+ private static final int INTERNALPORT3 = 3115;
private static final Map<String, DiskInfo> disks3 = new HashMap<>();
private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
new HashMap<>();
@@ -251,6 +254,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -273,6 +277,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -282,6 +287,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -291,6 +297,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -314,6 +321,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -323,6 +331,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
@@ -332,6 +341,7 @@ public class RatisMasterStatusSystemSuiteJ {
workerInfo1.pushPort(),
workerInfo1.fetchPort(),
workerInfo1.replicatePort(),
+ workerInfo1.internalPort(),
workerInfo1.diskInfos(),
workerInfo1.userResourceConsumption(),
getNewReqeustId());
@@ -341,6 +351,7 @@ public class RatisMasterStatusSystemSuiteJ {
workerInfo2.pushPort(),
workerInfo2.fetchPort(),
workerInfo2.replicatePort(),
+ workerInfo2.internalPort(),
workerInfo2.diskInfos(),
workerInfo2.userResourceConsumption(),
getNewReqeustId());
@@ -373,6 +384,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -382,6 +394,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -391,6 +404,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -415,6 +429,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -424,6 +439,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -433,6 +449,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -444,6 +461,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -453,6 +471,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
WorkerInfo workerInfo3 =
@@ -462,6 +481,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3);
@@ -514,6 +534,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -523,6 +544,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -532,6 +554,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -598,6 +621,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -607,6 +631,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -616,6 +641,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -628,6 +654,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -637,6 +664,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
@@ -671,6 +699,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -680,6 +709,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -689,6 +719,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -700,6 +731,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
WorkerInfo workerInfo2 =
@@ -709,6 +741,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2);
@@ -769,6 +802,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -778,6 +812,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -787,6 +822,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -923,6 +959,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -932,6 +969,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -941,6 +979,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -953,6 +992,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1));
@@ -977,6 +1017,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1,
getNewReqeustId());
@@ -986,6 +1027,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT2,
FETCHPORT2,
REPLICATEPORT2,
+ INTERNALPORT2,
disks2,
userResourceConsumption2,
getNewReqeustId());
@@ -995,6 +1037,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
+ INTERNALPORT3,
disks3,
userResourceConsumption3,
getNewReqeustId());
@@ -1006,6 +1049,7 @@ public class RatisMasterStatusSystemSuiteJ {
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
+ INTERNALPORT1,
disks1,
userResourceConsumption1);
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
index 00f600bb7..c91d197d7 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala
@@ -32,9 +32,9 @@ class AppDiskUsageMetricSuite extends AnyFunSuite
with BeforeAndAfterAll
with BeforeAndAfterEach
with Logging {
- val WORKER1 = new WorkerInfo("host1", 111, 112, 113, 114)
- val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214)
- val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314)
+ val WORKER1 = new WorkerInfo("host1", 111, 112, 113, 114, 115)
+ val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215)
+ val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315)
test("test snapshot ordering") {
val snapShot = new AppDiskUsageSnapShot(50)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala
new file mode 100644
index 000000000..5f84aed4a
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.rpc._
+
+/**
+ * RPC endpoint registered on the Worker's internal port, through which the
+ * Worker receives messages from the Celeborn Master. Until CELEBORN-1234 is
+ * implemented, every message delivered to `receive` fails with a CelebornException.
+ */
+private[celeborn] class InternalRpcEndpoint(
+    override val rpcEnv: RpcEnv,
+    val conf: CelebornConf)
+  extends RpcEndpoint with Logging {
+
+  override def onDisconnected(address: RpcAddress): Unit = {
+    logDebug(s"Client $address got disconnected.")
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+    // TODO: [CELEBORN-1234] Handle the application secret from the Master
+    case _ => throw new CelebornException(s"$self not implemented")
+  }
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index fefa14ec7..604a588e6 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -39,11 +39,16 @@ import org.apache.celeborn.common.meta.{DiskInfo,
WorkerInfo, WorkerPartitionLoc
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.TransportContext
+import org.apache.celeborn.common.network.sasl.SaslServerBootstrap
+import org.apache.celeborn.common.network.sasl.SecretRegistryImpl
+import org.apache.celeborn.common.network.server.TransportServerBootstrap
+import org.apache.celeborn.common.network.util.TransportConf
import org.apache.celeborn.common.protocol.{PartitionType,
PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants,
TransportModuleConstants, WorkerEventType}
import org.apache.celeborn.common.protocol.PbWorkerStatus.State
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc._
+import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder,
ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornExitKind, CollectionUtils,
JavaUtils, ShutdownHookManager, ThreadUtils, Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
@@ -78,27 +83,65 @@ private[celeborn] class Worker(
metricsSystem.registerSource(new SystemMiscSource(conf,
MetricsSystem.ROLE_WORKER))
val workerStatusManager = new WorkerStatusManager(conf)
- val rpcEnv: RpcEnv = RpcEnv.create(
- RpcNameConstants.WORKER_SYS,
- workerArgs.host,
- workerArgs.host,
- workerArgs.port,
- conf,
- Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+  private val authEnabled = conf.authEnabled
+  private val secretRegistry = new SecretRegistryImpl()
+  val rpcEnv: RpcEnv = {
+    // Dispatcher sizing must not depend on whether auth is enabled: clamp the
+    // available core count into [4, 64] for both the plain and the SASL-secured env.
+    val numRpcThreads = Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors()))
+    if (!authEnabled) {
+      RpcEnv.create(
+        RpcNameConstants.WORKER_SYS,
+        workerArgs.host,
+        workerArgs.host,
+        workerArgs.port,
+        conf,
+        numRpcThreads)
+    } else {
+      // Server-side SASL context backed by this worker's secret registry.
+      val externalSecurityContext = new RpcSecurityContextBuilder()
+        .withServerSaslContext(
+          new ServerSaslContextBuilder()
+            .withAddRegistrationBootstrap(false)
+            .withSecretRegistry(secretRegistry).build()).build()
+      logInfo(
+        s"Secure port enabled ${workerArgs.port} for secured RPC.")
+      RpcEnv.create(
+        RpcNameConstants.WORKER_SYS,
+        workerArgs.host,
+        workerArgs.host,
+        workerArgs.port,
+        conf,
+        numRpcThreads,
+        Some(externalSecurityContext))
+    }
+  }
+
+  private[worker] var internalRpcEnvInUse: RpcEnv =
+    if (conf.internalPortEnabled) {
+      // Dedicated RpcEnv bound to the worker's internal port.
+      RpcEnv.create(
+        RpcNameConstants.WORKER_INTERNAL_SYS,
+        workerArgs.host,
+        workerArgs.host,
+        workerArgs.internalPort,
+        conf,
+        Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())))
+    } else {
+      // No internal port: users of internalRpcEnvInUse share the main rpcEnv.
+      rpcEnv
+    }
private val host = rpcEnv.address.host
private val rpcPort = rpcEnv.address.port
+ private val internalPort = internalRpcEnvInUse.address.port
Utils.checkHost(host)
private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
if (gracefulShutdown) {
- val checkPortMap = Map(
+ var checkPortMap = Map(
WORKER_RPC_PORT -> conf.workerRpcPort,
WORKER_FETCH_PORT -> conf.workerFetchPort,
WORKER_PUSH_PORT -> conf.workerPushPort,
WORKER_REPLICATE_PORT -> conf.workerReplicatePort)
+ if (conf.internalPortEnabled) {
+ checkPortMap += (WORKER_INTERNAL_PORT -> conf.workerInternalPort)
+ }
assert(
!checkPortMap.values.exists(_ == 0),
"If enable graceful shutdown, the worker should use non-zero port. " +
@@ -141,6 +184,16 @@ private[celeborn] class Worker(
var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
+  // Visible for testing
+  private[worker] var internalRpcEndpoint: RpcEndpoint = _
+  private var internalRpcEndpointRef: RpcEndpointRef = _
+  // The internal endpoint is registered only when the internal port is enabled;
+  // otherwise both fields keep their default (null) value.
+  if (conf.internalPortEnabled) {
+    val endpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf)
+    internalRpcEndpoint = endpoint
+    internalRpcEndpointRef =
+      internalRpcEnvInUse.setupEndpoint(RpcNameConstants.WORKER_INTERNAL_EP, endpoint)
+  }
+
val pushDataHandler = new PushDataHandler(workerSource)
private val pushServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
@@ -156,7 +209,7 @@ private[celeborn] class Worker(
pushServerLimiter,
conf.workerPushHeartbeatEnabled,
workerSource)
- transportContext.createServer(conf.workerPushPort)
+ transportContext.createServer(conf.workerPushPort,
getServerBootstraps(transportConf))
}
val replicateHandler = new PushDataHandler(workerSource)
@@ -194,7 +247,7 @@ private[celeborn] class Worker(
closeIdleConnections,
conf.workerFetchHeartbeatEnabled,
workerSource)
- transportContext.createServer(conf.workerFetchPort)
+ transportContext.createServer(conf.workerFetchPort,
getServerBootstraps(transportConf))
}
private val pushPort = pushServer.getPort
@@ -221,6 +274,7 @@ private[celeborn] class Worker(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
diskInfos,
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption])
@@ -237,8 +291,7 @@ private[celeborn] class Worker(
val shuffleCommitInfos: ConcurrentHashMap[String, ConcurrentHashMap[Long,
CommitInfo]] =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[Long,
CommitInfo]]()
- // TODO: pass the internal rpc env here when internal port is added to the
worker.
- private val masterClient = new MasterClient(rpcEnv, conf, true)
+ private val masterClient = new MasterClient(internalRpcEnvInUse, conf, true)
// (workerInfo -> last connect timeout timestamp)
val unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] =
@@ -435,6 +488,9 @@ private[celeborn] class Worker(
logInfo("Worker started.")
rpcEnv.awaitTermination()
+ if (conf.internalPortEnabled) {
+ internalRpcEnvInUse.awaitTermination()
+ }
}
override def stop(exitKind: Int): Unit = {
@@ -482,7 +538,9 @@ private[celeborn] class Worker(
fetchServer.shutdown(exitKind)
pushServer.shutdown(exitKind)
metricsSystem.stop()
-
+ if (conf.internalPortEnabled) {
+ internalRpcEnvInUse.stop(internalRpcEndpointRef)
+ }
super.stop(exitKind)
logInfo("Worker is stopped.")
@@ -504,6 +562,7 @@ private[celeborn] class Worker(
pushPort,
fetchPort,
replicatePort,
+ internalPort,
// Use WorkerInfo's diskInfo since re-register when heartbeat
return not-registered,
// StorageManager have update the disk info.
workerInfo.diskInfos.asScala.toMap,
@@ -860,6 +919,17 @@ private[celeborn] class Worker(
@VisibleForTesting
def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort)
+
+  /**
+   * Builds the bootstraps installed on this worker's push and fetch transport
+   * servers: a SASL server bootstrap backed by the secret registry when auth is
+   * enabled, otherwise an empty list.
+   */
+  def getServerBootstraps(transportConf: TransportConf)
+      : java.util.List[TransportServerBootstrap] = {
+    val bootstraps = new java.util.ArrayList[TransportServerBootstrap]()
+    if (authEnabled) {
+      val sasl = new SaslServerBootstrap(transportConf, secretRegistry)
+      bootstraps.add(sasl)
+    }
+    bootstraps
+  }
}
private[deploy] object Worker extends Logging {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
index 6f8fb682b..45c84f98c 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
@@ -26,6 +26,7 @@ class WorkerArguments(args: Array[String], conf:
CelebornConf) {
private var _host: Option[String] = None
private var _port: Option[Int] = None
+ private var _internalPort: Option[Int] = None
// for local testing.
private var _master: Option[String] = None
private var _propertiesFile: Option[String] = None
@@ -36,11 +37,14 @@ class WorkerArguments(args: Array[String], conf:
CelebornConf) {
_propertiesFile = Some(Utils.loadDefaultCelebornProperties(conf,
_propertiesFile.orNull))
_host = _host.orElse(Some(Utils.localHostName(conf)))
_port = _port.orElse(Some(conf.workerRpcPort))
+ _internalPort = _internalPort.orElse(Some(conf.workerInternalPort))
def host: String = _host.get
def port: Int = _port.get
+ def internalPort: Int = _internalPort.get
+
def master: Option[String] = _master
@tailrec
@@ -54,6 +58,10 @@ class WorkerArguments(args: Array[String], conf:
CelebornConf) {
_port = Some(value)
parse(tail)
+ case ("--internal-port") :: IntParam(value) :: tail =>
+ _internalPort = Some(value)
+ parse(tail)
+
case "--properties-file" :: value :: tail =>
_propertiesFile = Some(value)
parse(tail)
@@ -83,6 +91,7 @@ class WorkerArguments(args: Array[String], conf:
CelebornConf) {
|Options:
| -h HOST, --host HOST Hostname to listen on
| -p PORT, --port PORT Port to listen on (default: random)
+ | --internal-port PORT Port for internal communication (default:
random)
| --properties-file FILE Path to a custom Celeborn properties file.
| Default is conf/celeborn-defaults.conf.
|""".stripMargin)