http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java index f419908..d992b85 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -18,16 +18,20 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; + import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.mock; + /** * Tests for the SubtaskCurrentAttemptDetailsHandler. */ public class SubtaskCurrentAttemptDetailsHandlerTest { @Test public void testGetPaths() { - SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(null, null); + SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 74a19a9..ce8e72f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -33,6 +34,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the SubtaskExecutionAttemptAccumulatorsHandler. */ @@ -61,7 +64,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest { @Test public void testGetPaths() { - SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null); + SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java index a9161b3..e1fbf92 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -34,6 +35,8 @@ import java.io.IOException; import java.util.Collection; import java.util.Iterator; +import static org.mockito.Mockito.mock; + /** * Tests for the SubtaskExecutionAttemptDetailsHandler. */ @@ -70,7 +73,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest { @Test public void testGetPaths() { - SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null); + SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java index 6022be2..f33da80 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -33,6 +34,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the SubtasksAllAccumulatorsHandler. */ @@ -55,7 +58,7 @@ public class SubtasksAllAccumulatorsHandlerTest { @Test public void testGetPaths() { - SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null); + SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java index 22a2d27..548efaf 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -34,6 +35,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the SubtasksTimesHandler. */ @@ -56,7 +59,7 @@ public class SubtasksTimesHandlerTest { @Test public void testGetPaths() { - SubtasksTimesHandler handler = new SubtasksTimesHandler(null); + SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java index 5846d75..cf59f05 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java @@ -21,17 +21,16 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; @@ -46,19 +45,12 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import scala.Option; -import scala.collection.JavaConverters; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Future$; -import scala.concurrent.duration.FiniteDuration; - import static org.mockito.Matchers.any; import static org.mockito.Matchers.isA; import static org.powermock.api.mockito.PowerMockito.mock; @@ -72,9 +64,9 @@ public class TaskManagerLogHandlerTest { public void testGetPaths() { TaskManagerLogHandler handlerLog = new TaskManagerLogHandler( mock(JobManagerRetriever.class), - mock(ExecutionContextExecutor.class), - Future$.MODULE$.successful("/jm/address"), - AkkaUtils.getDefaultClientTimeout(), + Executors.directExecutor(), + CompletableFuture.completedFuture("/jm/address"), + TestingUtils.TIMEOUT(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), false, @@ -85,9 +77,9 @@ public class TaskManagerLogHandlerTest { TaskManagerLogHandler handlerOut = new TaskManagerLogHandler( mock(JobManagerRetriever.class), - mock(ExecutionContextExecutor.class), - Future$.MODULE$.successful("/jm/address"), - AkkaUtils.getDefaultClientTimeout(), + Executors.directExecutor(), + CompletableFuture.completedFuture("/jm/address"), + TestingUtils.TIMEOUT(), TaskManagerLogHandler.FileMode.STDOUT, new Configuration(), false, @@ -115,27 +107,21 @@ public class TaskManagerLogHandlerTest { // ========= setup JobManager ================================================================================== - ActorGateway jobManagerGateway = mock(ActorGateway.class); - Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers( - JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala()); - - when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer)); - when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful((Object) 5)); - when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager)))); - when(jobManagerGateway.path()).thenReturn("/jm/address"); + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337)); + when(jobManagerGateway.getHostname()).thenReturn("localhost"); + when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(Optional.of(taskManager))); JobManagerRetriever retriever = mock(JobManagerRetriever.class); - when(retriever.getJobManagerGatewayAndWebPort()) - .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0))); + when(retriever.getJobManagerGatewayNow()) + .thenReturn(Optional.of(jobManagerGateway)); TaskManagerLogHandler handler = new TaskManagerLogHandler( retriever, - ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()), - Future$.MODULE$.successful("/jm/address"), - AkkaUtils.getDefaultClientTimeout(), + Executors.directExecutor(), + CompletableFuture.completedFuture("/jm/address"), + TestingUtils.TIMEOUT(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java index 17e7e9d..2992d91 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java @@ -18,14 +18,13 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; + import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; /** * Tests for the TaskManagersHandler. @@ -33,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration; public class TaskManagersHandlerTest { @Test public void testGetPaths() { - TaskManagersHandler handler = new TaskManagersHandler(new FiniteDuration(0, TimeUnit.SECONDS), null); + TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List<String> pathsList = Lists.newArrayList(paths); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java index b032061..90e032d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java @@ -18,17 +18,17 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import scala.concurrent.ExecutionContext; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.powermock.api.mockito.PowerMockito.mock; @@ -42,7 +42,11 @@ public class AbstractMetricsHandlerTest extends TestLogger { */ @Test public void testHandleRequest() throws Exception { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStoreTest.setupStore(fetcher.getMetricStore()); JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher); @@ -91,7 +95,11 @@ public class AbstractMetricsHandlerTest extends TestLogger { */ @Test public void testInvalidListDoesNotFail() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStoreTest.setupStore(fetcher.getMetricStore()); JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher); @@ -117,7 +125,11 @@ public class AbstractMetricsHandlerTest extends TestLogger { */ @Test public void testInvalidGetDoesNotFail() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStoreTest.setupStore(fetcher.getMetricStore()); JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java index 97c2055..994fc5e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java @@ -18,18 +18,18 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import scala.concurrent.ExecutionContext; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.powermock.api.mockito.PowerMockito.mock; @@ -48,7 +48,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger { @Test public void getMapFor() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher); @@ -62,7 +66,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger { @Test public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = fetcher.getMetricStore(); JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java index 53666eb..a35af22 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java @@ -18,18 +18,18 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import scala.concurrent.ExecutionContext; - import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -49,7 +49,11 @@ public class JobMetricsHandlerTest extends TestLogger { @Test public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); JobMetricsHandler handler = new JobMetricsHandler(fetcher); @@ -64,7 +68,11 @@ public class JobMetricsHandlerTest extends TestLogger { @Test public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = fetcher.getMetricStore(); JobMetricsHandler handler = new JobMetricsHandler(fetcher); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java index 5f68c6f..e84b11d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java @@ -18,18 +18,18 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import scala.concurrent.ExecutionContext; - import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; import static org.junit.Assert.assertEquals; @@ -50,7 +50,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger { @Test public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher); @@ -68,7 +72,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger { @Test public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = fetcher.getMetricStore(); JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java index 369e8aa..4c91997 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.metrics; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -26,49 +27,39 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.TestingHistogram; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executor; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; -import scala.Option; -import scala.collection.JavaConverters; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Future$; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; import static org.powermock.api.mockito.PowerMockito.mock; import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; /** * Tests for the MetricFetcher. @@ -78,6 +69,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew; public class MetricFetcherTest extends TestLogger { @Test public void testUpdate() throws Exception { + final Time timeout = Time.seconds(10L); + // ========= setup TaskManager ================================================================================= JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); @@ -94,58 +87,40 @@ public class MetricFetcherTest extends TestLogger { JobDetails details = mock(JobDetails.class); when(details.getJobId()).thenReturn(jobID); - ActorGateway jobManagerGateway = mock(ActorGateway.class); - Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers( - JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala()); + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0]))); - when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer)); - when(jobManagerGateway.path()).thenReturn("/jm/address"); + when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class))) + .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0]))); + when(jobManagerGateway.requestTaskManagerInstances(any(Time.class))) + .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager))); + when(jobManagerGateway.getAddress()).thenReturn("/jm/address"); + when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0)); - JobManagerRetriever retriever = mock(JobManagerRetriever.class); - when(retriever.getJobManagerGatewayAndWebPort()) - .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0))); + AkkaJobManagerRetriever retriever = mock(AkkaJobManagerRetriever.class); + when(retriever.getJobManagerGatewayNow()) + .thenReturn(Optional.of(jobManagerGateway)); // ========= setup QueryServices ================================================================================ - Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID); - - final ActorRef jmQueryService = mock(ActorRef.class); - final ActorRef tmQueryService = mock(ActorRef.class); - - ActorSystem actorSystem = mock(ActorSystem.class); - when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService); - when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService); - - MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); - when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful((Object) new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); - - MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class); - when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class))) - .thenReturn(Future$.MODULE$.successful(requestMetricsAnswer)); - - whenNew(MetricFetcher.BasicGateway.class) - .withArguments(eq(new Object() { - @Override - public boolean equals(Object o) { - return o == jmQueryService; - } - })) - .thenReturn(jmQueryServiceGateway); - whenNew(MetricFetcher.BasicGateway.class) - .withArguments(eq(new Object() { - @Override - public boolean equals(Object o) { - return o == tmQueryService; - } - })) - .thenReturn(tmQueryServiceGateway); + MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class); + MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class); + + MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID); + + when(jmQueryService.queryMetrics(any(Time.class))) + .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); + when(tmQueryService.queryMetrics(any(Time.class))) + .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer)); + + MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class); + when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService)); + when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService)); // ========= start MetricFetcher testing ======================================================================= - ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor()); - MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context); + MetricFetcher fetcher = new MetricFetcher( + retriever, + queryServiceRetriever, + Executors.directExecutor(), + timeout); // verify that update fetches metrics and updates the store fetcher.update(); @@ -170,13 +145,7 @@ public class MetricFetcherTest extends TestLogger { } } - private static class CurrentThreadExecutor implements Executor { - public void execute(Runnable r) { - r.run(); - } - } - - private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException { + private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) { Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java index 4333f04..c20ea98 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java @@ -18,18 +18,18 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.TestLogger; -import akka.actor.ActorSystem; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; import java.util.Map; -import scala.concurrent.ExecutionContext; - import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -49,7 +49,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger { @Test public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher); @@ -64,7 +68,11 @@ public class TaskManagerMetricsHandlerTest extends TestLogger { @Test public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricFetcher fetcher = new MetricFetcher( + mock(JobManagerRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); MetricStore store = fetcher.getMetricStore(); TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java index 6ee78dd..bbc5889 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java @@ -22,13 +22,23 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview; +import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,7 +47,8 @@ import scala.Option; import scala.reflect.ClassTag$; /** - * Implementation of the {@link JobManagerGateway} for the {@link ActorGateway}. + * Implementation of the {@link JobManagerGateway} for old JobManager code based + * on Akka actors and the {@link ActorGateway}. */ public class AkkaJobManagerGateway implements JobManagerGateway { @@ -48,7 +59,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway { this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway); final Option<String> optHostname = jobManagerGateway.actor().path().address().host(); - hostname = optHostname.isDefined() ? optHostname.get() : "localhost"; } @@ -63,25 +73,6 @@ public class AkkaJobManagerGateway implements JobManagerGateway { } @Override - public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) { - return FutureUtils - .toJava(jobManagerGateway - .ask( - new JobManagerMessages.RequestClassloadingProps(jobId), - FutureUtils.toFiniteDuration(timeout))) - .thenApply( - (Object response) -> { - if (response instanceof JobManagerMessages.ClassloadingProps) { - return Optional.of(((JobManagerMessages.ClassloadingProps) response)); - } else if (response instanceof JobManagerMessages.JobNotFound) { - return Optional.empty(); - } else { - throw new FlinkFutureException("Unknown response: " + response + '.'); - } - }); - } - - @Override public CompletableFuture<Integer> requestBlobServerPort(Time timeout) { return FutureUtils.toJava( jobManagerGateway @@ -90,6 +81,21 @@ public class AkkaJobManagerGateway implements JobManagerGateway { } @Override + public CompletableFuture<Integer> requestWebPort(Time timeout) { + CompletableFuture<JobManagerMessages.ResponseWebMonitorPort> portResponseFuture = FutureUtils.toJava( + jobManagerGateway + .ask(JobManagerMessages.getRequestWebMonitorPort(), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.ResponseWebMonitorPort.class))); + + return portResponseFuture.thenApply( + JobManagerMessages.ResponseWebMonitorPort::port); + } + + //-------------------------------------------------------------------------------- + // Job control + //-------------------------------------------------------------------------------- + + @Override public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) { return FutureUtils .toJava( @@ -119,4 +125,146 @@ public class AkkaJobManagerGateway implements JobManagerGateway { } ); } + + @Override + public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) { + CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava( + jobManagerGateway + .ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class))); + + return cancellationFuture.thenApply( + (JobManagerMessages.CancellationResponse response) -> { + if (response instanceof JobManagerMessages.CancellationSuccess) { + return ((JobManagerMessages.CancellationSuccess) response).savepointPath(); + } else { + throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()); + } + }); + } + + @Override + public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) { + CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava( + jobManagerGateway + .ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class))); + + return responseFuture.thenApply( + (JobManagerMessages.CancellationResponse response) -> { + if (response instanceof JobManagerMessages.CancellationSuccess) { + return Acknowledge.get(); + } else { + throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()); + } + }); + } + + @Override + public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) { + CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava( + jobManagerGateway + .ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class))); + + return responseFuture.thenApply( + (JobManagerMessages.StoppingResponse response) -> { + if (response instanceof JobManagerMessages.StoppingSuccess) { + return Acknowledge.get(); + } else { + throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()); + } + }); + } + + //-------------------------------------------------------------------------------- + // JobManager information + //-------------------------------------------------------------------------------- + + @Override + public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout) { + return FutureUtils.toJava( + jobManagerGateway + .ask(new JobManagerMessages.RequestTaskManagerInstance(instanceId), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class))) + .thenApply( + (JobManagerMessages.TaskManagerInstance taskManagerResponse) -> { + if (taskManagerResponse.instance().isDefined()) { + return Optional.of(taskManagerResponse.instance().get()); + } else { + return Optional.empty(); + } + }); + } + + @Override + public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) { + CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava( + jobManagerGateway + .ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class))); + + return taskManagersFuture.thenApply( + JobManagerMessages.RegisteredTaskManagers::asJavaCollection); + } + + @Override + public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) { + return FutureUtils + .toJava(jobManagerGateway + .ask( + new JobManagerMessages.RequestClassloadingProps(jobId), + FutureUtils.toFiniteDuration(timeout))) + .thenApply( + (Object response) -> { + if (response instanceof JobManagerMessages.ClassloadingProps) { + return Optional.of(((JobManagerMessages.ClassloadingProps) response)); + } else if (response instanceof JobManagerMessages.JobNotFound) { + return Optional.empty(); + } else { + throw new FlinkFutureException("Unknown response: " + response + '.'); + } + }); + } + + @Override + public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) { + return FutureUtils.toJava( + jobManagerGateway + .ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class))); + } + + @Override + public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout) { + CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava( + jobManagerGateway + .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class))); + + return jobResponseFuture.thenApply( + (JobManagerMessages.JobResponse jobResponse) -> { + if (jobResponse instanceof JobManagerMessages.JobFound) { + return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph()); + } else { + return Optional.empty(); + } + }); + } + + @Override + public CompletableFuture<StatusOverview> requestStatusOverview(Time timeout) { + return FutureUtils.toJava( + jobManagerGateway + .ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(StatusOverview.class))); + } + + @Override + public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) { + return FutureUtils.toJava( + jobManagerGateway + .ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout)) + .mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class))); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 562e697..19f0e2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -421,7 +421,9 @@ public class JobClient { LOG.info("Checking and uploading JAR files"); - final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout); + final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress( + jobManagerGateway, + timeout); final InetSocketAddress blobServerAddress; @@ -448,7 +450,7 @@ public class JobClient { "JobManager did not respond within " + timeout, e); } catch (Throwable throwable) { Throwable stripped = ExceptionUtils.stripExecutionException(throwable); - + try { ExceptionUtils.tryDeserializeAndThrow(stripped, classLoader); } catch (JobExecutionException jee) { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index f204393..d24a3d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -18,14 +18,14 @@ package org.apache.flink.runtime.clusterframework; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Address; import com.typesafe.config.Config; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.lang3.StringUtils; + +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -35,6 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.NetUtils; import org.slf4j.Logger; @@ -52,6 +54,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Executor; /** * Tools for starting JobManager and TaskManager processes, including the @@ -171,7 +174,11 @@ public class BootstrapTools { * * @param config The Flink config. * @param highAvailabilityServices Service factory for high availability services - * @param actorSystem The ActorSystem to start the web frontend in. + * @param jobManagerRetriever to retrieve the leading JobManagerGateway + * @param queryServiceRetriever to resolve a query service + * @param timeout for asynchronous operations + * @param executor to run asynchronous operations + * @param jobManagerAddress the address of the JobManager for which the WebMonitor is started * @param logger Logger for log output * @return WebMonitor instance. * @throws Exception @@ -179,16 +186,13 @@ public class BootstrapTools { public static WebMonitor startWebMonitorIfConfigured( Configuration config, HighAvailabilityServices highAvailabilityServices, - ActorSystem actorSystem, - ActorRef jobManager, + JobManagerRetriever jobManagerRetriever, + MetricQueryServiceRetriever queryServiceRetriever, + Time timeout, + Executor executor, + String jobManagerAddress, Logger logger) throws Exception { - - // this ensures correct values are present in the web frontend - final Address address = AkkaUtils.getAddress(actorSystem); - config.setString(JobManagerOptions.ADDRESS, address.host().get()); - config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString())); - if (config.getInteger(WebOptions.PORT, 0) >= 0) { logger.info("Starting JobManager Web Frontend"); @@ -197,12 +201,14 @@ public class BootstrapTools { WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor( config, highAvailabilityServices, - actorSystem); + jobManagerRetriever, + queryServiceRetriever, + timeout, + executor); // start the web monitor if (monitor != null) { - String jobManagerAkkaURL = AkkaUtils.getAkkaURL(actorSystem, jobManager); - monitor.start(jobManagerAkkaURL); + monitor.start(jobManagerAddress); } return monitor; } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 043c603..5c6439d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -305,6 +305,16 @@ public class FutureUtils { return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS); } + /** + * Converts {@link FiniteDuration} into Flink time. + * + * @param finiteDuration to convert into Flink time + * @return Flink time with the length of the given finite duration + */ + public static Time toTime(FiniteDuration finiteDuration) { + return Time.milliseconds(finiteDuration.toMillis()); + } + // ------------------------------------------------------------------------ // Converting futures // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java index cba7b06..a4d0d11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java @@ -21,11 +21,20 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.RpcGateway; +import javax.annotation.Nullable; + +import java.util.Collection; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -38,21 +47,19 @@ import java.util.concurrent.CompletableFuture; public interface JobManagerGateway extends RpcGateway { /** - * Requests the class loading properties for the given JobID. + * Requests the BlobServer port. * - * @param jobId for which the class loading properties are requested * @param timeout for this operation - * @return Future containing the optional class loading properties if they could be retrieved from the JobManager. + * @return Future containing the BlobServer port */ - CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout); + CompletableFuture<Integer> requestBlobServerPort(Time timeout); /** - * Requests the BlobServer port. + * Returns the port of the web runtime monitor serving requests for the JobManager endpoint. * - * @param timeout for this operation - * @return Future containing the BlobServer port + * @return Port of the WebRuntimeMonitor responsible for the JobManager endpoint */ - CompletableFuture<Integer> requestBlobServerPort(Time timeout); + CompletableFuture<Integer> requestWebPort(Time timeout); /** * Submits a job to the JobManager. @@ -63,4 +70,103 @@ public interface JobManagerGateway extends RpcGateway { * @return Future containing an Acknowledge message if the submission succeeded */ CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout); + + /** + * Cancels the given job after taking a savepoint and returning its path. + * + * If the savepointPath is null, then the JobManager will use the default savepoint directory + * to store the savepoint in. After the savepoint has been taken and the job has been canceled + * successfully, the path of the savepoint is returned. + * + * @param jobId identifying the job to cancel + * @param savepointPath Optional path for the savepoint to be stored under; if null, then the default path is + * taken + * @param timeout for the asynchronous operation + * @return Future containing the savepoint path of the taken savepoint or an Exception if the operation failed + */ + CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout); + + /** + * Cancels the given job. + * + * @param jobId identifying the job to cancel + * @param timeout for the asynchronous operation + * @return Future containing Acknowledge or an Exception if the operation failed + */ + CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout); + + /** + * Stops the given job. + * + * @param jobId identifying the job to cancel + * @param timeout for the asynchronous operation + * @return Future containing Acknowledge or an Exception if the operation failed + */ + CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout); + + /** + * Requests the class loading properties for the given JobID. + * + * @param jobId for which the class loading properties are requested + * @param timeout for this operation + * @return Future containing the optional class loading properties if they could be retrieved from the JobManager. + */ + CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout); + + /** + * Requests the TaskManager instance registered under the given instanceId from the JobManager. + * If there is no Instance registered, then {@link Optional#empty()} is returned. + * + * @param instanceId for which to retrieve the Instance + * @param timeout for the asynchronous operation + * @return Future containing the TaskManager instance registered under instanceId, otherwise {@link Optional#empty()} + */ + CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout); + + /** + * Requests all currently registered TaskManager instances from the JobManager. + * + * @param timeout for the asynchronous operation + * @return Future containing the collection of all currently registered TaskManager instances + */ + CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout); + + /** + * Requests job details currently being executed by the JobManager. + * + * @param includeRunning true if running jobs shall be included, otherwise false + * @param includeFinished true if finished jobs shall be included, otherwise false + * @param timeout for the asynchronous operation + * @return Future containing the job details + */ + CompletableFuture<MultipleJobsDetails> requestJobDetails( + boolean includeRunning, + boolean includeFinished, + Time timeout); + + /** + * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then + * {@link Optional#empty()} is returned. + * + * @param jobId identifying the job whose AccessExecutionGraph is requested + * @param timeout for the asynchronous operation + * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()} + */ + CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout); + + /** + * Requests the status overview from the JobManager. + * + * @param timeout for the asynchronous operation + * @return Future containing the status overview + */ + CompletableFuture<StatusOverview> requestStatusOverview(Time timeout); + + /** + * Requests the job overview from the JobManager. + * + * @param timeout for the asynchronous operation + * @return Future containing the job overview + */ + CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java index e173522..1791fe1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -25,6 +25,7 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.util.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 9ebb126..9493696 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.Path; @@ -31,8 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import akka.actor.ActorSystem; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -47,6 +49,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Executor; /** * Utilities for the web runtime monitor. This class contains for example methods to build @@ -119,27 +122,39 @@ public final class WebMonitorUtils { * * @param config The configuration for the runtime monitor. * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor - * @param actorSystem ActorSystem used to connect to the JobManager - * + * @param jobManagerRetriever which retrieves the currently leading JobManager + * @param queryServiceRetriever which retrieves the query service + * @param timeout for asynchronous operations + * @param executor to run asynchronous operations */ public static WebMonitor startWebRuntimeMonitor( Configuration config, HighAvailabilityServices highAvailabilityServices, - ActorSystem actorSystem) { + JobManagerRetriever jobManagerRetriever, + MetricQueryServiceRetriever queryServiceRetriever, + Time timeout, + Executor executor) { // try to load and instantiate the class try { String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class); - Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class, + Constructor<? extends WebMonitor> constructor = clazz.getConstructor( + Configuration.class, LeaderRetrievalService.class, BlobView.class, - ActorSystem.class); + JobManagerRetriever.class, + MetricQueryServiceRetriever.class, + Time.class, + Executor.class); return constructor.newInstance( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.createBlobStore(), - actorSystem); + jobManagerRetriever, + queryServiceRetriever, + timeout, + executor); } catch (ClassNotFoundException e) { LOG.error("Could not load web runtime monitor. " + "Probably reason: flink-runtime-web is not in the classpath"); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java new file mode 100644 index 0000000..2eade48 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever; + +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Retrieves and stores the JobManagerGateway for the current leading JobManager. + */ +public abstract class JobManagerRetriever implements LeaderRetrievalListener { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + // False if we have to create a new JobManagerGateway future when being notified + // about a new leader address + private final AtomicBoolean firstTimeUsage; + + private volatile CompletableFuture<JobManagerGateway> jobManagerGatewayFuture; + + public JobManagerRetriever() { + firstTimeUsage = new AtomicBoolean(true); + jobManagerGatewayFuture = new CompletableFuture<>(); + } + + /** + * Returns the currently known leading job manager gateway and its web monitor port. + */ + public Optional<JobManagerGateway> getJobManagerGatewayNow() throws Exception { + if (jobManagerGatewayFuture != null) { + CompletableFuture<JobManagerGateway> jobManagerGatewayFuture = this.jobManagerGatewayFuture; + + if (jobManagerGatewayFuture.isDone()) { + return Optional.of(jobManagerGatewayFuture.get()); + } else { + return Optional.empty(); + } + } else { + return Optional.empty(); + } + } + + /** + * Returns the current JobManagerGateway future. + */ + public CompletableFuture<JobManagerGateway> getJobManagerGateway() throws Exception { + return jobManagerGatewayFuture; + } + + @Override + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { + if (leaderAddress != null && !leaderAddress.equals("")) { + try { + final CompletableFuture<JobManagerGateway> newJobManagerGatewayFuture; + + if (firstTimeUsage.compareAndSet(true, false)) { + newJobManagerGatewayFuture = jobManagerGatewayFuture; + } else { + newJobManagerGatewayFuture = new CompletableFuture<>(); + jobManagerGatewayFuture = newJobManagerGatewayFuture; + } + + log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID); + + createJobManagerGateway(leaderAddress, leaderSessionID).whenComplete( + (JobManagerGateway jobManagerGateway, Throwable throwable) -> { + if (throwable != null) { + newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve" + + "the current job manager gateway.", throwable)); + } else { + newJobManagerGatewayFuture.complete(jobManagerGateway); + } + } + ); + } + catch (Exception e) { + handleError(e); + } + } + } + + @Override + public void handleError(Exception exception) { + log.error("Received error from LeaderRetrievalService.", exception); + + jobManagerGatewayFuture.completeExceptionally(exception); + } + + /** + * Create a JobManagerGateway for the given leader address and leader id. + * + * @param leaderAddress to connect against + * @param leaderId the endpoint currently uses + * @return Future containing the resolved JobManagerGateway + * @throws Exception if the JobManagerGateway creation failed + */ + protected abstract CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java new file mode 100644 index 0000000..c79bf5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceGateway; + +import java.util.concurrent.CompletableFuture; + +/** + * Gateway to communicate with a QueryService. + * + * <p>Currently there is only one implementation working with a Akka based + * MetricQueryService {@link AkkaQueryServiceGateway}. + */ +public interface MetricQueryServiceGateway { + + CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java new file mode 100644 index 0000000..7bb9b44 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever; + +import java.util.concurrent.CompletableFuture; + +/** + * Retriever for {@link MetricQueryServiceGateway}. + */ +public interface MetricQueryServiceRetriever { + + /** + * Retrieves for the given query service path a {@link MetricQueryServiceGateway}. + * + * @param queryServicePath under which the QueryService can be reached + * @return Future containing the resolved QueryServiceGateway + */ + CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java new file mode 100644 index 0000000..027b42a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever.impl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaJobManagerGateway; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * {@link JobManagerRetriever} implementation for Akka based JobManagers. + */ +public class AkkaJobManagerRetriever extends JobManagerRetriever { + + private final ActorSystem actorSystem; + private final Time timeout; + + public AkkaJobManagerRetriever( + ActorSystem actorSystem, + Time timeout) { + + this.actorSystem = Preconditions.checkNotNull(actorSystem); + this.timeout = Preconditions.checkNotNull(timeout); + } + + @Override + protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception { + return FutureUtils.toJava( + AkkaUtils.getActorRefFuture( + leaderAddress, + actorSystem, + FutureUtils.toFiniteDuration(timeout))) + .thenApplyAsync( + (ActorRef jobManagerRef) -> { + ActorGateway leaderGateway = new AkkaActorGateway( + jobManagerRef, leaderId); + + return new AkkaJobManagerGateway(leaderGateway); + }, + actorSystem.dispatcher()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java new file mode 100644 index 0000000..8985205 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever.impl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; + +import java.util.concurrent.CompletableFuture; + +import scala.reflect.ClassTag$; + +/** + * {@link MetricQueryServiceGateway} implementation for Akka based {@link MetricQueryService}. + */ +public class AkkaQueryServiceGateway implements MetricQueryServiceGateway { + + private final ActorRef queryServiceActorRef; + + public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) { + this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef); + } + + @Override + public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) { + return FutureUtils.toJava( + Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds()) + .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class)) + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java new file mode 100644 index 0000000..7de436a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.retriever.impl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; + +import java.util.concurrent.CompletableFuture; + +/** + * {@link MetricQueryServiceRetriever} implementation for Akka based {@link MetricQueryService}. + */ +public class AkkaQueryServiceRetriever implements MetricQueryServiceRetriever { + private final ActorSystem actorSystem; + private final Time lookupTimeout; + + public AkkaQueryServiceRetriever(ActorSystem actorSystem, Time lookupTimeout) { + this.actorSystem = Preconditions.checkNotNull(actorSystem); + this.lookupTimeout = Preconditions.checkNotNull(lookupTimeout); + } + + @Override + public CompletableFuture<MetricQueryServiceGateway> retrieveService(String queryServicePath) { + ActorSelection selection = actorSystem.actorSelection(queryServicePath); + + return FutureUtils.toJava(selection.resolveOne(FutureUtils.toFiniteDuration(lookupTimeout))).thenApply(AkkaQueryServiceGateway::new); + } +}
