Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181687398
--- Diff:
flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java
---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new
FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host,
port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview =
getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient,
jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new
E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient,
spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures.
Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: "
+ testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient
httpClient,
+ JobIdsWithStatusOverview jobOverview) throws
InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+
Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus ->
jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs,
discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus ->
jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient,
jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId =
taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient
httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp =
httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(),
JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient
httpClient, String jobId)
--- End diff --
these methods can be made less brittle by using the corresponding
`MessageHeaders` objects.
```
JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
httpClient.sendGetRequest(headers.getTargetRestEndpointURL().replace(JobIDPathParameter.KEY,
jobId), TEST_TIMEOUT);
HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
Preconditions.checkState(resp.getStatus() ==
headers.getResponseStatusCode(),
"Cannot fetch job detail information for job " + jobId);
return MAPPER.readValue(resp.getContent(), headers.getResponseClass());
```
---