This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 55161db6286c2d9c5fd9f8c0fa13d38c2f17d61a
Author: Chesnay Schepler <[email protected]>
AuthorDate: Mon Oct 4 11:48:30 2021 +0200

    [FLINK-24275][rest] Add HandlerRequest factory methods
    
    Add factory methods for production code paths (where parameters must be 
resolved from the maps that Netty provides) and test code paths (where 
parameters are already resolved).
---
 .../webmonitor/handlers/JarDeleteHandlerTest.java  |   5 +-
 .../handlers/JarHandlerParameterTest.java          |   2 +-
 .../runtime/webmonitor/handlers/JarHandlers.java   |  12 +--
 .../webmonitor/handlers/JarUploadHandlerTest.java  |   4 +-
 .../runtime/rest/handler/AbstractHandler.java      |   2 +-
 .../flink/runtime/rest/handler/HandlerRequest.java | 115 ++++++++++++---------
 .../AbstractAsynchronousOperationHandlersTest.java |   9 +-
 .../cluster/JobManagerCustomLogHandlerTest.java    |   5 +-
 .../cluster/JobManagerLogListHandlerTest.java      |   5 +-
 .../rest/handler/job/JobConfigHandlerTest.java     |   5 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   5 +-
 .../handler/job/JobExecutionResultHandlerTest.java |   5 +-
 .../rest/handler/job/JobSubmitHandlerTest.java     |  18 +---
 .../job/JobVertexBackPressureHandlerTest.java      |  10 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   5 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   2 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   5 +-
 .../job/metrics/AbstractMetricsHandlerTest.java    |  20 ++--
 .../metrics/AggregatingMetricsHandlerTestBase.java |  45 ++++----
 .../metrics/JobVertexWatermarksHandlerTest.java    |   5 +-
 .../job/metrics/MetricsHandlerTestBase.java        |   5 +-
 .../job/savepoints/SavepointHandlersTest.java      |  10 +-
 .../savepoints/StopWithSavepointHandlersTest.java  |  10 +-
 .../AbstractTaskManagerFileHandlerTest.java        |   5 +-
 .../taskmanager/TaskManagerDetailsHandlerTest.java |   5 +-
 .../taskmanager/TaskManagerLogListHandlerTest.java |   5 +-
 .../rest/handler/util/HandlerRequestUtilsTest.java |  15 +--
 27 files changed, 185 insertions(+), 154 deletions(-)

diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
index 750c801..4746a1e 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
@@ -136,11 +136,12 @@ public class JarDeleteHandlerTest extends TestLogger {
 
     private static HandlerRequest<EmptyRequestBody, 
JarDeleteMessageParameters> createRequest(
             final String jarFileName) throws HandlerRequestException {
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new JarDeleteMessageParameters(),
                 Collections.singletonMap(JarIdPathParameter.KEY, jarFileName),
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     private void makeJarDirReadOnly() {
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
index daa2986..3fcd991 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -291,7 +291,7 @@ public abstract class JarHandlerParameterTest<
                                         MessageParameter::getKey,
                                         
JarHandlerParameterTest::getValuesAsString));
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 requestBody,
                 unresolvedMessageParameters,
                 Collections.singletonMap(JarIdPathParameter.KEY, 
jar.getFileName().toString()),
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java
index f08d23e..b726f28 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlers.java
@@ -106,11 +106,9 @@ public class JarHandlers {
     public static String uploadJar(
             JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) 
throws Exception {
         HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest 
=
-                new HandlerRequest<>(
+                HandlerRequest.create(
                         EmptyRequestBody.getInstance(),
                         EmptyMessageParameters.getInstance(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
                         Collections.singletonList(jar.toFile()));
         final JarUploadResponseBody uploadResponse =
                 handler.handleRequest(uploadRequest, restfulGateway).get();
@@ -120,7 +118,7 @@ public class JarHandlers {
     public static JarListInfo listJars(JarListHandler handler, RestfulGateway 
restfulGateway)
             throws Exception {
         HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest =
-                new HandlerRequest<>(
+                HandlerRequest.create(
                         EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance());
         return handler.handleRequest(listRequest, restfulGateway).get();
     }
@@ -131,7 +129,7 @@ public class JarHandlers {
         JarPlanMessageParameters planParameters =
                 
JarPlanGetHeaders.getInstance().getUnresolvedMessageParameters();
         HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> 
planRequest =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         new JarPlanRequestBody(),
                         planParameters,
                         Collections.singletonMap(
@@ -147,7 +145,7 @@ public class JarHandlers {
         final JarRunMessageParameters runParameters =
                 JarRunHeaders.getInstance().getUnresolvedMessageParameters();
         HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         new JarRunRequestBody(),
                         runParameters,
                         Collections.singletonMap(
@@ -163,7 +161,7 @@ public class JarHandlers {
         JarDeleteMessageParameters deleteParameters =
                 
JarDeleteHeaders.getInstance().getUnresolvedMessageParameters();
         HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> 
deleteRequest =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         deleteParameters,
                         Collections.singletonMap(
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index f1b5150..d90b95c 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -138,11 +138,9 @@ public class JarUploadHandlerTest extends TestLogger {
 
     private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
createRequest(
             final Path uploadedFile) throws HandlerRequestException, 
IOException {
-        return new HandlerRequest<>(
+        return HandlerRequest.create(
                 EmptyRequestBody.getInstance(),
                 EmptyMessageParameters.getInstance(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
                 Collections.singleton(uploadedFile.toFile()));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 14ceb46..fe8f806 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -175,7 +175,7 @@ public abstract class AbstractHandler<
 
             try {
                 handlerRequest =
-                        new HandlerRequest<R, M>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 request,
                                 
untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
                                 routedRequest.getRouteResult().pathParams(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 61977e6..e1695c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rest.messages.MessageParameter;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
@@ -51,45 +52,19 @@ public class HandlerRequest<R extends RequestBody, M 
extends MessageParameters>
     private final Map<Class<? extends MessageQueryParameter<?>>, 
MessageQueryParameter<?>>
             queryParameters;
 
-    public HandlerRequest(R requestBody, M messageParameters) throws 
HandlerRequestException {
-        this(
-                requestBody,
-                messageParameters,
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.emptyList());
-    }
-
-    public HandlerRequest(
-            R requestBody,
-            M messageParameters,
-            Map<String, String> receivedPathParameters,
-            Map<String, List<String>> receivedQueryParameters)
-            throws HandlerRequestException {
-        this(
-                requestBody,
-                messageParameters,
-                receivedPathParameters,
-                receivedQueryParameters,
-                Collections.emptyList());
-    }
-
-    public HandlerRequest(
+    private HandlerRequest(
             R requestBody,
-            M messageParameters,
-            Map<String, String> receivedPathParameters,
-            Map<String, List<String>> receivedQueryParameters,
-            Collection<File> uploadedFiles)
-            throws HandlerRequestException {
+            Map<Class<? extends MessagePathParameter<?>>, 
MessagePathParameter<?>>
+                    receivedPathParameters,
+            Map<Class<? extends MessageQueryParameter<?>>, 
MessageQueryParameter<?>>
+                    receivedQueryParameters,
+            Collection<File> uploadedFiles) {
         this.requestBody = Preconditions.checkNotNull(requestBody);
         this.uploadedFiles =
                 
Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
-        Preconditions.checkNotNull(messageParameters);
-        Preconditions.checkNotNull(receivedQueryParameters);
-        Preconditions.checkNotNull(receivedPathParameters);
 
-        pathParameters = resolvePathParameters(messageParameters, 
receivedPathParameters);
-        queryParameters = resolveQueryParameters(messageParameters, 
receivedQueryParameters);
+        pathParameters = Preconditions.checkNotNull(receivedPathParameters);
+        queryParameters = Preconditions.checkNotNull(receivedQueryParameters);
     }
 
     /**
@@ -143,11 +118,55 @@ public class HandlerRequest<R extends RequestBody, M 
extends MessageParameters>
         return uploadedFiles;
     }
 
-    private static Map<Class<? extends MessagePathParameter<?>>, 
MessagePathParameter<?>>
-            resolvePathParameters(
-                    MessageParameters messageParameters, Map<String, String> 
receivedPathParameters)
+    /**
+     * Short-cut for {@link #create(RequestBody, MessageParameters, 
Collection)} without any
+     * uploaded files.
+     */
+    @VisibleForTesting
+    public static <R extends RequestBody, M extends MessageParameters> 
HandlerRequest<R, M> create(
+            R requestBody, M messageParameters) {
+        return create(requestBody, messageParameters, Collections.emptyList());
+    }
+
+    /**
+     * Creates a new {@link HandlerRequest}. The given {@link 
MessageParameters} are expected to be
+     * resolved.
+     */
+    @VisibleForTesting
+    public static <R extends RequestBody, M extends MessageParameters> 
HandlerRequest<R, M> create(
+            R requestBody, M messageParameters, Collection<File> 
uploadedFiles) {
+        return new HandlerRequest<R, M>(
+                requestBody,
+                mapParameters(messageParameters.getPathParameters()),
+                mapParameters(messageParameters.getQueryParameters()),
+                uploadedFiles);
+    }
+
+    /**
+     * Creates a new {@link HandlerRequest} after resolving the given {@link 
MessageParameters}
+     * against the given query/path parameter maps.
+     *
+     * <p>For tests it is recommended to resolve the parameters manually and 
use {@link #create}.
+     */
+    public static <R extends RequestBody, M extends MessageParameters>
+            HandlerRequest<R, M> resolveParametersAndCreate(
+                    R requestBody,
+                    M messageParameters,
+                    Map<String, String> receivedPathParameters,
+                    Map<String, List<String>> receivedQueryParameters,
+                    Collection<File> uploadedFiles)
                     throws HandlerRequestException {
 
+        resolvePathParameters(messageParameters, receivedPathParameters);
+        resolveQueryParameters(messageParameters, receivedQueryParameters);
+
+        return create(requestBody, messageParameters, uploadedFiles);
+    }
+
+    private static void resolvePathParameters(
+            MessageParameters messageParameters, Map<String, String> 
receivedPathParameters)
+            throws HandlerRequestException {
+
         for (MessagePathParameter<?> pathParameter : 
messageParameters.getPathParameters()) {
             String value = receivedPathParameters.get(pathParameter.getKey());
             if (value != null) {
@@ -163,15 +182,11 @@ public class HandlerRequest<R extends RequestBody, M 
extends MessageParameters>
                 }
             }
         }
-
-        return mapParameters(messageParameters.getPathParameters());
     }
 
-    private static Map<Class<? extends MessageQueryParameter<?>>, 
MessageQueryParameter<?>>
-            resolveQueryParameters(
-                    MessageParameters messageParameters,
-                    Map<String, List<String>> receivedQueryParameters)
-                    throws HandlerRequestException {
+    private static void resolveQueryParameters(
+            MessageParameters messageParameters, Map<String, List<String>> 
receivedQueryParameters)
+            throws HandlerRequestException {
 
         for (MessageQueryParameter<?> queryParameter : 
messageParameters.getQueryParameters()) {
             List<String> values = 
receivedQueryParameters.get(queryParameter.getKey());
@@ -191,19 +206,17 @@ public class HandlerRequest<R extends RequestBody, M 
extends MessageParameters>
                 }
             }
         }
-
-        return mapParameters(messageParameters.getQueryParameters());
     }
 
     private static <P extends MessageParameter<?>> Map<Class<? extends P>, P> 
mapParameters(
-            Collection<P> pathParameters) {
+            Collection<P> parameters) {
         final Map<Class<? extends P>, P> mappedParameters = new HashMap<>(2);
 
-        for (P pathParameter : pathParameters) {
-            if (pathParameter.isResolved()) {
+        for (P parameter : parameters) {
+            if (parameter.isResolved()) {
                 @SuppressWarnings("unchecked")
-                Class<P> clazz = (Class<P>) pathParameter.getClass();
-                mappedParameters.put(clazz, pathParameter);
+                Class<P> clazz = (Class<P>) parameter.getClass();
+                mappedParameters.put(clazz, parameter);
             }
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
index e23ae01..411867f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
@@ -223,18 +223,19 @@ public class AbstractAsynchronousOperationHandlersTest 
extends TestLogger {
     }
 
     private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters>
-            triggerOperationRequest() throws HandlerRequestException {
-        return new HandlerRequest<>(
+            triggerOperationRequest() {
+        return HandlerRequest.create(
                 EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance());
     }
 
     private static HandlerRequest<EmptyRequestBody, TriggerMessageParameters>
             statusOperationRequest(TriggerId triggerId) throws 
HandlerRequestException {
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new TriggerMessageParameters(),
                 Collections.singletonMap(TriggerIdPathParameter.KEY, 
triggerId.toString()),
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     private static final class TestOperationKey extends OperationKey {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerCustomLogHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerCustomLogHandlerTest.java
index 414a17c..0392c8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerCustomLogHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerCustomLogHandlerTest.java
@@ -96,11 +96,12 @@ public class JobManagerCustomLogHandlerTest extends 
TestLogger {
         Map<String, String> pathParameters = new HashMap<>();
         
pathParameters.put(messageParameters.logFileNamePathParameter.getKey(), path);
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 messageParameters,
                 pathParameters,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerLogListHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerLogListHandlerTest.java
index 44bb7c4..143d717 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerLogListHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerLogListHandlerTest.java
@@ -66,11 +66,10 @@ public class JobManagerLogListHandlerTest extends 
TestLogger {
     @BeforeClass
     public static void setupClass() throws HandlerRequestException {
         testRequest =
-                new HandlerRequest<>(
+                HandlerRequest.create(
                         EmptyRequestBody.getInstance(),
                         EmptyMessageParameters.getInstance(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+                        Collections.emptyList());
     }
 
     @Before
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
index a03a414..4e13cff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
@@ -96,10 +96,11 @@ public class JobConfigHandlerTest extends TestLogger {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new JobMessageParameters(),
                 pathParameters,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index 0012a6b..154c55c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -377,11 +377,12 @@ public class JobExceptionsHandlerTest extends TestLogger {
         final Map<String, List<String>> queryParameters = new HashMap<>();
         queryParameters.put(UpperLimitExceptionParameter.KEY, 
Collections.singletonList("" + size));
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new JobExceptionsMessageParameters(),
                 pathParameters,
-                queryParameters);
+                queryParameters,
+                Collections.emptyList());
     }
 
     private static RootExceptionHistoryEntry fromGlobalFailure(Throwable 
cause, long timestamp) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
index 0f875c8..46ff2c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
@@ -73,11 +73,12 @@ public class JobExecutionResultHandlerTest extends 
TestLogger {
                         Collections.emptyMap());
 
         testRequest =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new JobMessageParameters(),
                         Collections.singletonMap("jobid", 
TEST_JOB_ID.toString()),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 4c5b190..432ae30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -128,7 +128,7 @@ public class JobSubmitHandlerTest extends TestLogger {
 
         try {
             handler.handleRequest(
-                    new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()),
+                    HandlerRequest.create(request, 
EmptyMessageParameters.getInstance()),
                     mockGateway);
             Assert.fail();
         } catch (RestHandlerException rhe) {
@@ -165,11 +165,9 @@ public class JobSubmitHandlerTest extends TestLogger {
                         Collections.emptyList());
 
         handler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.create(
                                 request,
                                 EmptyMessageParameters.getInstance(),
-                                Collections.emptyMap(),
-                                Collections.emptyMap(),
                                 Collections.singleton(jobGraphFile.toFile())),
                         mockGateway)
                 .get();
@@ -206,11 +204,9 @@ public class JobSubmitHandlerTest extends TestLogger {
 
         try {
             handler.handleRequest(
-                            new HandlerRequest<>(
+                            HandlerRequest.create(
                                     request,
                                     EmptyMessageParameters.getInstance(),
-                                    Collections.emptyMap(),
-                                    Collections.emptyMap(),
                                     Arrays.asList(
                                             jobGraphFile.toFile(), 
countExceedingFile.toFile())),
                             mockGateway)
@@ -269,11 +265,9 @@ public class JobSubmitHandlerTest extends TestLogger {
                                         dcEntryName, 
artifactFile.getFileName().toString())));
 
         handler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.create(
                                 request,
                                 EmptyMessageParameters.getInstance(),
-                                Collections.emptyMap(),
-                                Collections.emptyMap(),
                                 Arrays.asList(
                                         jobGraphFile.toFile(),
                                         jarFile.toFile(),
@@ -322,11 +316,9 @@ public class JobSubmitHandlerTest extends TestLogger {
 
         try {
             handler.handleRequest(
-                            new HandlerRequest<>(
+                            HandlerRequest.create(
                                     request,
                                     EmptyMessageParameters.getInstance(),
-                                    Collections.emptyMap(),
-                                    Collections.emptyMap(),
                                     
Collections.singletonList(jobGraphFile.toFile())),
                             mockGateway)
                     .get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index faca126..9f8bed9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -148,11 +148,12 @@ public class JobVertexBackPressureHandlerTest {
         pathParameters.put(JobVertexIdPathParameter.KEY, 
TEST_JOB_VERTEX_ID.toString());
 
         final HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> 
request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new JobVertexMessageParameters(),
                         pathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         final CompletableFuture<JobVertexBackPressureInfo>
                 jobVertexBackPressureInfoCompletableFuture =
@@ -202,11 +203,12 @@ public class JobVertexBackPressureHandlerTest {
         pathParameters.put(JobVertexIdPathParameter.KEY, new 
JobVertexID().toString());
 
         final HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> 
request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new JobVertexMessageParameters(),
                         pathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         final CompletableFuture<JobVertexBackPressureInfo>
                 jobVertexBackPressureInfoCompletableFuture =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 0f9ec36..5b3d66a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -135,11 +135,12 @@ public class SubtaskCurrentAttemptDetailsHandlerTest 
extends TestLogger {
         receivedPathParameters.put(JobVertexIdPathParameter.KEY, 
jobVertexID.toString());
 
         final HandlerRequest<EmptyRequestBody, SubtaskMessageParameters> 
request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new SubtaskMessageParameters(),
                         receivedPathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         // Handle request.
         final SubtaskExecutionAttemptDetailsInfo detailsInfo =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index c8cfd67..e2ba1f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -72,7 +72,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest 
extends TestLogger {
 
         // Instance a empty request.
         final HandlerRequest<EmptyRequestBody, 
SubtaskAttemptMessageParameters> request =
-                new HandlerRequest<>(
+                HandlerRequest.create(
                         EmptyRequestBody.getInstance(), new 
SubtaskAttemptMessageParameters());
 
         final Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators 
= new HashMap<>(3);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 8243e4d..f49d9d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -139,11 +139,12 @@ public class SubtaskExecutionAttemptDetailsHandlerTest 
extends TestLogger {
         receivedPathParameters.put(SubtaskAttemptPathParameter.KEY, 
Integer.toString(attempt));
 
         final HandlerRequest<EmptyRequestBody, 
SubtaskAttemptMessageParameters> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new SubtaskAttemptMessageParameters(),
                         receivedPathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         // Handle request.
         final SubtaskExecutionAttemptDetailsInfo detailsInfo =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
index 04b5f5c..c9f5c0b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
@@ -102,11 +102,10 @@ public class AbstractMetricsHandlerTest extends 
TestLogger {
     public void testListMetrics() throws Exception {
         final CompletableFuture<MetricCollectionResponseBody> 
completableFuture =
                 testMetricsHandler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.create(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
-                                Collections.emptyMap(),
-                                Collections.emptyMap()),
+                                Collections.emptyList()),
                         mockDispatcherGateway);
 
         assertTrue(completableFuture.isDone());
@@ -125,11 +124,10 @@ public class AbstractMetricsHandlerTest extends 
TestLogger {
 
         final CompletableFuture<MetricCollectionResponseBody> 
completableFuture =
                 testMetricsHandler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.create(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
-                                Collections.emptyMap(),
-                                Collections.emptyMap()),
+                                Collections.emptyList()),
                         mockDispatcherGateway);
 
         assertTrue(completableFuture.isDone());
@@ -142,13 +140,14 @@ public class AbstractMetricsHandlerTest extends 
TestLogger {
     public void testGetMetrics() throws Exception {
         final CompletableFuture<MetricCollectionResponseBody> 
completableFuture =
                 testMetricsHandler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
                                 Collections.emptyMap(),
                                 Collections.singletonMap(
                                         METRICS_FILTER_QUERY_PARAM,
-                                        
Collections.singletonList(TEST_METRIC_NAME))),
+                                        
Collections.singletonList(TEST_METRIC_NAME)),
+                                Collections.emptyList()),
                         mockDispatcherGateway);
 
         assertTrue(completableFuture.isDone());
@@ -165,13 +164,14 @@ public class AbstractMetricsHandlerTest extends 
TestLogger {
     public void testReturnEmptyListIfRequestedMetricIsUnknown() throws 
Exception {
         final CompletableFuture<MetricCollectionResponseBody> 
completableFuture =
                 testMetricsHandler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
                                 Collections.emptyMap(),
                                 Collections.singletonMap(
                                         METRICS_FILTER_QUERY_PARAM,
-                                        
Collections.singletonList("unknown_metric"))),
+                                        
Collections.singletonList("unknown_metric")),
+                                Collections.emptyList()),
                         mockDispatcherGateway);
 
         assertTrue(completableFuture.isDone());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index ead98be..bef4d7e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -126,11 +126,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
     public void getStores() throws Exception {
         { // test without filter
             HandlerRequest<EmptyRequestBody, P> request =
-                    new HandlerRequest<>(
+                    HandlerRequest.resolveParametersAndCreate(
                             EmptyRequestBody.getInstance(),
                             
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                             pathParameters,
-                            Collections.emptyMap());
+                            Collections.emptyMap(),
+                            Collections.emptyList());
             Collection<? extends MetricStore.ComponentMetricStore> subStores =
                     handler.getStores(store, request);
 
@@ -165,11 +166,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
             Map<String, List<String>> queryParameters = new HashMap<>(4);
             queryParameters.put(filter.f0, filter.f1);
             HandlerRequest<EmptyRequestBody, P> request =
-                    new HandlerRequest<>(
+                    HandlerRequest.resolveParametersAndCreate(
                             EmptyRequestBody.getInstance(),
                             
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                             pathParameters,
-                            queryParameters);
+                            queryParameters,
+                            Collections.emptyList());
             Collection<? extends MetricStore.ComponentMetricStore> subStores =
                     handler.getStores(store, request);
 
@@ -202,11 +204,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
     @Test
     public void testListMetrics() throws Exception {
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -229,11 +232,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("agg", Collections.singletonList("min"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -257,11 +261,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("agg", Collections.singletonList("max"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -285,11 +290,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("agg", Collections.singletonList("sum"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -313,11 +319,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("agg", Collections.singletonList("avg"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -341,11 +348,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("agg", Arrays.asList("min", "max", "avg"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
@@ -368,11 +376,12 @@ public abstract class AggregatingMetricsHandlerTestBase<
         queryParams.put("get", Collections.singletonList("abc.metric1"));
 
         HandlerRequest<EmptyRequestBody, P> request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         
handler.getMessageHeaders().getUnresolvedMessageParameters(),
                         pathParameters,
-                        queryParams);
+                        queryParams,
+                        Collections.emptyList());
 
         AggregatedMetricsResponseBody response =
                 handler.handleRequest(request, MOCK_DISPATCHER_GATEWAY).get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
index 288a988..7e48114 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
@@ -96,11 +96,12 @@ public class JobVertexWatermarksHandlerTest {
         pathParameters.put(JobVertexIdPathParameter.KEY, 
TEST_VERTEX_ID.toString());
 
         request =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new JobVertexMessageParameters(),
                         pathParameters,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
 
         vertex = Mockito.mock(AccessExecutionJobVertex.class);
         Mockito.when(vertex.getJobVertexId()).thenReturn(TEST_VERTEX_ID);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
index 95cf45a..dd9fe02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
@@ -101,11 +101,12 @@ public abstract class MetricsHandlerTestBase<T extends 
AbstractMetricsHandler> e
         @SuppressWarnings("unchecked")
         final CompletableFuture<MetricCollectionResponseBody> 
completableFuture =
                 metricsHandler.handleRequest(
-                        new HandlerRequest<>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 EmptyRequestBody.getInstance(),
                                 
metricsHandler.getMessageHeaders().getUnresolvedMessageParameters(),
                                 pathParameters,
-                                Collections.emptyMap()),
+                                Collections.emptyMap(),
+                                Collections.emptyList()),
                         mockDispatcherGateway);
 
         assertTrue(completableFuture.isDone());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
index 3f0cd91..0187731 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
@@ -212,11 +212,12 @@ public class SavepointHandlersTest extends TestLogger {
 
     private static HandlerRequest<SavepointTriggerRequestBody, 
SavepointTriggerMessageParameters>
             triggerSavepointRequest(final String targetDirectory) throws 
HandlerRequestException {
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 new SavepointTriggerRequestBody(targetDirectory, false),
                 new SavepointTriggerMessageParameters(),
                 Collections.singletonMap(JobIDPathParameter.KEY, 
JOB_ID.toString()),
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     private static HandlerRequest<EmptyRequestBody, 
SavepointStatusMessageParameters>
@@ -225,10 +226,11 @@ public class SavepointHandlersTest extends TestLogger {
         pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
         pathParameters.put(TriggerIdPathParameter.KEY, triggerId.toString());
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new SavepointStatusMessageParameters(),
                 pathParameters,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
index f7730a8..aa18350 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
@@ -217,11 +217,12 @@ public class StopWithSavepointHandlersTest extends 
TestLogger {
 
     private static HandlerRequest<StopWithSavepointRequestBody, 
SavepointTriggerMessageParameters>
             triggerSavepointRequest(final String targetDirectory) throws 
HandlerRequestException {
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 new StopWithSavepointRequestBody(targetDirectory, false),
                 new SavepointTriggerMessageParameters(),
                 Collections.singletonMap(JobIDPathParameter.KEY, 
JOB_ID.toString()),
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     private static HandlerRequest<EmptyRequestBody, 
SavepointStatusMessageParameters>
@@ -230,10 +231,11 @@ public class StopWithSavepointHandlersTest extends 
TestLogger {
         pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
         pathParameters.put(TriggerIdPathParameter.KEY, triggerId.toString());
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new SavepointStatusMessageParameters(),
                 pathParameters,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
index 86de2b9..4f8608c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
@@ -124,13 +124,14 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
         blobServer = new BlobServer(configuration, new VoidBlobStore());
 
         handlerRequest =
-                new HandlerRequest<>(
+                HandlerRequest.resolveParametersAndCreate(
                         EmptyRequestBody.getInstance(),
                         new TaskManagerFileMessageParameters(),
                         Collections.singletonMap(
                                 TaskManagerIdPathParameter.KEY,
                                 
EXPECTED_TASK_MANAGER_ID.getResourceIdString()),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections.emptyList());
     }
 
     @Before
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index d6a1887..565c3a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -183,11 +183,12 @@ public class TaskManagerDetailsHandlerTest extends 
TestLogger {
         Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(TaskManagerIdPathParameter.KEY, 
TASK_MANAGER_ID.toString());
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new TaskManagerFileMessageParameters(),
                 pathParameters,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                Collections.emptyList());
     }
 
     private static class TestingMetricFetcher implements MetricFetcher {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
index 8237db9..48e063d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
@@ -119,10 +119,11 @@ public class TaskManagerLogListHandlerTest extends 
TestLogger {
         pathParameters.put(TaskManagerIdPathParameter.KEY, 
taskManagerId.toString());
         Map<String, List<String>> queryParameters = Collections.emptyMap();
 
-        return new HandlerRequest<>(
+        return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
                 new TaskManagerMessageParameters(),
                 pathParameters,
-                queryParameters);
+                queryParameters,
+                Collections.emptyList());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java
index 752a406..88c0b39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java
@@ -43,11 +43,12 @@ public class HandlerRequestUtilsTest extends TestLogger {
     public void testGetQueryParameter() throws Exception {
         final Boolean queryParameter =
                 HandlerRequestUtils.getQueryParameter(
-                        new HandlerRequest<>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
                                 Collections.emptyMap(),
-                                Collections.singletonMap("key", 
Collections.singletonList("true"))),
+                                Collections.singletonMap("key", 
Collections.singletonList("true")),
+                                Collections.emptyList()),
                         TestBooleanQueryParameter.class);
         assertThat(queryParameter, equalTo(true));
     }
@@ -56,11 +57,12 @@ public class HandlerRequestUtilsTest extends TestLogger {
     public void testGetQueryParameterRepeated() throws Exception {
         try {
             HandlerRequestUtils.getQueryParameter(
-                    new HandlerRequest<>(
+                    HandlerRequest.resolveParametersAndCreate(
                             EmptyRequestBody.getInstance(),
                             new TestMessageParameters(),
                             Collections.emptyMap(),
-                            Collections.singletonMap("key", 
Arrays.asList("true", "false"))),
+                            Collections.singletonMap("key", 
Arrays.asList("true", "false")),
+                            Collections.emptyList()),
                     TestBooleanQueryParameter.class);
         } catch (final RestHandlerException e) {
             assertThat(e.getMessage(), containsString("Expected only one 
value"));
@@ -71,11 +73,12 @@ public class HandlerRequestUtilsTest extends TestLogger {
     public void testGetQueryParameterDefaultValue() throws Exception {
         final Boolean allowNonRestoredState =
                 HandlerRequestUtils.getQueryParameter(
-                        new HandlerRequest<>(
+                        HandlerRequest.resolveParametersAndCreate(
                                 EmptyRequestBody.getInstance(),
                                 new TestMessageParameters(),
                                 Collections.emptyMap(),
-                                Collections.singletonMap("key", 
Collections.emptyList())),
+                                Collections.singletonMap("key", 
Collections.emptyList()),
+                                Collections.emptyList()),
                         TestBooleanQueryParameter.class,
                         true);
         assertThat(allowNonRestoredState, equalTo(true));

Reply via email to