[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623195#comment-16623195 ]

ASF GitHub Bot commented on FLINK-10247:
----------------------------------------

Clarkkkkk closed pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool
URL: https://github.com/apache/flink/pull/6676

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d9f4a..c283893c249 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -40,6 +40,8 @@
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
 
@@ -69,54 +71,19 @@ public String filterCharacters(String input) {
        private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
        private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
        private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+       private final ExecutorService threadpool = Executors.newSingleThreadExecutor();
 
        @Override
        public void postStop() {
                serializer.close();
+               if (threadpool != null && !threadpool.isShutdown()) {
+                       threadpool.shutdownNow();
+               }
        }
 
        @Override
        public void onReceive(Object message) {
-               try {
-                       if (message instanceof AddMetric) {
-                               AddMetric added = (AddMetric) message;
-
-                               String metricName = added.metricName;
-                               Metric metric = added.metric;
-                               AbstractMetricGroup group = added.group;
-
-                               QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);
-
-                               if (metric instanceof Counter) {
-                                       counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Gauge) {
-                                       gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Histogram) {
-                                       histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               } else if (metric instanceof Meter) {
-                                       meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
-                               }
-                       } else if (message instanceof RemoveMetric) {
-                               Metric metric = (((RemoveMetric) message).metric);
-                               if (metric instanceof Counter) {
-                                       this.counters.remove(metric);
-                               } else if (metric instanceof Gauge) {
-                                       this.gauges.remove(metric);
-                               } else if (metric instanceof Histogram) {
-                                       this.histograms.remove(metric);
-                               } else if (metric instanceof Meter) {
-                                       this.meters.remove(metric);
-                               }
-                       } else if (message instanceof CreateDump) {
-                               MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-                               getSender().tell(dump, getSelf());
-                       } else {
-                               LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
-                               getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
-                       }
-               } catch (Exception e) {
-                       LOG.warn("An exception occurred while processing a message.", e);
-               }
+               threadpool.submit(new MetricMessageHandlerRunnable(message, gauges, counters, histograms, meters, getSender(), getSelf(), serializer));
        }
 
        /**
@@ -221,4 +188,75 @@ public static Object getCreateDump() {
        private static class CreateDump implements Serializable {
                private static final CreateDump INSTANCE = new CreateDump();
        }
+
+       /**
+        * This runnable executes add metric, remove metric and create dump logic after notified.
+        */
+       private static final class MetricMessageHandlerRunnable implements Runnable {
+               private final Object message;
+               private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges;
+               private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters;
+               private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms;
+               private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters;
+               private final ActorRef sender;
+               private final ActorRef self;
+               private final MetricDumpSerializer serializer;
+
+               public MetricMessageHandlerRunnable(Object message, Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
+                       Map<Counter, Tuple2<QueryScopeInfo, String>> counters, Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
+                       Map<Meter, Tuple2<QueryScopeInfo, String>> meters, ActorRef sender, ActorRef self,
+                       MetricDumpSerializer serializer) {
+                       this.message = message;
+                       this.gauges = gauges;
+                       this.counters = counters;
+                       this.histograms = histograms;
+                       this.meters = meters;
+                       this.sender = sender;
+                       this.self = self;
+                       this.serializer = serializer;
+               }
+
+               @Override public void run() {
+                       try {
+                               if (message instanceof AddMetric) {
+                                       AddMetric added = (AddMetric) message;
+
+                                       String metricName = added.metricName;
+                                       Metric metric = added.metric;
+                                       AbstractMetricGroup group = added.group;
+
+                                       QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);
+
+                                       if (metric instanceof Counter) {
+                                               counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+                                       } else if (metric instanceof Gauge) {
+                                               gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+                                       } else if (metric instanceof Histogram) {
+                                               histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+                                       } else if (metric instanceof Meter) {
+                                               meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
+                                       }
+                               } else if (message instanceof RemoveMetric) {
+                                       Metric metric = (((RemoveMetric) message).metric);
+                                       if (metric instanceof Counter) {
+                                               this.counters.remove(metric);
+                                       } else if (metric instanceof Gauge) {
+                                               this.gauges.remove(metric);
+                                       } else if (metric instanceof Histogram) {
+                                               this.histograms.remove(metric);
+                                       } else if (metric instanceof Meter) {
+                                               this.meters.remove(metric);
+                                       }
+                               } else if (message instanceof CreateDump) {
+                                       MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+                                       sender.tell(dump, self);
+                               } else {
+                                       LOG.warn("MetricQueryServiceActor received an invalid message: {}.", message.toString());
+                                       sender.tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message: " + message.toString() + ".")), self);
+                               }
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred while processing a Metric related message.", e);
+                       }
+               }
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java
new file mode 100644
index 00000000000..20497e1fc62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaExecutorMode.java
@@ -0,0 +1,11 @@
+package org.apache.flink.runtime.rpc.akka;
+
+/**
+ * Created by Shimin Yang on 2018/9/21.
+ */
+public enum AkkaExecutorMode {
+       /** Used by default, use fork-join-executor dispatcher **/
+       FORK_JOIN_EXECUTOR,
+       /** Use single thread (Pinned) dispatcher **/
+       SINGLE_THREAD_EXECUTOR
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3a626986361..919664c459e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -71,8 +72,18 @@
         * @throws Exception      Thrown is some other error occurs while creating akka actor system
         */
        public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+               return createRpcService(hostname, port, configuration, AkkaExecutorMode.FORK_JOIN_EXECUTOR);
+       }
+
+       public static RpcService createRpcService(
+               String hostname,
+               int port,
+               Configuration configuration,
+               @Nonnull AkkaExecutorMode executorMode) throws Exception {
                LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
+               Preconditions.checkNotNull(executorMode);
+
                final ActorSystem actorSystem;
 
                try {
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9ce1865204f..2b91eb78253 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -28,6 +28,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
+import org.apache.flink.runtime.rpc.akka.{AkkaExecutorMode, AkkaRpcServiceUtils}
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
@@ -116,6 +117,22 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  /**
+    * Return a remote Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @param hostname to bind against. If null, then the loopback interface is used
+    * @param port to bind against\
+    * @param executorMode containing the user specified mode of executor
+    * @return A remote Akka config
+    */
+  def getAkkaConfig(configuration: Configuration,
+                    hostname: String,
+                    port: Int,
+                    executorMode: AkkaExecutorMode): Config = {
+    getAkkaConfig(configuration, Some((hostname, port)), executorMode)
+  }
+
   /**
     * Return a remote Akka config for the given configuration values.
     *
@@ -153,7 +170,60 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    val defaultConfig = getBasicAkkaConfig(configuration)
+    getAkkaConfig(configuration, externalAddress, AkkaExecutorMode.FORK_JOIN_EXECUTOR)
+  }
+
+  /**
+    * Creates an akka config with the provided configuration values. If the listening address is
+    * specified, then the actor system will listen on the respective address.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param externalAddress optional tuple of bindAddress and port to be reachable at.
+    *                        If None is given, then an Akka config for local actor system
+    *                        will be returned
+    * @return Akka config
+    */
+  @throws(classOf[UnknownHostException])
+  def getAkkaConfig(configuration: Configuration,
+                    externalAddress: Option[(String, Int)],
+                    executorMode: AkkaExecutorMode): Config = {
+    val defaultConfig = executorMode match {
+      case AkkaExecutorMode.FORK_JOIN_EXECUTOR =>
+        getBasicAkkaConfig(configuration)
+      case AkkaExecutorMode.SINGLE_THREAD_EXECUTOR =>
+        getSingleThreadExecutorBasicAkkaConfig(configuration)
+    }
+
+    externalAddress match {
+
+      case Some((hostname, port)) =>
+
+        val remoteConfig = getRemoteAkkaConfig(configuration,
+          // the wildcard IP lets us bind to all network interfaces
+          NetUtils.getWildcardIPAddress, port,
+          hostname, port)
+
+        remoteConfig.withFallback(defaultConfig)
+
+      case None =>
+        defaultConfig
+    }
+  }
+
+  /**
+    * Creates an akka config with the provided configuration values which use single thread executor.
+    * If the listening address is specified, then the actor system will listen on the respective address.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param externalAddress optional tuple of bindAddress and port to be reachable at.
+    *                        If None is given, then an Akka config for local actor system
+    *                        will be returned
+    * @return Akka config
+    */
+  @throws(classOf[UnknownHostException])
+  def getSingleThreadExecutorAkkaConfig(configuration: Configuration,
+                    externalAddress: Option[(String, Int)]): Config = {
+    val defaultConfig = getSingleThreadExecutorBasicAkkaConfig(configuration)
 
     externalAddress match {
 
@@ -182,29 +252,12 @@ object AkkaUtils {
   }
 
   /**
-   * Gets the basic Akka config which is shared by remote and local actor systems.
+   * Gets the basic Akka config with fork join executor which is shared by remote and local actor systems.
    *
    * @param configuration instance which contains the user specified values for the configuration
    * @return Flink's basic Akka config
    */
   private def getBasicAkkaConfig(configuration: Configuration): Config = {
-    val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT)
-    val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
-
-    val jvmExitOnFatalError = if (
-      configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){
-      "on"
-    } else {
-      "off"
-    }
-
-    val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
-
-    val logLevel = getLogLevel
-
-    val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
-      .getCanonicalName
-
     val forkJoinExecutorParallelismFactor =
       configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
 
@@ -223,37 +276,83 @@ object AkkaUtils {
          | }
        """.stripMargin
 
+    getBasicAkkaConfigWithParticularExecutor(configuration, forkJoinExecutorConfig)
+  }
+
+  /**
+    * Gets the basic Akka config with single thread executor which is shared by remote and local actor systems.
+    *
+    * @param configuration instance which contains the user specified values for the configuration
+    * @return Flink's basic Akka config
+    */
+  private def getSingleThreadExecutorBasicAkkaConfig(configuration: Configuration): Config = {
+    val singleThreadExecutorConfig =
+      s"""
+         | single-thread-executor {
+         |   executor = "thread-pool-executor"
+         |   type = PinnedDispatcher
+         | }
+       """.stripMargin
+
+    getBasicAkkaConfigWithParticularExecutor(configuration, singleThreadExecutorConfig)
+  }
+
+  /**
+    * Gets the basic Akka config which is shared by remote and local actor systems.
+    *
+    * @param configuration instance which contains the user specified values for the configuration
+    * @param executorConfig the akka config for particular executor
+    * @return Flink's basic Akka config
+    */
+  private def getBasicAkkaConfigWithParticularExecutor(configuration: Configuration, executorConfig: String): Config = {
+    val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT)
+    val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
+
+    val jvmExitOnFatalError = if (
+      configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){
+      "on"
+    } else {
+      "off"
+    }
+
+    val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
+
+    val logLevel = getLogLevel
+
+    val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
+      .getCanonicalName
+
     val config =
       s"""
-        |akka {
-        | daemonic = off
-        |
+         |akka {
+         | daemonic = off
+         |
         | loggers = ["akka.event.slf4j.Slf4jLogger"]
-        | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-        | log-config-on-start = off
-        |
+         | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+         | log-config-on-start = off
+         |
         | jvm-exit-on-fatal-error = $jvmExitOnFatalError
-        |
+         |
         | serialize-messages = off
-        |
+         |
         | loglevel = $logLevel
-        | stdout-loglevel = OFF
-        |
+         | stdout-loglevel = OFF
+         |
         | log-dead-letters = $logLifecycleEvents
-        | log-dead-letters-during-shutdown = $logLifecycleEvents
-        |
+         | log-dead-letters-during-shutdown = $logLifecycleEvents
+         |
         | actor {
-        |   guardian-supervisor-strategy = $supervisorStrategy
-        |
+         |   guardian-supervisor-strategy = $supervisorStrategy
+         |
         |   warn-about-java-serializer-usage = off
-        |
+         |
         |   default-dispatcher {
-        |     throughput = $akkaThroughput
-        |
-        |   $forkJoinExecutorConfig
-        |   }
-        | }
-        |}
+         |     throughput = $akkaThroughput
+         |
+        |   $executorConfig
+         |   }
+         | }
+         |}
       """.stripMargin
 
     ConfigFactory.parseString(config)
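
For reference, the new createRpcService overload added to AkkaRpcServiceUtils above is how a caller selects the pinned single-thread dispatcher for a component such as the MetricQueryService. A minimal usage sketch follows; the hostname and port are placeholder values chosen for illustration, not taken from the patch:

    // Illustrative call site only, not part of the patch: create an RPC service
    // whose actors run on the pinned single-thread dispatcher introduced here.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.rpc.RpcService;
    import org.apache.flink.runtime.rpc.akka.AkkaExecutorMode;
    import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;

    public class SingleThreadRpcServiceExample {
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            // "localhost" and 9123 are placeholders chosen for this example.
            RpcService rpcService = AkkaRpcServiceUtils.createRpcService(
                "localhost", 9123, configuration, AkkaExecutorMode.SINGLE_THREAD_EXECUTOR);
        }
    }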


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Run MetricQueryService in separate thread pool
> ----------------------------------------------
>
>                 Key: FLINK-10247
>                 URL: https://issues.apache.org/jira/browse/FLINK-10247
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.
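
As a rough illustration of the idea in the description (the names below are illustrative, not actual Flink classes), incoming work is handed off to a single-thread executor owned by the service, so it no longer runs on the threads shared with the main Flink components:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative sketch only, mirroring the approach taken in the PR above:
    // messages are processed on a dedicated thread pool owned by the service.
    public class DedicatedPoolHandler {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();

        public void onMessage(Runnable work) {
            pool.submit(work); // executed on the dedicated thread, not the caller's
        }

        public void stop() {
            pool.shutdownNow(); // release the dedicated thread on shutdown
        }
    }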



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
