This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e441ba5  [FLINK-23082] Split AkkaRpcServiceUtils
e441ba5 is described below

commit e441ba5ad2cc701d5fffda5ece16aa59b3b1a891
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 23 10:40:49 2021 +0200

    [FLINK-23082] Split AkkaRpcServiceUtils
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  4 +-
 .../HighAvailabilityServicesUtils.java             |  5 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  4 +-
 .../runtime/resourcemanager/ResourceManager.java   |  4 +-
 .../apache/flink/runtime/rpc/RpcServiceUtils.java  | 56 ++++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      | 32 -------------
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  5 +-
 8 files changed, 70 insertions(+), 44 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5aae4cd..ee1bc67 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -62,7 +62,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -152,7 +152,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
-        super(rpcService, 
AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
+        super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), 
fencingToken);
         checkNotNull(dispatcherServices);
 
         this.configuration = dispatcherServices.getConfiguration();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index c329826..f24570e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperClientHAServ
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ConfigurationException;
@@ -94,7 +95,7 @@ public class HighAvailabilityServicesUtils {
                         AkkaRpcServiceUtils.getRpcUrl(
                                 hostnamePort.f0,
                                 hostnamePort.f1,
-                                AkkaRpcServiceUtils.createWildcardName(
+                                RpcServiceUtils.createWildcardName(
                                         ResourceManager.RESOURCE_MANAGER_NAME),
                                 addressResolution,
                                 configuration);
@@ -102,7 +103,7 @@ public class HighAvailabilityServicesUtils {
                         AkkaRpcServiceUtils.getRpcUrl(
                                 hostnamePort.f0,
                                 hostnamePort.f1,
-                                
AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
+                                
RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                                 addressResolution,
                                 configuration);
                 final String webMonitorAddress =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 75af0be..f4d79af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -72,7 +72,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -223,7 +223,7 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
             long initializationTimestamp)
             throws Exception {
 
-        super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);
+        super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), 
jobMasterId);
 
         final ExecutionDeploymentReconciliationHandler 
executionStateReconciliationHandler =
                 new ExecutionDeploymentReconciliationHandler() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b8954d6..38e1312 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -62,7 +62,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.FileType;
@@ -168,7 +168,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
         super(
                 rpcService,
-                AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),
+                RpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),
                 ResourceManagerId.fromUuid(leaderSessionId));
 
         this.resourceId = checkNotNull(resourceId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
new file mode 100644
index 0000000..63f0577
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** These RPC utilities contain helper methods around RPC use. */
+public class RpcServiceUtils {
+    private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
+    /**
+     * Creates a random name of the form prefix_X, where X is an increasing 
number.
+     *
+     * @param prefix Prefix string to prepend to the monotonically increasing 
name offset number
+     * @return A random name of the form prefix_X where X is an increasing 
number
+     */
+    public static String createRandomName(String prefix) {
+        Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+        long nameOffset;
+
+        // obtain the next name offset by incrementing it atomically
+        do {
+            nameOffset = nextNameOffset.get();
+        } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
+
+        return prefix + '_' + nameOffset;
+    }
+
+    /**
+     * Creates a wildcard name symmetric to {@link #createRandomName(String)}.
+     *
+     * @param prefix prefix of the wildcard name
+     * @return wildcard name starting with the prefix
+     */
+    public static String createWildcardName(String prefix) {
+        return prefix + "_*";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 264106d6..c065ca0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -39,7 +39,6 @@ import javax.annotation.Nullable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 
 import static org.apache.flink.util.NetUtils.isValidClientPort;
@@ -65,8 +64,6 @@ public class AkkaRpcServiceUtils {
     private static final String MAXIMUM_FRAME_SIZE_PATH =
             "akka.remote.netty.tcp.maximum-frame-size";
 
-    private static final AtomicLong nextNameOffset = new AtomicLong(0L);
-
     // ------------------------------------------------------------------------
     //  RPC instantiation
     // ------------------------------------------------------------------------
@@ -226,35 +223,6 @@ public class AkkaRpcServiceUtils {
         SSL_TCP
     }
 
-    /**
-     * Creates a random name of the form prefix_X, where X is an increasing 
number.
-     *
-     * @param prefix Prefix string to prepend to the monotonically increasing 
name offset number
-     * @return A random name of the form prefix_X where X is an increasing 
number
-     */
-    public static String createRandomName(String prefix) {
-        Preconditions.checkNotNull(prefix, "Prefix must not be null.");
-
-        long nameOffset;
-
-        // obtain the next name offset by incrementing it atomically
-        do {
-            nameOffset = nextNameOffset.get();
-        } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
-
-        return prefix + '_' + nameOffset;
-    }
-
-    /**
-     * Creates a wildcard name symmetric to {@link #createRandomName(String)}.
-     *
-     * @param prefix prefix of the wildcard name
-     * @return wildcard name starting with the prefix
-     */
-    public static String createWildcardName(String prefix) {
-        return prefix + "_*";
-    }
-
     // ------------------------------------------------------------------------
     //  RPC service configuration
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ccbc3c0..48298fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -279,7 +279,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             FatalErrorHandler fatalErrorHandler,
             TaskExecutorPartitionTracker partitionTracker) {
 
-        super(rpcService, 
AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
+        super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
 
         checkArgument(
                 taskManagerConfiguration.getNumberSlots() > 0,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0a8c560..fc423e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -498,7 +499,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
             simpleRpcEndpoint1.closeAsync().join();
 
-            final String wildcardName = 
AkkaRpcServiceUtils.createWildcardName(endpointName);
+            final String wildcardName = 
RpcServiceUtils.createWildcardName(endpointName);
             final String wildcardAddress = 
AkkaRpcServiceUtils.getLocalRpcUrl(wildcardName);
             final RpcGateway rpcGateway =
                     akkaRpcService.connect(wildcardAddress, 
RpcGateway.class).join();
@@ -508,7 +509,7 @@ public class AkkaRpcActorTest extends TestLogger {
     }
 
     private RpcEndpoint createRpcEndpointWithRandomNameSuffix(String prefix) {
-        return new SimpleRpcEndpoint(akkaRpcService, 
AkkaRpcServiceUtils.createRandomName(prefix));
+        return new SimpleRpcEndpoint(akkaRpcService, 
RpcServiceUtils.createRandomName(prefix));
     }
 
     @Test

Reply via email to