Clarkkkkk commented on a change in pull request #6759: [FLINK-10247][Metrics]
Run MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759#discussion_r221181734
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##########
@@ -419,6 +430,49 @@ public static RpcService createRpcService(
final String portRangeDefinition =
configuration.getString(TaskManagerOptions.RPC_PORT);
+ return bindWithPort(configuration, taskManagerHostname,
portRangeDefinition, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+ }
+
+ /**
+ * Create a RPC service for the metric query service.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param haServices to use for the task manager hostname retrieval
+ */
+ public static RpcService createMetricQueryRpcService(
+ final Configuration configuration,
+ final HighAvailabilityServices haServices) throws Exception {
+
+ checkNotNull(configuration);
+ checkNotNull(haServices);
+
+ String taskManagerHostname =
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for
MetricQueryService: {}.", taskManagerHostname);
+ } else {
+ Time lookupTimeout =
Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
+
+ InetAddress taskManagerAddress =
LeaderRetrievalUtils.findConnectingAddress(
+ haServices.getResourceManagerLeaderRetriever(),
+ lookupTimeout);
+
+ taskManagerHostname = taskManagerAddress.getHostName();
+
+ LOG.info("MetricQueryService will use hostname/address
'{}' ({}) for communication.",
+ taskManagerHostname,
taskManagerAddress.getHostAddress());
+ }
+
+ final String portRangeDefinition =
configuration.getString(TaskManagerOptions.METRIC_QUERY_SERVICE_RPC_PORT);
+
+ return bindWithPort(configuration, taskManagerHostname,
portRangeDefinition, AkkaExecutorMode.SINGLE_THREAD_EXECUTOR);
+ }
+
+ private static RpcService bindWithPort(
+ Configuration configuration,
+ String taskManagerHostname,
+ String portRangeDefinition,
+ @Nonnull AkkaExecutorMode executorMode) throws Exception{
Review comment:
Sounds good.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services