Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181669761
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * REST API test suite that exercises the monitoring endpoints of a running Flink cluster.
    + */
    +public class RestApiTestSuite {
    +   private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +   private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +   private static int testSuccessCount = 0;
    +   private static int testFailureCount = 0;
    +   private static int testSkipCount = 0;
    +
    +   public static void main(String[] args) throws Exception {
    +
    +           ParameterTool params = ParameterTool.fromArgs(args);
    +           final String host = params.get("host", "localhost");
    +           final int port = params.getInt("port", 8081);
    +           final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +           // Validate Flink cluster is running
    +           JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +           // Get necessary parameters for testing GET endpoints
    +           Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +           // Get list of endpoints
    +           List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +           specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +           if (testFailureCount != 0) {
    +                   throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +                           " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +           }
    +   }
    +
    +   @SuppressWarnings("ConstantConditions")
    +   private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +           JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +           // Get the parameters required by all REST API tests.
    +           final Map<String, String> parameterMap = new HashMap<>();
    +           Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +                   .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +                   .count() >= 1, "Cannot find a running job, discontinuing test!");
    +           String jobId = jobOverview.getJobsWithStatus().stream()
    +                   .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +                   .findFirst().get().getJobId().toString();
    +           parameterMap.put(":jobid", jobId);
    +
    +           JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +           String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +                   .findFirst().get().getJobVertexID().toString();
    +           parameterMap.put(":vertexid", vertexId);
    +           parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +           parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +           TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +           String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +           parameterMap.put(":taskmanagerid", taskMgrId);
    +           parameterMap.put(":triggerid", "");
    +
    +           return parameterMap;
    +   }
    +
    +   private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +           throws TimeoutException, InterruptedException, IOException {
    +           httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +           HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +           Preconditions.checkState(resp.getStatus().code() == 200,
    +                   "Cannot fetch Flink cluster status!");
    +           return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +   }
    +
    +   private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +           throws TimeoutException, InterruptedException, IOException {
    +           httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +           HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +           Preconditions.checkState(resp.getStatus().code() == 200,
    +                   "Cannot fetch job detail information for job " + jobId);
    +           return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +   }
    +
    +   private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +           throws TimeoutException, InterruptedException, IOException {
    +           httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +           HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +           Preconditions.checkState(resp.getStatus().code() == 200,
    +                   "Cannot fetch task manager status!");
    +           return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +   }
    +
    +   private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +           Map<String, String> parameterMap) {
    +           try {
    +                   HttpMethodWrapper method = spec.getHttpMethod();
    +                   String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +                   if (spec.getRequestClass() == EmptyRequestBody.class) {
    +                           switch (method) {
    +                                   case GET:
    +                                           httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +                                           break;
    +                                   case DELETE:
    +                                           httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +                                           break;
    +                                   default:
    +                                           throw new UnsupportedOperationException("Cannot handle REST test for " + path +
    +                                           " with method " + method + ". Only GET and DELETE requests are supported!");
    +                           }
    +
    +                           HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +                           Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    +                                   "Found mismatching status code from endpoint " + path + " with method " + method +
    +                                   ", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
    +                           // System.out.println("Found matching status for endpoint " + path + " with method " + method);
    +                           @SuppressWarnings("unchecked")
    --- End diff ---
    
    This is unnecessary. `MessageHeaders` has the response class as a generic argument.
    
    Change the method signature to
    ```
    private static <P extends ResponseBody> void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders<?, P, ?> spec, Map<String, String> parameterMap)
    ```
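    
    For illustration, a minimal, hypothetical sketch of how the generified signature makes the response handling type-safe, so the `@SuppressWarnings("unchecked")` can be dropped (`ResponseBody` comes from `org.apache.flink.runtime.rest.messages`). The body below is an assumption about the code that follows the quoted diff, not the PR's exact implementation; `getResponseClass()` is the existing `MessageHeaders` accessor.
    ```
    private static <P extends ResponseBody> void testMonitoringEndpointSpecs(
                    HttpTestClient httpClient, MessageHeaders<?, P, ?> spec, Map<String, String> parameterMap) {
            try {
                    // ... send the GET/DELETE request and check the status code exactly as in the diff above ...
                    HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
                    // P is carried by the spec, so Jackson can parse the payload without an unchecked cast
                    P response = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
                    // ... assertions on 'response' (the real method presumably updates the success/failure counters) ...
            } catch (Exception e) {
                    throw new RuntimeException("REST test failed for " + spec.getTargetRestEndpointURL(), e);
            }
    }
    ```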

