[
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357042#comment-15357042
]
ASF GitHub Bot commented on FLINK-1550:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2146#discussion_r69127489
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+ private static ActorSystem system;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createLocalActorSystem(new Configuration());
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ /**
+ * Tests that metrics registered on the JobManager are actually
accessible.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testJobManagerMetricAccess() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
+ FiniteDuration jobRecoveryTimeout = new FiniteDuration(3,
TimeUnit.SECONDS);
+ Deadline deadline = new FiniteDuration(2,
TimeUnit.MINUTES).fromNow();
+ Configuration flinkConfiguration = new Configuration();
+ UUID leaderSessionID = UUID.randomUUID();
+ int slots = 2;
+ ActorRef archive = null;
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+
+
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
slots);
+ flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB,
"jobmanager.<job_name>");
+
+ try {
+ Scheduler scheduler = new
Scheduler(TestingUtils.defaultExecutionContext());
+
+ TestingLeaderElectionService myLeaderElectionService =
new TestingLeaderElectionService();
+ TestingLeaderRetrievalService myLeaderRetrievalService
= new TestingLeaderRetrievalService();
+
+ InstanceManager instanceManager = new InstanceManager();
+ instanceManager.addInstanceListener(scheduler);
+
+ archive = system.actorOf(Props.create(
+ MemoryArchivist.class,
+ 10), "archive");
+
+ Props jobManagerProps = Props.create(
+ TestingJobManager.class,
+ flinkConfiguration,
+ new ForkJoinPool(),
+ instanceManager,
+ scheduler,
+ new BlobLibraryCacheManager(new
BlobServer(flinkConfiguration), 3600000),
+ archive,
+ new
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+ timeout,
+ myLeaderElectionService,
+ new StandaloneSubmittedJobGraphStore(),
+ new StandaloneCheckpointRecoveryFactory(),
+ new SavepointStore(new
HeapStateStore<CompletedCheckpoint>()),
+ jobRecoveryTimeout,
+ Option.apply(new
MetricRegistry(flinkConfiguration)));
+
+ jobManager = system.actorOf(jobManagerProps,
"jobmanager");
+ ActorGateway gateway = new AkkaActorGateway(jobManager,
leaderSessionID);
+
+ taskManager =
TaskManager.startTaskManagerComponentsAndActor(
+ flinkConfiguration,
+ ResourceID.generate(),
+ system,
+ "localhost",
+ Option.apply("taskmanager"),
+ Option.apply((LeaderRetrievalService)
myLeaderRetrievalService),
+ true,
+ TestingTaskManager.class);
+
+ ActorGateway tmGateway = new
AkkaActorGateway(taskManager, leaderSessionID);
+ Future<Object> tmAlive =
tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+ Await.ready(tmAlive, deadline.timeLeft());
+
+ JobVertex sourceJobVertex = new JobVertex("Source");
+
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+ sourceJobVertex.setParallelism(slots);
+
+ JobGraph jobGraph = new JobGraph("TestingJob",
sourceJobVertex);
+ jobGraph.setSnapshotSettings(new
JobSnapshottingSettings(
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ 500, 500, 50, 5));
+
+ Future<Object> isLeader = gateway.ask(
+ TestingJobManagerMessages.getNotifyWhenLeader(),
+ deadline.timeLeft());
+
+ Future<Object> isConnectedToJobManager = tmGateway.ask(
+ new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
+ deadline.timeLeft());
+
+ // tell jobManager that he's the leader
+ myLeaderElectionService.isLeader(leaderSessionID);
+ // tell taskManager who's the leader
+ myLeaderRetrievalService.notifyListener(gateway.path(),
leaderSessionID);
+
+ Await.ready(isLeader, deadline.timeLeft());
+ Await.ready(isConnectedToJobManager,
deadline.timeLeft());
--- End diff --
Can it be that you haven't pushed your commit yet? Because I still see the
old version.
> Show JVM Metrics for JobManager
> -------------------------------
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, Metrics
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)