http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java deleted file mode 100644 index 9c5e168..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers.checkpoints; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; -import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; -import org.apache.flink.runtime.checkpoint.SubtaskStateStats; -import org.apache.flink.runtime.checkpoint.TaskStateStats; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.JsonArchivist; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for the CheckpointStatsSubtaskDetailsHandler. - */ -public class CheckpointStatsSubtaskDetailsHandlerTest { - - @Test - public void testArchiver() throws Exception { - JsonArchivist archivist = new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(); - ObjectMapper mapper = new ObjectMapper(); - - PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1992139L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); - when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration - - TaskStateStats task = createTaskStateStats(1237); - when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task)); - - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint)); - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - when(graph.getJobID()).thenReturn(new JobID()); - - Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph); - Assert.assertEquals(1, archives.size()); - - ArchivedJson archive = archives.iterator().next(); - Assert.assertEquals( - "/jobs/" + graph.getJobID() + "/checkpoints/details/" + checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(), - archive.getPath()); - JsonNode rootNode = mapper.readTree(archive.getJson()); - assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); - assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - - verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp()); - } - - @Test - public void testGetPaths() { - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]); - } - - /** - * Tests a subtask details request. - */ - @Test - public void testSubtaskRequest() throws Exception { - PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1992139L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); - when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration - - TaskStateStats task = createTaskStateStats(1237); - when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task); - - JsonNode rootNode = triggerRequest(checkpoint); - assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); - assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - - verifyTaskNode(rootNode, task, checkpoint.getTriggerTimestamp()); - } - - /** - * Tests a subtask details request. - */ - @Test - public void testSubtaskRequestNoSummary() throws Exception { - PendingCheckpointStats checkpoint = mock(PendingCheckpointStats.class); - when(checkpoint.getCheckpointId()).thenReturn(1992139L); - when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); - when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack timestamp = duration - - TaskStateStats task = createTaskStateStats(0); // no acknowledged - when(checkpoint.getTaskStateStats(any(JobVertexID.class))).thenReturn(task); - - JsonNode rootNode = triggerRequest(checkpoint); - assertNull(rootNode.get("summary")); - } - - /** - * Tests request with illegal checkpoint ID param. - */ - @Test - public void testIllegalCheckpointId() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "illegal checkpoint"); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - } - - /** - * Tests request with missing checkpoint ID param. - */ - @Test - public void testNoCheckpointIdParam() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get(); - - assertEquals("{}", json); - } - - /** - * Test lookup of not existing checkpoint in history. - */ - @Test - public void testCheckpointNotFound() throws Exception { - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpointById(anyLong())).thenReturn(null); // not found - - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "123"); - params.put("vertexid", new JobVertexID().toString()); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - verify(history, times(1)).getCheckpointById(anyLong()); - } - - /** - * Tests request with illegal job vertex ID param. - */ - @Test - public void testIllegalJobVertexIdParam() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "1"); - params.put("vertexid", "illegal vertex id"); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - } - - /** - * Tests request with missing job vertex ID param. - */ - @Test - public void testNoJobVertexIdParam() throws Exception { - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "1"); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - } - - /** - * Test lookup of not existing job vertex ID in checkpoint. - */ - @Test - public void testJobVertexNotFound() throws Exception { - PendingCheckpointStats inProgress = mock(PendingCheckpointStats.class); - when(inProgress.getTaskStateStats(any(JobVertexID.class))).thenReturn(null); // not found - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpointById(anyLong())).thenReturn(inProgress); - - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "123"); - params.put("vertexid", new JobVertexID().toString()); - String json = handler.handleRequest(graph, params).get(); - - assertEquals("{}", json); - verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class)); - } - - // ------------------------------------------------------------------------ - - private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception { - CheckpointStatsHistory history = mock(CheckpointStatsHistory.class); - when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); - CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class); - when(snapshot.getHistory()).thenReturn(history); - - AccessExecutionGraph graph = mock(AccessExecutionGraph.class); - when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot); - - CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0)); - Map<String, String> params = new HashMap<>(); - params.put("checkpointid", "123"); - params.put("vertexid", new JobVertexID().toString()); - String json = handler.handleRequest(graph, params).get(); - - ObjectMapper mapper = new ObjectMapper(); - return mapper.readTree(json); - } - - private static TaskStateStats createTaskStateStats(int numAcknowledged) { - ThreadLocalRandom rand = ThreadLocalRandom.current(); - - TaskStateStats task = mock(TaskStateStats.class); - when(task.getJobVertexId()).thenReturn(new JobVertexID()); - when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1); - when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1); - when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1); - when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1); - when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) + 1); - when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(numAcknowledged); - - TaskStateStats.TaskStateStatsSummary summary = mock(TaskStateStats.TaskStateStatsSummary.class); - - doReturn(createMinMaxAvgStats(rand)).when(summary).getStateSizeStats(); - doReturn(createMinMaxAvgStats(rand)).when(summary).getAckTimestampStats(); - doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentBufferedStats(); - doReturn(createMinMaxAvgStats(rand)).when(summary).getAlignmentDurationStats(); - doReturn(createMinMaxAvgStats(rand)).when(summary).getSyncCheckpointDurationStats(); - doReturn(createMinMaxAvgStats(rand)).when(summary).getAsyncCheckpointDurationStats(); - - when(task.getSummaryStats()).thenReturn(summary); - - SubtaskStateStats[] subtasks = new SubtaskStateStats[3]; - subtasks[0] = createSubtaskStats(0, rand); - subtasks[1] = createSubtaskStats(1, rand); - subtasks[2] = null; - - when(task.getSubtaskStats()).thenReturn(subtasks); - - return task; - } - - private static void verifyTaskNode(JsonNode taskNode, TaskStateStats task, long triggerTimestamp) { - long duration = ThreadLocalRandom.current().nextInt(128); - - assertEquals(task.getLatestAckTimestamp(), taskNode.get("latest_ack_timestamp").asLong()); - assertEquals(task.getStateSize(), taskNode.get("state_size").asLong()); - assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), taskNode.get("end_to_end_duration").asLong()); - assertEquals(task.getAlignmentBuffered(), taskNode.get("alignment_buffered").asLong()); - assertEquals(task.getNumberOfSubtasks(), taskNode.get("num_subtasks").asInt()); - assertEquals(task.getNumberOfAcknowledgedSubtasks(), taskNode.get("num_acknowledged_subtasks").asInt()); - - TaskStateStats.TaskStateStatsSummary summary = task.getSummaryStats(); - verifyMinMaxAvgStats(summary.getStateSizeStats(), taskNode.get("summary").get("state_size")); - verifyMinMaxAvgStats(summary.getSyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("sync")); - verifyMinMaxAvgStats(summary.getAsyncCheckpointDurationStats(), taskNode.get("summary").get("checkpoint_duration").get("async")); - verifyMinMaxAvgStats(summary.getAlignmentBufferedStats(), taskNode.get("summary").get("alignment").get("buffered")); - verifyMinMaxAvgStats(summary.getAlignmentDurationStats(), taskNode.get("summary").get("alignment").get("duration")); - - JsonNode endToEndDurationNode = taskNode.get("summary").get("end_to_end_duration"); - assertEquals(summary.getAckTimestampStats().getMinimum() - triggerTimestamp, endToEndDurationNode.get("min").asLong()); - assertEquals(summary.getAckTimestampStats().getMaximum() - triggerTimestamp, endToEndDurationNode.get("max").asLong()); - assertEquals((long) summary.getAckTimestampStats().getAverage() - triggerTimestamp, endToEndDurationNode.get("avg").asLong()); - - SubtaskStateStats[] subtasks = task.getSubtaskStats(); - Iterator<JsonNode> it = taskNode.get("subtasks").iterator(); - - assertTrue(it.hasNext()); - verifySubtaskStats(it.next(), 0, subtasks[0]); - - assertTrue(it.hasNext()); - verifySubtaskStats(it.next(), 1, subtasks[1]); - - assertTrue(it.hasNext()); - verifySubtaskStats(it.next(), 2, subtasks[2]); - - assertFalse(it.hasNext()); - } - - private static SubtaskStateStats createSubtaskStats(int index, ThreadLocalRandom rand) { - SubtaskStateStats subtask = mock(SubtaskStateStats.class); - when(subtask.getSubtaskIndex()).thenReturn(index); - when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024)); - when(subtask.getAlignmentBuffered()).thenReturn(rand.nextLong(1024)); - when(subtask.getAlignmentDuration()).thenReturn(rand.nextLong(1024)); - when(subtask.getSyncCheckpointDuration()).thenReturn(rand.nextLong(1024)); - when(subtask.getAsyncCheckpointDuration()).thenReturn(rand.nextLong(1024)); - when(subtask.getAckTimestamp()).thenReturn(rand.nextLong(1024)); - when(subtask.getStateSize()).thenReturn(rand.nextLong(1024)); - when(subtask.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024)); - return subtask; - } - - private static void verifySubtaskStats(JsonNode subtaskNode, int index, SubtaskStateStats subtask) { - if (subtask == null) { - assertEquals(index, subtaskNode.get("index").asInt()); - assertEquals("pending_or_failed", subtaskNode.get("status").asText()); - } else { - assertEquals(subtask.getSubtaskIndex(), subtaskNode.get("index").asInt()); - assertEquals("completed", subtaskNode.get("status").asText()); - assertEquals(subtask.getAckTimestamp(), subtaskNode.get("ack_timestamp").asLong()); - assertEquals(subtask.getEndToEndDuration(0), subtaskNode.get("end_to_end_duration").asLong()); - assertEquals(subtask.getStateSize(), subtaskNode.get("state_size").asLong()); - assertEquals(subtask.getSyncCheckpointDuration(), subtaskNode.get("checkpoint").get("sync").asLong()); - assertEquals(subtask.getAsyncCheckpointDuration(), subtaskNode.get("checkpoint").get("async").asLong()); - assertEquals(subtask.getAlignmentBuffered(), subtaskNode.get("alignment").get("buffered").asLong()); - assertEquals(subtask.getAlignmentDuration(), subtaskNode.get("alignment").get("duration").asLong()); - } - } - - private static MinMaxAvgStats createMinMaxAvgStats(ThreadLocalRandom rand) { - MinMaxAvgStats mma = mock(MinMaxAvgStats.class); - when(mma.getMinimum()).thenReturn(rand.nextLong(1024)); - when(mma.getMaximum()).thenReturn(rand.nextLong(1024)); - when(mma.getAverage()).thenReturn(rand.nextLong(1024)); - - return mma; - } - - private static void verifyMinMaxAvgStats(MinMaxAvgStats expected, JsonNode node) { - assertEquals(expected.getMinimum(), node.get("min").asLong()); - assertEquals(expected.getMaximum(), node.get("max").asLong()); - assertEquals(expected.getAverage(), node.get("avg").asLong()); - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java index 2e52f2e..03666a8 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/FsJobArchivistTest.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.history; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 33d9c79..3c93be3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.ArchiveMessages; -import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils; import akka.actor.ActorRef; import akka.actor.ActorSystem; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java deleted file mode 100644 index 0755888..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.powermock.api.mockito.PowerMockito.mock; - -/** - * Tests for the AbstractMetricsHandler. - */ -public class AbstractMetricsHandlerTest extends TestLogger { - /** - * Verifies that the handlers correctly handle expected REST calls. - */ - @Test - public void testHandleRequest() throws Exception { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - Map<String, String> queryParams = new HashMap<>(); - - pathParams.put("jobid", "jobid"); - pathParams.put("vertexid", "taskid"); - - // get list of available metrics - String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get(); - - assertEquals("[" + - "{\"id\":\"8.opname.abc.metric5\"}," + - "{\"id\":\"8.abc.metric4\"}" + - "]", - availableList); - - // get value for a single metric - queryParams.put("get", "8.opname.abc.metric5"); - - String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get(); - - assertEquals("[" + - "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" + - "]" - , metricValue - ); - - // get values for multiple metrics - queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4"); - - String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get(); - - assertEquals("[" + - "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," + - "{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" + - "]", - metricValues - ); - } - - /** - * Verifies that a malformed request for available metrics does not throw an exception. - */ - @Test - public void testInvalidListDoesNotFail() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - Map<String, String> queryParams = new HashMap<>(); - - pathParams.put("jobid", "jobid"); - pathParams.put("vertexid", "taskid"); - - //-----invalid variable - pathParams.put("jobid", "nonexistent"); - - try { - assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get()); - } catch (Exception e) { - fail(); - } - } - - /** - * Verifies that a malformed request for a metric value does not throw an exception. - */ - @Test - public void testInvalidGetDoesNotFail() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - Map<String, String> queryParams = new HashMap<>(); - - pathParams.put("jobid", "jobid"); - pathParams.put("vertexid", "taskid"); - - //-----empty string - queryParams.put("get", ""); - - try { - assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get()); - } catch (Exception e) { - fail(e.getMessage()); - } - - //-----invalid variable - pathParams.put("jobid", "nonexistent"); - queryParams.put("get", "subindex.opname.abc.metric5"); - - try { - assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get()); - } catch (Exception e) { - fail(e.getMessage()); - } - - //-----invalid metric - pathParams.put("jobid", "nonexistant"); - queryParams.put("get", "subindex.opname.abc.nonexistant"); - - try { - assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get()); - } catch (Exception e) { - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java deleted file mode 100644 index 6d17b40..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.powermock.api.mockito.PowerMockito.mock; - -/** - * Tests for the JobManagerMetricsHandler. - */ -public class JobManagerMetricsHandlerTest extends TestLogger { - @Test - public void testGetPaths() { - JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobmanager/metrics", paths[0]); - } - - @Test - public void getMapFor() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertEquals("0", metrics.get("abc.metric1")); - } - - @Test - public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = fetcher.getMetricStore(); - - JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertNotNull(metrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java deleted file mode 100644 index b26ceab..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.powermock.api.mockito.PowerMockito.mock; - -/** - * Tests for the JobMetricsHandler. - */ -public class JobMetricsHandlerTest extends TestLogger { - @Test - public void testGetPaths() { - JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/metrics", paths[0]); - } - - @Test - public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - pathParams.put(PARAMETER_JOB_ID, "jobid"); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertEquals("2", metrics.get("abc.metric3")); - } - - @Test - public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = fetcher.getMetricStore(); - - JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertNull(metrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java deleted file mode 100644 index d637a4a..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; -import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.powermock.api.mockito.PowerMockito.mock; - -/** - * Tests for the JobVertexMetricsHandler. - */ -public class JobVertexMetricsHandlerTest extends TestLogger { - @Test - public void testGetPaths() { - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]); - } - - @Test - public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); - - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - pathParams.put(PARAMETER_JOB_ID, "jobid"); - pathParams.put(PARAMETER_VERTEX_ID, "taskid"); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertEquals("3", metrics.get("8.abc.metric4")); - - assertEquals("4", metrics.get("8.opname.abc.metric5")); - } - - @Test - public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = fetcher.getMetricStore(); - - JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertNull(metrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java deleted file mode 100644 index 09a0829..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; -import org.apache.flink.runtime.metrics.dump.MetricQueryService; -import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; -import org.apache.flink.runtime.metrics.util.TestingHistogram; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; - -/** - * Tests for the MetricFetcher. - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(MetricFetcher.class) -public class MetricFetcherTest extends TestLogger { - @Test - public void testUpdate() throws Exception { - final Time timeout = Time.seconds(10L); - - // ========= setup TaskManager ================================================================================= - JobID jobID = new JobID(); - InstanceID tmID = new InstanceID(); - ResourceID tmRID = new ResourceID(tmID.toString()); - TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); - when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); - - Instance taskManager = mock(Instance.class); - when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway); - when(taskManager.getId()).thenReturn(tmID); - when(taskManager.getTaskManagerID()).thenReturn(tmRID); - - // ========= setup JobManager ================================================================================== - JobDetails details = mock(JobDetails.class); - when(details.getJobId()).thenReturn(jobID); - - JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - - when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0]))); - when(jobManagerGateway.requestTaskManagerInstances(any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager))); - when(jobManagerGateway.getAddress()).thenReturn("/jm/address"); - - GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class); - when(retriever.getNow()) - .thenReturn(Optional.of(jobManagerGateway)); - - // ========= setup QueryServices ================================================================================ - MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class); - MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class); - - MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID); - - when(jmQueryService.queryMetrics(any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); - when(tmQueryService.queryMetrics(any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer)); - - MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class); - when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService)); - when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService)); - - // ========= start MetricFetcher testing ======================================================================= - MetricFetcher fetcher = new MetricFetcher( - retriever, - queryServiceRetriever, - Executors.directExecutor(), - timeout); - - // verify that update fetches metrics and updates the store - fetcher.update(); - MetricStore store = fetcher.getMetricStore(); - synchronized (store) { - assertEquals("7", store.jobManager.metrics.get("abc.hist_min")); - assertEquals("6", store.jobManager.metrics.get("abc.hist_max")); - assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean")); - assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median")); - assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev")); - assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75")); - assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90")); - assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95")); - assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98")); - assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99")); - assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999")); - - assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); - assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); - assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc")); - assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc")); - } - } - - private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) { - Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); - Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); - Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); - Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>(); - - SimpleCounter c1 = new SimpleCounter(); - SimpleCounter c2 = new SimpleCounter(); - - c1.inc(1); - c2.inc(2); - - counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); - counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); - meters.put(new Meter() { - @Override - public void markEvent() { - } - - @Override - public void markEvent(long n) { - } - - @Override - public double getRate() { - return 5; - } - - @Override - public long getCount() { - return 10; - } - }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc")); - gauges.put(new Gauge<String>() { - @Override - public String getValue() { - return "x"; - } - }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); - histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); - - MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); - MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); - serializer.close(); - - return dump; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java deleted file mode 100644 index d19e8c6..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.metrics.dump.MetricDump; -import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for the MetricStore. - */ -public class MetricStoreTest extends TestLogger { - @Test - public void testAdd() throws IOException { - MetricStore store = setupStore(new MetricStore()); - - assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")); - assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1")); - assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1")); - assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1")); - assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1")); - } - - @Test - public void testMalformedNameHandling() { - MetricStore store = new MetricStore(); - //-----verify that no exceptions are thrown - - // null - store.add(null); - // empty name - QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(""); - MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0); - store.add(cd); - - //-----verify that no side effects occur - assertEquals(0, store.jobManager.metrics.size()); - assertEquals(0, store.taskManagers.size()); - assertEquals(0, store.jobs.size()); - } - - public static MetricStore setupStore(MetricStore store) { - QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc"); - MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0); - - QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc"); - MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1); - - QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc"); - MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2); - - QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc"); - MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3); - - QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc"); - MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4); - - store.add(cd1); - store.add(cd2); - store.add(cd3); - store.add(cd4); - store.add(cd5); - - return store; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java deleted file mode 100644 index 9c5549e..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.metrics; - -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.powermock.api.mockito.PowerMockito.mock; - -/** - * Tests for the TaskManagerMetricsHandler. - */ -public class TaskManagerMetricsHandlerTest extends TestLogger { - @Test - public void testGetPaths() { - TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class)); - String[] paths = handler.getPaths(); - Assert.assertEquals(1, paths.length); - Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]); - } - - @Test - public void getMapFor() throws Exception { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore()); - - TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - pathParams.put(TASK_MANAGER_ID_KEY, "tmid"); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertEquals("1", metrics.get("abc.metric2")); - } - - @Test - public void getMapForNull() { - MetricFetcher fetcher = new MetricFetcher( - mock(GatewayRetriever.class), - mock(MetricQueryServiceRetriever.class), - Executors.directExecutor(), - TestingUtils.TIMEOUT()); - MetricStore store = fetcher.getMetricStore(); - - TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher); - - Map<String, String> pathParams = new HashMap<>(); - - Map<String, String> metrics = handler.getMapFor(pathParams, store); - - assertNull(metrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java deleted file mode 100644 index 979d943..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ArchivedExecution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.Preconditions; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * Utility class for constructing an ArchivedExecution. - */ -public class ArchivedExecutionBuilder { - private ExecutionAttemptID attemptId; - private long[] stateTimestamps; - private int attemptNumber; - private ExecutionState state; - private String failureCause; - private TaskManagerLocation assignedResourceLocation; - private StringifiedAccumulatorResult[] userAccumulators; - private IOMetrics ioMetrics; - private int parallelSubtaskIndex; - - public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) { - this.attemptId = attemptId; - return this; - } - - public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) { - Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length); - this.stateTimestamps = stateTimestamps; - return this; - } - - public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) { - this.attemptNumber = attemptNumber; - return this; - } - - public ArchivedExecutionBuilder setState(ExecutionState state) { - this.state = state; - return this; - } - - public ArchivedExecutionBuilder setFailureCause(String failureCause) { - this.failureCause = failureCause; - return this; - } - - public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) { - this.assignedResourceLocation = assignedResourceLocation; - return this; - } - - public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) { - this.userAccumulators = userAccumulators; - return this; - } - - public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) { - this.parallelSubtaskIndex = parallelSubtaskIndex; - return this; - } - - public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) { - this.ioMetrics = ioMetrics; - return this; - } - - public ArchivedExecution build() throws UnknownHostException { - return new ArchivedExecution( - userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0], - ioMetrics != null ? ioMetrics : new TestIOMetrics(), - attemptId != null ? attemptId : new ExecutionAttemptID(), - attemptNumber, - state != null ? state : ExecutionState.FINISHED, - failureCause != null ? failureCause : "(null)", - assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234), - parallelSubtaskIndex, - stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5} - ); - } - - private static class TestIOMetrics extends IOMetrics { - private static final long serialVersionUID = -5920076211680012555L; - - public TestIOMetrics() { - super( - new MeterView(new TestCounter(1), 0), - new MeterView(new TestCounter(2), 0), - new MeterView(new TestCounter(3), 0), - new MeterView(new TestCounter(4), 0), - new MeterView(new TestCounter(5), 0)); - } - } - - private static class TestCounter implements Counter { - private final long count; - - private TestCounter(long count) { - this.count = count; - } - - @Override - public void inc() { - } - - @Override - public void inc(long n) { - } - - @Override - public void dec() { - } - - @Override - public void dec(long n) { - } - - @Override - public long getCount() { - return count; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java deleted file mode 100644 index 053f718..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.api.common.ArchivedExecutionConfig; -import org.apache.flink.api.common.ExecutionMode; - -import java.util.Collections; -import java.util.Map; - -/** - * Utility class for constructing an ArchivedExecutionConfig. - */ -public class ArchivedExecutionConfigBuilder { - private String executionMode; - private String restartStrategyDescription; - private int parallelism; - private boolean objectReuseEnabled; - private Map<String, String> globalJobParameters; - - public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) { - this.executionMode = executionMode; - return this; - } - - public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) { - this.restartStrategyDescription = restartStrategyDescription; - return this; - } - - public ArchivedExecutionConfigBuilder setParallelism(int parallelism) { - this.parallelism = parallelism; - return this; - } - - public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) { - this.objectReuseEnabled = objectReuseEnabled; - return this; - } - - public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) { - this.globalJobParameters = globalJobParameters; - return this; - } - - public ArchivedExecutionConfig build() { - return new ArchivedExecutionConfig( - executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(), - restartStrategyDescription != null ? restartStrategyDescription : "default", - parallelism, - objectReuseEnabled, - globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap() - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java deleted file mode 100644 index 57b300a..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.api.common.ArchivedExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedValue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; - -/** - * Utility class for constructing an ArchivedExecutionGraph. - */ -public class ArchivedExecutionGraphBuilder { - - private static final Random RANDOM = new Random(); - - private JobID jobID; - private String jobName; - private Map<JobVertexID, ArchivedExecutionJobVertex> tasks; - private List<ArchivedExecutionJobVertex> verticesInCreationOrder; - private long[] stateTimestamps; - private JobStatus state; - private ErrorInfo failureCause; - private String jsonPlan; - private StringifiedAccumulatorResult[] archivedUserAccumulators; - private ArchivedExecutionConfig archivedExecutionConfig; - private boolean isStoppable; - private Map<String, SerializedValue<Object>> serializedUserAccumulators; - - public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { - this.jobID = jobID; - return this; - } - - public ArchivedExecutionGraphBuilder setJobName(String jobName) { - this.jobName = jobName; - return this; - } - - public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) { - this.tasks = tasks; - return this; - } - - public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) { - this.verticesInCreationOrder = verticesInCreationOrder; - return this; - } - - public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) { - Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length); - this.stateTimestamps = stateTimestamps; - return this; - } - - public ArchivedExecutionGraphBuilder setState(JobStatus state) { - this.state = state; - return this; - } - - public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) { - this.failureCause = failureCause; - return this; - } - - public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) { - this.jsonPlan = jsonPlan; - return this; - } - - public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { - this.archivedUserAccumulators = archivedUserAccumulators; - return this; - } - - public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) { - this.archivedExecutionConfig = archivedExecutionConfig; - return this; - } - - public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) { - isStoppable = stoppable; - return this; - } - - public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) { - this.serializedUserAccumulators = serializedUserAccumulators; - return this; - } - - public ArchivedExecutionGraph build() { - Preconditions.checkNotNull(tasks, "Tasks must not be null."); - JobID jobID = this.jobID != null ? this.jobID : new JobID(); - String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt(); - return new ArchivedExecutionGraph( - jobID, - jobName, - tasks, - verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()), - stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length], - state != null ? state : JobStatus.FINISHED, - failureCause, - jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}", - archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0], - serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(), - archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(), - isStoppable, - null, - null - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java deleted file mode 100644 index 3ef4106..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.Preconditions; - -import java.util.Random; - -/** - * Utility class for constructing an ArchivedExecutionJobVertex. - */ -public class ArchivedExecutionJobVertexBuilder { - - private static final Random RANDOM = new Random(); - - private ArchivedExecutionVertex[] taskVertices; - private JobVertexID id; - private String name; - private int parallelism; - private int maxParallelism; - private StringifiedAccumulatorResult[] archivedUserAccumulators; - - public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) { - this.taskVertices = taskVertices; - return this; - } - - public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) { - this.id = id; - return this; - } - - public ArchivedExecutionJobVertexBuilder setName(String name) { - this.name = name; - return this; - } - - public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) { - this.parallelism = parallelism; - return this; - } - - public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) { - this.maxParallelism = maxParallelism; - return this; - } - - public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { - this.archivedUserAccumulators = archivedUserAccumulators; - return this; - } - - public ArchivedExecutionJobVertex build() { - Preconditions.checkNotNull(taskVertices); - return new ArchivedExecutionJobVertex( - taskVertices, - id != null ? id : new JobVertexID(), - name != null ? name : "task_" + RANDOM.nextInt(), - parallelism, - maxParallelism, - archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0] - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java deleted file mode 100644 index 67e9e11..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.runtime.executiongraph.ArchivedExecution; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; -import org.apache.flink.runtime.util.EvictingBoundedList; -import org.apache.flink.util.Preconditions; - -import java.util.List; -import java.util.Random; - -/** - * Utility class for constructing an ArchivedExecutionVertex. - */ -public class ArchivedExecutionVertexBuilder { - - private static final Random RANDOM = new Random(); - - private int subtaskIndex; - private EvictingBoundedList<ArchivedExecution> priorExecutions; - private String taskNameWithSubtask; - private ArchivedExecution currentExecution; - - public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) { - this.subtaskIndex = subtaskIndex; - return this; - } - - public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) { - this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size()); - for (ArchivedExecution execution : priorExecutions) { - this.priorExecutions.add(execution); - } - return this; - } - - public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) { - this.taskNameWithSubtask = taskNameWithSubtask; - return this; - } - - public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) { - this.currentExecution = currentExecution; - return this; - } - - public ArchivedExecutionVertex build() { - Preconditions.checkNotNull(currentExecution); - return new ArchivedExecutionVertex( - subtaskIndex, - taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex, - currentExecution, - priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0) - ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java deleted file mode 100644 index 3e4fc01..0000000 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.utils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecution; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; -import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; - -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -/** - * Common entry-point for accessing generated ArchivedExecution* components. - */ -public class ArchivedJobGenerationUtils { - public static final ObjectMapper MAPPER = new ObjectMapper(); - public static final JsonFactory JACKSON_FACTORY = new JsonFactory() - .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) - .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); - - private static ArchivedExecutionGraph originalJob; - private static ArchivedExecutionJobVertex originalTask; - private static ArchivedExecutionVertex originalSubtask; - private static ArchivedExecution originalAttempt; - - private static final Object lock = new Object(); - - private ArchivedJobGenerationUtils() { - } - - public static AccessExecutionGraph getTestJob() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalJob; - } - - public static AccessExecutionJobVertex getTestTask() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalTask; - } - - public static AccessExecutionVertex getTestSubtask() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalSubtask; - } - - public static AccessExecution getTestAttempt() throws Exception { - synchronized (lock) { - if (originalJob == null) { - generateArchivedJob(); - } - } - return originalAttempt; - } - - private static void generateArchivedJob() throws Exception { - // Attempt - StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1"); - StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2"); - TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234); - originalAttempt = new ArchivedExecutionBuilder() - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) - .setParallelSubtaskIndex(1) - .setAttemptNumber(0) - .setAssignedResourceLocation(location) - .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) - .setState(ExecutionState.FINISHED) - .setFailureCause("attemptException") - .build(); - // Subtask - originalSubtask = new ArchivedExecutionVertexBuilder() - .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex()) - .setTaskNameWithSubtask("hello(1/1)") - .setCurrentExecution(originalAttempt) - .build(); - // Task - originalTask = new ArchivedExecutionJobVertexBuilder() - .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask}) - .build(); - // Job - Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>(); - tasks.put(originalTask.getJobVertexId(), originalTask); - originalJob = new ArchivedExecutionGraphBuilder() - .setJobID(new JobID()) - .setTasks(tasks) - .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) - .setState(JobStatus.FINISHED) - .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) - .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) - .build(); - } - - // ======================================================================== - // utility methods - // ======================================================================== - - public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) { - assertEquals(expectedAccs.length, writtenAccs.size()); - for (int x = 0; x < expectedAccs.length; x++) { - JsonNode acc = writtenAccs.get(x); - - assertEquals(expectedAccs[x].getName(), acc.get("name").asText()); - assertEquals(expectedAccs[x].getType(), acc.get("type").asText()); - assertEquals(expectedAccs[x].getValue(), acc.get("value").asText()); - } - } - - public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) { - assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong()); - assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong()); - assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong()); - assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java new file mode 100644 index 0000000..4ad1759 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +/** + * A special exception that indicates that an element was not found and that the + * request should be answered with a {@code 404} return code. + */ +public class NotFoundException extends Exception { + + private static final long serialVersionUID = -4036006746423754639L; + + public NotFoundException(String message) { + super(message); + } +}
