This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e441ba5 [FLINK-23082] Split AkkaRpcServiceUtils
e441ba5 is described below
commit e441ba5ad2cc701d5fffda5ece16aa59b3b1a891
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 23 10:40:49 2021 +0200
[FLINK-23082] Split AkkaRpcServiceUtils
---
.../flink/runtime/dispatcher/Dispatcher.java | 4 +-
.../HighAvailabilityServicesUtils.java | 5 +-
.../apache/flink/runtime/jobmaster/JobMaster.java | 4 +-
.../runtime/resourcemanager/ResourceManager.java | 4 +-
.../apache/flink/runtime/rpc/RpcServiceUtils.java | 56 ++++++++++++++++++++++
.../runtime/rpc/akka/AkkaRpcServiceUtils.java | 32 -------------
.../flink/runtime/taskexecutor/TaskExecutor.java | 4 +-
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 5 +-
8 files changed, 70 insertions(+), 44 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5aae4cd..ee1bc67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
@@ -152,7 +152,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
- super(rpcService, AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
+ super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
checkNotNull(dispatcherServices);
this.configuration = dispatcherServices.getConfiguration();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index c329826..f24570e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperClientHAServ
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ConfigurationException;
@@ -94,7 +95,7 @@ public class HighAvailabilityServicesUtils {
AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
- AkkaRpcServiceUtils.createWildcardName(
+ RpcServiceUtils.createWildcardName(
ResourceManager.RESOURCE_MANAGER_NAME),
addressResolution,
configuration);
@@ -102,7 +103,7 @@ public class HighAvailabilityServicesUtils {
AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
- AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
+ RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
addressResolution,
configuration);
final String webMonitorAddress =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 75af0be..f4d79af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -72,7 +72,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -223,7 +223,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
long initializationTimestamp)
throws Exception {
- super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);
+ super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);
final ExecutionDeploymentReconciliationHandler
executionStateReconciliationHandler =
new ExecutionDeploymentReconciliationHandler() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b8954d6..38e1312 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.FileType;
@@ -168,7 +168,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
super(
rpcService,
- AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),
+ RpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),
ResourceManagerId.fromUuid(leaderSessionId));
this.resourceId = checkNotNull(resourceId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
new file mode 100644
index 0000000..63f0577
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** These RPC utilities contain helper methods around RPC use. */
+public class RpcServiceUtils {
+ private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
+ /**
+ * Creates a random name of the form prefix_X, where X is an increasing number.
+ *
+ * @param prefix Prefix string to prepend to the monotonically increasing name offset number
+ * @return A random name of the form prefix_X where X is an increasing number
+ */
+ public static String createRandomName(String prefix) {
+ Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+ long nameOffset;
+
+ // obtain the next name offset by incrementing it atomically
+ do {
+ nameOffset = nextNameOffset.get();
+ } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
+
+ return prefix + '_' + nameOffset;
+ }
+
+ /**
+ * Creates a wildcard name symmetric to {@link #createRandomName(String)}.
+ *
+ * @param prefix prefix of the wildcard name
+ * @return wildcard name starting with the prefix
+ */
+ public static String createWildcardName(String prefix) {
+ return prefix + "_*";
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 264106d6..c065ca0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -39,7 +39,6 @@ import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import static org.apache.flink.util.NetUtils.isValidClientPort;
@@ -65,8 +64,6 @@ public class AkkaRpcServiceUtils {
private static final String MAXIMUM_FRAME_SIZE_PATH =
"akka.remote.netty.tcp.maximum-frame-size";
- private static final AtomicLong nextNameOffset = new AtomicLong(0L);
-
// ------------------------------------------------------------------------
// RPC instantiation
// ------------------------------------------------------------------------
@@ -226,35 +223,6 @@ public class AkkaRpcServiceUtils {
SSL_TCP
}
- /**
- * Creates a random name of the form prefix_X, where X is an increasing number.
- *
- * @param prefix Prefix string to prepend to the monotonically increasing name offset number
- * @return A random name of the form prefix_X where X is an increasing number
- */
- public static String createRandomName(String prefix) {
- Preconditions.checkNotNull(prefix, "Prefix must not be null.");
-
- long nameOffset;
-
- // obtain the next name offset by incrementing it atomically
- do {
- nameOffset = nextNameOffset.get();
- } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
-
- return prefix + '_' + nameOffset;
- }
-
- /**
- * Creates a wildcard name symmetric to {@link #createRandomName(String)}.
- *
- * @param prefix prefix of the wildcard name
- * @return wildcard name starting with the prefix
- */
- public static String createWildcardName(String prefix) {
- return prefix + "_*";
- }
-
// ------------------------------------------------------------------------
// RPC service configuration
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ccbc3c0..48298fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -279,7 +279,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
FatalErrorHandler fatalErrorHandler,
TaskExecutorPartitionTracker partitionTracker) {
- super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
+ super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
checkArgument(
taskManagerConfiguration.getNumberSlots() > 0,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0a8c560..fc423e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -498,7 +499,7 @@ public class AkkaRpcActorTest extends TestLogger {
simpleRpcEndpoint1.closeAsync().join();
- final String wildcardName = AkkaRpcServiceUtils.createWildcardName(endpointName);
+ final String wildcardName = RpcServiceUtils.createWildcardName(endpointName);
final String wildcardAddress = AkkaRpcServiceUtils.getLocalRpcUrl(wildcardName);
final RpcGateway rpcGateway =
akkaRpcService.connect(wildcardAddress, RpcGateway.class).join();
@@ -508,7 +509,7 @@ public class AkkaRpcActorTest extends TestLogger {
}
private RpcEndpoint createRpcEndpointWithRandomNameSuffix(String prefix) {
- return new SimpleRpcEndpoint(akkaRpcService, AkkaRpcServiceUtils.createRandomName(prefix));
+ return new SimpleRpcEndpoint(akkaRpcService, RpcServiceUtils.createRandomName(prefix));
}
@Test