This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c90a1647a [CELEBORN-1489] Update Flink support with authentication
support
c90a1647a is described below
commit c90a1647afe96493cbc48e4f5c72fbd8195086c5
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Thu Jul 4 10:17:56 2024 +0800
[CELEBORN-1489] Update Flink support with authentication support
### What changes were proposed in this pull request?
Fix authentication support for Apache Flink.
### Why are the changes needed?
Without these changes, Apache Flink applications fail when the Celeborn
cluster has authentication enabled.
### Does this PR introduce _any_ user-facing change?
Yes. It fixes authentication support for the Apache Flink integration.
### How was this patch tested?
This is a forward port and adaptation of changes we made internally (against
0.4) while testing Apache Flink applications against a Celeborn cluster with
authentication (and TLS) enabled.
The integration test has been updated to additionally test Flink
applications with authentication enabled in the Celeborn cluster.
Closes #2596 from mridulm/fix-flink-auth-support.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 5 ++-
.../flink/network/FlinkTransportClientFactory.java | 17 +++++++---
.../flink/readclient/FlinkShuffleClientImpl.java | 30 ++++++++++++++++--
.../apache/celeborn/client/ShuffleClientImpl.java | 34 ++++++++++++--------
.../celeborn/common/client/MasterClient.java | 1 +
.../apache/celeborn/common/network/TestHelper.java | 21 +++++++++++++
.../common/network/ssl/SslSampleConfigs.java | 19 ++----------
project/CelebornBuild.scala | 2 +-
service/pom.xml | 7 +++++
.../http/ApiBaseResourceAuthenticationSuite.scala | 3 +-
.../server/common/http/ApiBaseResourceSuite.scala | 3 +-
.../celeborn/tests/flink/WordCountTest.scala | 36 ++++++++++++++++++++--
.../service/deploy/worker/WorkerSource.scala | 6 ++--
.../service/deploy/MiniClusterFeature.scala | 11 +++++--
14 files changed, 146 insertions(+), 49 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index e1eb5b481..219988542 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -232,7 +232,10 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
public void close() throws Exception {
try {
jobShuffleIds.clear();
- lifecycleManager.stop();
+ LifecycleManager manager = lifecycleManager;
+ if (null != manager) {
+ manager.stop();
+ }
} catch (Exception e) {
LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index b3cfc09ea..aef4891ca 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -18,7 +18,7 @@
package org.apache.celeborn.plugin.flink.network;
import java.io.IOException;
-import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
@@ -29,8 +29,10 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientBootstrap;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.plugin.flink.utils.Utils;
public class FlinkTransportClientFactory extends TransportClientFactory {
@@ -39,8 +41,9 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
private ConcurrentHashMap<Long, Supplier<ByteBuf>> bufferSuppliers;
private final int fetchMaxRetries;
- public FlinkTransportClientFactory(TransportContext context, int
fetchMaxRetries) {
- super(context, Collections.emptyList());
+ public FlinkTransportClientFactory(
+ TransportContext context, int fetchMaxRetries,
List<TransportClientBootstrap> bootstraps) {
+ super(context, bootstraps);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
this.fetchMaxRetries = fetchMaxRetries;
this.pooledAllocator = new UnpooledByteBufAllocator(true);
@@ -53,7 +56,7 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
while (retryCount > 0) {
try {
return createClient(remoteHost, remotePort);
- } catch (IOException e) {
+ } catch (Exception e) {
retryCount--;
logger.warn(
"Retrying ({}/{}) times create client to {}:{}",
@@ -63,7 +66,11 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
remotePort,
e);
if (retryCount == 0) {
- throw e;
+ if (e instanceof InterruptedException || e instanceof IOException) {
+ throw e;
+ } else {
+ Utils.rethrowAsRuntimeException(e);
+ }
}
}
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 79c659db5..d9b261edb 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -59,6 +59,7 @@ import org.apache.celeborn.common.protocol.ReviveRequest;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.protocol.message.ControlMessages;
import org.apache.celeborn.common.protocol.message.StatusCode;
+import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.Utils;
@@ -76,6 +77,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl
{
JavaUtils.newConcurrentHashMap();
private long driverTimestamp;
+ private final TransportContext context;
+
public static FlinkShuffleClientImpl get(
String appUniqueId,
String driverHost,
@@ -133,15 +136,33 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module
+ ".io.threads", 8));
- TransportContext context =
+ this.context =
new TransportContext(
dataTransportConf, readClientHandler,
conf.clientCloseIdleConnections());
- this.flinkTransportClientFactory =
- new FlinkTransportClientFactory(context,
conf.clientFetchMaxRetriesForEachReplica());
this.setupLifecycleManagerRef(driverHost, port);
this.driverTimestamp = driverTimestamp;
}
+ private void initializeTransportClientFactory() {
+ if (null == flinkTransportClientFactory) {
+ flinkTransportClientFactory =
+ new FlinkTransportClientFactory(
+ context, conf.clientFetchMaxRetriesForEachReplica(),
createBootstraps());
+ }
+ }
+
+ @Override
+ public void setupLifecycleManagerRef(String host, int port) {
+ super.setupLifecycleManagerRef(host, port);
+ initializeTransportClientFactory();
+ }
+
+ @Override
+ public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) {
+ super.setupLifecycleManagerRef(endpointRef);
+ initializeTransportClientFactory();
+ }
+
public CelebornBufferStream readBufferedPartition(
int shuffleId, int partitionId, int subPartitionIndexStart, int
subPartitionIndexEnd)
throws IOException {
@@ -160,6 +181,8 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
shuffleKey,
partitionId,
partitionLocations);
+
+ initializeTransportClientFactory();
return CelebornBufferStream.create(
this,
flinkTransportClientFactory,
@@ -565,6 +588,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
@Override
@VisibleForTesting
public TransportClientFactory getDataClientFactory() {
+ initializeTransportClientFactory();
return flinkTransportClientFactory;
}
}
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index f68086448..1b22a7a62 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -209,17 +209,8 @@ public class ShuffleClientImpl extends ShuffleClient {
logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
}
- private void initDataClientFactoryIfNeeded() {
- if (dataClientFactory != null) {
- return;
- }
- this.transportContext =
- new TransportContext(
- dataTransportConf, new BaseMessageHandler(),
conf.clientCloseIdleConnections());
- if (!authEnabled) {
- logger.info("Initializing data client factory for {}.", appUniqueId);
- dataClientFactory = transportContext.createClientFactory();
- } else if (lifecycleManagerRef != null) {
+ protected List<TransportClientBootstrap> createBootstraps() {
+ if (authEnabled && null != lifecycleManagerRef) {
PbApplicationMetaRequest pbApplicationMetaRequest =
PbApplicationMetaRequest.newBuilder().setAppId(appUniqueId).build();
PbApplicationMeta pbApplicationMeta =
@@ -227,13 +218,31 @@ public class ShuffleClientImpl extends ShuffleClient {
pbApplicationMetaRequest,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbApplicationMeta.class));
- logger.info("Initializing data client factory for secured {}.",
appUniqueId);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
bootstraps.add(
new SaslClientBootstrap(
dataTransportConf,
appUniqueId,
new SaslCredentials(appUniqueId,
pbApplicationMeta.getSecret())));
+ return Collections.unmodifiableList(bootstraps);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ private void initDataClientFactoryIfNeeded() {
+ if (dataClientFactory != null) {
+ return;
+ }
+ this.transportContext =
+ new TransportContext(
+ dataTransportConf, new BaseMessageHandler(),
conf.clientCloseIdleConnections());
+ if (!authEnabled) {
+ logger.info("Initializing data client factory for {}.", appUniqueId);
+ dataClientFactory = transportContext.createClientFactory();
+ } else if (lifecycleManagerRef != null) {
+ logger.info("Initializing data client factory for secured {}.",
appUniqueId);
+ List<TransportClientBootstrap> bootstraps = createBootstraps();
dataClientFactory = transportContext.createClientFactory(bootstraps);
}
}
@@ -1755,6 +1764,7 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void setupLifecycleManagerRef(String host, int port) {
+ logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
lifecycleManagerRef =
rpcEnv.setupEndpointRef(new RpcAddress(host, port),
RpcNameConstants.LIFECYCLE_MANAGER_EP);
initDataClientFactoryIfNeeded();
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 24fcb69a2..df12fa520 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -65,6 +65,7 @@ public class MasterClient {
this.isWorker = isWorker;
this.masterEndpoints = resolveMasterEndpoints();
Collections.shuffle(this.masterEndpoints);
+ LOG.info("masterEndpoints = {}", masterEndpoints);
this.maxRetries = Math.max(masterEndpoints.size(),
conf.masterClientMaxRetries());
this.rpcTimeout = conf.masterClientRpcAskTimeout();
this.rpcEndpointRef = new AtomicReference<>();
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java
b/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java
index 1e9c1cb48..efc9093fb 100644
--- a/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java
+++ b/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java
@@ -17,9 +17,15 @@
package org.apache.celeborn.common.network;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
+
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.ssl.SslSampleConfigs;
/** A few helper utilities to reduce duplication within test code. */
public class TestHelper {
@@ -30,4 +36,19 @@ public class TestHelper {
}
return conf;
}
+
+ public static String getResourceAsAbsolutePath(String path) {
+ try {
+ File tempFile = File.createTempFile(new File(path).getName(), null);
+ tempFile.deleteOnExit();
+ URL url = SslSampleConfigs.class.getResource(path);
+ if (null == url) {
+ throw new IllegalArgumentException("Unable to find " + path);
+ }
+ FileUtils.copyInputStreamToFile(url.openStream(), tempFile);
+ return tempFile.getCanonicalPath();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to resolve path " + path, e);
+ }
+ }
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
b/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
index 3b634ae17..b571e9594 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
@@ -17,11 +17,12 @@
package org.apache.celeborn.common.network.ssl;
+import static
org.apache.celeborn.common.network.TestHelper.getResourceAsAbsolutePath;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
-import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.*;
@@ -30,7 +31,6 @@ import java.security.cert.X509Certificate;
import java.util.*;
import java.util.stream.Stream;
-import org.apache.commons.io.FileUtils;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.asn1.x509.Extension;
@@ -276,19 +276,4 @@ public class SslSampleConfigs {
out.close();
}
}
-
- public static String getResourceAsAbsolutePath(String path) {
- try {
- File tempFile = File.createTempFile(new File(path).getName(), null);
- tempFile.deleteOnExit();
- URL url = SslSampleConfigs.class.getResource(path);
- if (null == url) {
- throw new IllegalArgumentException("Unable to find " + path);
- }
- FileUtils.copyInputStreamToFile(url.openStream(), tempFile);
- return tempFile.getCanonicalPath();
- } catch (IOException e) {
- throw new RuntimeException("Failed to resolve path " + path, e);
- }
- }
}
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 9c7d54b75..1a4b7beeb 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -502,7 +502,7 @@ object CelebornClient {
object CelebornService {
lazy val service = Project("celeborn-service", file("service"))
- .dependsOn(CelebornCommon.common)
+ .dependsOn(CelebornCommon.common % "test->test;compile->compile")
.settings (
commonSettings,
libraryDependencies ++= Seq(
diff --git a/service/pom.xml b/service/pom.xml
index b6d0da739..8cc0721a5 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -65,6 +65,13 @@
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
index 8b7921777..938454707 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
@@ -24,6 +24,7 @@ import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import org.apache.celeborn.common.network.TestHelper
import
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
import
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
@@ -32,7 +33,7 @@ abstract class ApiBaseResourceAuthenticationSuite extends
HttpTestHelper {
.set(CelebornConf.METRICS_ENABLED.key, "true")
.set(
CelebornConf.METRICS_CONF.key,
-
Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)
+ TestHelper.getResourceAsAbsolutePath("/metrics-api.properties"))
.set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC",
"BEARER"))
.set(
CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
index 28e1ed9ed..4a0c2cbd4 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -21,12 +21,13 @@ import javax.servlet.http.HttpServletResponse
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.network.TestHelper
abstract class ApiBaseResourceSuite extends HttpTestHelper {
celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true")
.set(
CelebornConf.METRICS_CONF.key,
-
Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)
+ TestHelper.getResourceAsAbsolutePath("/metrics-api.properties"))
test("ping") {
val response = webTarget.path("ping").request(MediaType.TEXT_PLAIN).get()
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index a01251d09..aa44ce493 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -30,18 +30,25 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED,
INTERNAL_PORT_ENABLED}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
-class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
+abstract class WordCountTestBase extends AnyFunSuite with Logging with
MiniClusterFeature
with BeforeAndAfterAll {
var workers: collection.Set[Worker] = null
var port = 0
+ protected def getMasterConf: Map[String, String]
+ protected def getWorkerConf: Map[String, String]
+ protected def getWorkerNum: Int = 3
+
+ protected def getClientConf: Map[String, String] = Map()
+
override def beforeAll(): Unit = {
logInfo("test initialized , setup celeborn mini cluster")
- val (m, w) = setupMiniClusterWithRandomPorts()
+ val (m, w) = setupMiniClusterWithRandomPorts(getMasterConf, getWorkerConf,
getWorkerNum)
workers = w
port = m.conf.get(CelebornConf.MASTER_PORT)
}
@@ -51,7 +58,13 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
shutdownMiniCluster()
}
- test("celeborn flink integration test - word count") {
+ private def addClientConf(configuration: Configuration): Unit = {
+ for ((k, v) <- getClientConf) {
+ configuration.setString(k, v)
+ }
+ }
+
+ test(getClass.getName + ": celeborn flink integration test - word count") {
// set up execution environment
val configuration = new Configuration
val parallelism = 8
@@ -69,6 +82,7 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
configuration.setString("restart-strategy.type", "fixed-delay")
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
+ addClientConf(configuration)
val env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
env.getConfig.setExecutionMode(ExecutionMode.BATCH)
env.getConfig.setParallelism(parallelism)
@@ -93,3 +107,19 @@ class WordCountTest extends AnyFunSuite with Logging with
MiniClusterFeature
})
}
}
+
+class WordCountTest extends WordCountTestBase {
+ override protected def getMasterConf: Map[String, String] = Map()
+ override protected def getWorkerConf: Map[String, String] = Map()
+}
+
+class WordCountTestWithAuthentication extends WordCountTestBase {
+
+ private val authConfig = Map(
+ AUTH_ENABLED.key -> "true",
+ INTERNAL_PORT_ENABLED.key -> "true")
+
+ override protected def getMasterConf: Map[String, String] = authConfig
+ override protected def getWorkerConf: Map[String, String] = authConfig
+ override protected def getClientConf: Map[String, String] =
Map(AUTH_ENABLED.key -> "true")
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index efd596002..f4b152b08 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -95,8 +95,10 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
def connectionInactive(client: TransportClient): Unit = {
val applicationIds =
appActiveConnections.remove(client.getChannel.id().asLongText())
incCounter(ACTIVE_CONNECTION_COUNT, -1)
- applicationIds.asScala.foreach(applicationId =>
- incCounter(ACTIVE_CONNECTION_COUNT, -1, Map(applicationLabel ->
applicationId)))
+ if (null != applicationIds) {
+ applicationIds.asScala.foreach(applicationId =>
+ incCounter(ACTIVE_CONNECTION_COUNT, -1, Map(applicationLabel ->
applicationId)))
+ }
}
def recordAppActiveConnection(client: TransportClient, shuffleKey: String):
Unit = {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 914512cca..a87015d21 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -55,16 +55,21 @@ trait MiniClusterFeature extends Logging {
while (!created) {
try {
val randomPort = selectRandomPort(1024, 65535)
+ val randomInternalPort = selectRandomPort(1024, 65535)
val finalMasterConf = Map(
s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
s"${CelebornConf.MASTER_PORT.key}" -> s"$randomPort",
- s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort")
++
+ s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort",
+ s"${CelebornConf.MASTER_INTERNAL_PORT.key}" ->
s"$randomInternalPort",
+ s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" ->
s"localhost:$randomInternalPort") ++
masterConf
val finalWorkerConf = Map(
- s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort")
++
+ s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort",
+ s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" ->
s"localhost:$randomInternalPort") ++
workerConf
- logInfo(s"generated configuration $finalMasterConf")
+ logInfo(
+ s"generated configuration. Master conf = $finalMasterConf, worker
conf = $finalWorkerConf")
val (m, w) =
setUpMiniCluster(masterConf = finalMasterConf, workerConf =
finalWorkerConf, workerNum)
master = m