This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14ae6e5df54d0855e414270c5c107839d0ee1c76
Author: zentol <[email protected]>
AuthorDate: Wed Aug 22 12:04:29 2018 +0200

    [FLINK-7551][rest] Add versioning
---
 ...est_dispatcher.html => rest_v1_dispatcher.html} |   0
 docs/monitoring/rest_api.md                        |  51 +++--
 .../flink/docs/rest/RestAPIDocGenerator.java       |  18 +-
 .../HistoryServerStaticFileServerHandler.java      |   2 +-
 .../HistoryServerStaticFileServerHandlerTest.java  |   8 +-
 .../org/apache/flink/runtime/rest/RestClient.java  |  32 ++-
 .../flink/runtime/rest/RestServerEndpoint.java     |  44 +++-
 .../rest/handler/RestHandlerSpecification.java     |  13 ++
 .../runtime/rest/versioning/RestAPIVersion.java    |  98 ++++++++
 .../apache/flink/runtime/rest/RestClientTest.java  |  28 ++-
 .../runtime/rest/RestServerEndpointITCase.java     | 251 +++++++++++++++++++++
 .../rest/versioning/RestAPIVersionTest.java        |  52 +++++
 12 files changed, 561 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/rest_dispatcher.html 
b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index eefc8b9..85ec0ac 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
 A good example is the 
`org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the 
`org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
 
 
-## Available Requests
+## API
 
-### Dispatcher
+The REST API is versioned, with specific versions being queryable by prefixing 
the url with the version prefix. Prefixes are always of the form 
`v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
 
-{% include generated/rest_dispatcher.html %}
+If no version is specified Flink will default to the *oldest* version 
supporting the request.
 
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* 
active if the cluster runs in [legacy mode](../ops/config.html#mode). For this 
case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
 
 This section is only relevant if the cluster runs in [legacy 
mode](../ops/config.html#mode).
 
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example 
`http://hostname:8081/jobs/<
   - `/jars/:jarid/run`
 
 
-### General
+#### General
 
 **`/config`**
 
@@ -126,7 +139,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Overview of Jobs
+#### Overview of Jobs
 
 **`/jobs/overview`**
 
@@ -163,7 +176,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
 
 **`/jobs/<jobid>`**
 
@@ -573,15 +586,15 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Job Cancellation
+#### Job Cancellation
 
-#### Cancel Job
+##### Cancel Job
 
 `DELETE` request to **`/jobs/:jobid/cancel`**.
 
 Triggers job cancellation, result on success is `{}`.
 
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
 
 Triggers a savepoint and cancels the job after the savepoint succeeds.
 
@@ -601,7 +614,7 @@ Sample Trigger Result:
 }
 {% endhighlight %}
 
-##### Monitoring Progress
+###### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the 
user at
 
 The request ID is returned by the trigger result.
 
-###### In-Progress
+####### In-Progress
 
 {% highlight json %}
 {
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
 }
 {% endhighlight %}
 
-###### Success
+####### Success
 
 {% highlight json %}
 {
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
 
 The `savepointPath` points to the external path of the savepoint, which can be 
used to resume the savepoint.
 
-###### Failed
+####### Failed
 
 {% highlight json %}
 {
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the 
savepoint, which can be u
 }
 {% endhighlight %}
 
-### Submitting Programs
+#### Submitting Programs
 
 It is possible to upload, run, and list Flink programs via the REST APIs and 
web frontend.
 
-#### Upload a new JAR file
+##### Upload a new JAR file
 
 Send a `POST` request to `/jars/upload` with your jar file sent as multi-part 
data under the `jarfile` file.
 Also make sure that the multi-part data includes the `Content-Type` of the 
file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; 
filename="YourFileName.jar"
 Content-Type: application/x-java-archive
 {% endhighlight %}
 
-#### Run a Program (POST)
+##### Run a Program (POST)
 
 Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file 
name of the program JAR in the configured web frontend upload directory 
(configuration key `web.upload.dir`).
 
@@ -688,3 +701,7 @@ Response:
 {% endhighlight %}
 
 {% top %}
+</div>
+
+</div>
+
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec..4df1d6e 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@ public class RestAPIDocGenerator {
        public static void main(String[] args) throws IOException {
                String outputDirectory = args[0];
 
-               createHtmlFile(new DocumentingDispatcherRestEndpoint(), 
Paths.get(outputDirectory, "rest_dispatcher.html"));
+               for (final RestAPIVersion apiVersion : RestAPIVersion.values()) 
{
+                       if (apiVersion == RestAPIVersion.V0) {
+                               // this version exists only for testing purposes
+                               continue;
+                       }
+                       createHtmlFile(
+                               new DocumentingDispatcherRestEndpoint(),
+                               apiVersion,
+                               Paths.get(outputDirectory, "rest_" + 
apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+               }
        }
 
-       private static void createHtmlFile(DocumentingRestEndpoint 
restEndpoint, Path outputFile) throws IOException {
+       private static void createHtmlFile(DocumentingRestEndpoint 
restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
                StringBuilder html = new StringBuilder();
 
-               List<MessageHeaders> specs = restEndpoint.getSpecs();
+               List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+                       .filter(spec -> 
spec.getSupportedAPIVersions().contains(apiVersion))
+                       .collect(Collectors.toList());
                specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
                Files.deleteIfExists(outputFile);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 2042088..d8542d6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -164,7 +164,7 @@ public class HistoryServerStaticFileServerHandler extends 
SimpleChannelInboundHa
                                                HandlerUtils.sendErrorResponse(
                                                        ctx,
                                                        request,
-                                                       new 
ErrorResponseBody(String.format("Unable to load requested file %s.", 
requestPath)),
+                                                       new 
ErrorResponseBody("File not found."),
                                                        NOT_FOUND,
                                                        Collections.emptyMap());
                                                return;
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d..19a3d52 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
+import static org.hamcrest.CoreMatchers.containsString;
+
 /**
  * Tests for the HistoryServerStaticFileServerHandler.
  */
@@ -56,7 +58,7 @@ public class HistoryServerStaticFileServerHandlerTest {
                try {
                        // verify that 404 message is returned when requesting 
a non-existent file
                        String notFound404 = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/hello");
-                       Assert.assertTrue(notFound404.contains("404 Not 
Found"));
+                       Assert.assertThat(notFound404, containsString("not 
found"));
 
                        // verify that a) a file can be loaded using the 
ClassLoader and b) that the HistoryServer
                        // index_hs.html is injected
@@ -71,12 +73,12 @@ public class HistoryServerStaticFileServerHandlerTest {
                        File dir = new File(webDir, "dir.json");
                        dir.mkdirs();
                        String dirNotFound404 = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/dir");
-                       Assert.assertTrue(dirNotFound404.contains("404 Not 
Found"));
+                       Assert.assertTrue(dirNotFound404.contains("not found"));
 
                        // verify that a 404 message is returned when 
requesting a file outside the webDir
                        tmp.newFile("secret");
                        String x = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/../secret");
-                       Assert.assertTrue(x.contains("404 Not Found"));
+                       Assert.assertTrue(x.contains("not found"));
                } finally {
                        webUI.shutdown();
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c..a855749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
 
@@ -173,6 +175,24 @@ public class RestClient {
                        U messageParameters,
                        R request,
                        Collection<FileUpload> fileUploads) throws IOException {
+               return sendRequest(
+                       targetAddress,
+                       targetPort,
+                       messageHeaders,
+                       messageParameters,
+                       request,
+                       fileUploads,
+                       
RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+       }
+
+       public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+                       String targetAddress,
+                       int targetPort,
+                       M messageHeaders,
+                       U messageParameters,
+                       R request,
+                       Collection<FileUpload> fileUploads,
+                       RestAPIVersion apiVersion) throws IOException {
                Preconditions.checkNotNull(targetAddress);
                Preconditions.checkArgument(0 <= targetPort && targetPort < 
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
                Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public class RestClient {
                Preconditions.checkNotNull(fileUploads);
                Preconditions.checkState(messageParameters.isResolved(), 
"Message parameters were not resolved.");
 
-               String targetUrl = 
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), 
messageParameters);
+               if 
(!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+                       throw new IllegalArgumentException(String.format(
+                               "The requested version %s is not supported by 
the request (method=%s URL=%s). Supported versions are: %s.",
+                               apiVersion,
+                               messageHeaders.getHttpMethod(),
+                               messageHeaders.getTargetRestEndpointURL(),
+                               
messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+               }
+
+               String versionedHandlerURL = "/" + 
apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+               String targetUrl = 
MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
 
                LOG.debug("Sending request of class {} to {}:{}{}", 
request.getClass(), targetAddress, targetPort, targetUrl);
                // serialize payload
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e35..28af072 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
                                RestHandlerUrlComparator.INSTANCE);
 
                        handlers.forEach(handler -> {
-                               log.debug("Register handler {} under {}@{}.", 
handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
-                               registerHandler(router, handler);
+                               registerHandler(router, handler, log);
                        });
 
                        ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
                }
        }
 
-       private static void registerHandler(Router router, 
Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
-               switch (specificationHandler.f0.getHttpMethod()) {
+       private static void registerHandler(Router router, 
Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, 
Logger log) {
+               final String handlerURL = 
specificationHandler.f0.getTargetRestEndpointURL();
+               // setup versioned urls
+               for (final RestAPIVersion supportedVersion : 
specificationHandler.f0.getSupportedAPIVersions()) {
+                       final String versionedHandlerURL = '/' + 
supportedVersion.getURLVersionPrefix() + handlerURL;
+                       log.debug("Register handler {} under {}@{}.", 
specificationHandler.f1, specificationHandler.f0.getHttpMethod(), 
versionedHandlerURL);
+                       registerHandler(router, versionedHandlerURL, 
specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+                       if (supportedVersion.isDefaultVersion()) {
+                               // setup unversioned url for convenience and 
backwards compatibility
+                               log.debug("Register handler {} under {}@{}.", 
specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+                               registerHandler(router, handlerURL, 
specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+                       }
+               }
+       }
+
+       private static void registerHandler(Router router, String handlerURL, 
HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+               switch (httpMethod) {
                        case GET:
-                               
router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addGet(handlerURL, handler);
                                break;
                        case POST:
-                               
router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addPost(handlerURL, handler);
                                break;
                        case DELETE:
-                               
router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addDelete(handlerURL, handler);
                                break;
                        case PATCH:
-                               
router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addPatch(handlerURL, handler);
                                break;
                        default:
-                               throw new RuntimeException("Unsupported http 
method: " + specificationHandler.f0.getHttpMethod() + '.');
+                               throw new RuntimeException("Unsupported http 
method: " + httpMethod + '.');
                }
        }
 
@@ -437,13 +452,22 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
 
                private static final Comparator<String> CASE_INSENSITIVE_ORDER 
= new CaseInsensitiveOrderComparator();
 
+               private static final Comparator<RestAPIVersion> 
API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
                static final RestHandlerUrlComparator INSTANCE = new 
RestHandlerUrlComparator();
 
                @Override
                public int compare(
                                Tuple2<RestHandlerSpecification, 
ChannelInboundHandler> o1,
                                Tuple2<RestHandlerSpecification, 
ChannelInboundHandler> o2) {
-                       return 
CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), 
o2.f0.getTargetRestEndpointURL());
+                       final int urlComparisonResult = 
CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), 
o2.f0.getTargetRestEndpointURL());
+                       if (urlComparisonResult != 0) {
+                               return urlComparisonResult;
+                       } else {
+                               return API_VERSION_ORDER.compare(
+                                       
Collections.min(o1.f0.getSupportedAPIVersions()),
+                                       
Collections.min(o2.f0.getSupportedAPIVersions()));
+                       }
                }
 
                /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49..6561679 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Rest handler interface which all rest handler implementation have to 
implement.
@@ -38,4 +42,13 @@ public interface RestHandlerSpecification {
         * @return endpoint url that this request should be sent to
         */
        String getTargetRestEndpointURL();
+
+       /**
+        * Returns the supported API versions that this request supports.
+        *
+        * @return Collection of supported API versions
+        */
+       default Collection<RestAPIVersion> getSupportedAPIVersions() {
+               return Collections.singleton(RestAPIVersion.V1);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 0000000..d630563
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not 
limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+       V0(0, false), // strictly for testing purposes
+       V1(1, true);
+
+       private final int versionNumber;
+
+       private final boolean isDefaultVersion;
+
+       RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+               this.versionNumber = versionNumber;
+               this.isDefaultVersion = isDefaultVersion;
+       }
+
+       /**
+        * Returns the URL version prefix (e.g. "v1") for this version.
+        *
+        * @return URL version prefix
+        */
+       public String getURLVersionPrefix() {
+               return name().toLowerCase();
+       }
+
+       /**
+        * Returns whether this version is the default REST API version.
+        *
+        * @return whether this version is the default
+        */
+       public boolean isDefaultVersion() {
+               return isDefaultVersion;
+       }
+
+       /**
+        * Converts the given URL version prefix (e.g "v1") to a {@link 
RestAPIVersion}.
+        *
+        * @param prefix prefix to converted
+        * @return REST API version matching the prefix
+        * @throws IllegalArgumentException if the prefix doesn't match any 
version
+        */
+       public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+               return valueOf(prefix.toUpperCase());
+       }
+
+       /**
+        * Returns the latest version from the given collection.
+        *
+        * @param versions possible candidates
+        * @return latest version
+        */
+       public static RestAPIVersion 
getLatestVersion(Collection<RestAPIVersion> versions) {
+               return Collections.max(versions, new 
RestAPIVersionComparator());
+       }
+
+       /**
+        * Comparator for {@link RestAPIVersion} that sorts versions based on 
their version number, i.e. oldest to latest.
+        */
+       public static class RestAPIVersionComparator implements 
Comparator<RestAPIVersion> {
+
+               @Override
+               public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+                       return Integer.compare(o1.versionNumber, 
o2.versionNumber);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..22cd6f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@ import 
org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -45,12 +49,13 @@ import static org.junit.Assert.assertThat;
  */
 public class RestClientTest extends TestLogger {
 
+       private static final String unroutableIp = "10.255.255.1";
+
        @Test
        public void testConnectionTimeout() throws Exception {
                final Configuration config = new Configuration();
                config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
                final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(config), 
Executors.directExecutor());
-               final String unroutableIp = "10.255.255.1";
                try {
                        restClient.sendRequest(
                                unroutableIp,
@@ -66,6 +71,27 @@ public class RestClientTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testInvalidVersionRejection() throws Exception {
+               final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), 
Executors.directExecutor());
+
+               try {
+                       CompletableFuture<EmptyResponseBody> 
invalidVersionResponse = restClient.sendRequest(
+                               unroutableIp,
+                               80,
+                               new TestMessageHeaders(),
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList(),
+                               RestAPIVersion.V0
+                       );
+                       Assert.fail("The request should have been rejected due 
to a version mismatch.");
+               } catch (IllegalArgumentException e) {
+                       // expected
+               }
+
+       }
+
        private static class TestMessageHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e3..b017610 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,6 +90,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public class RestServerEndpointITCase extends TestLogger {
                        mockGatewayRetriever,
                        RpcUtils.INF_TIMEOUT);
 
+               TestVersionHandler testVersionHandler = new TestVersionHandler(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
+               TestVersionSelectionHandler1 testVersionSelectionHandler1 = new 
TestVersionSelectionHandler1(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
+               TestVersionSelectionHandler2 testVersionSelectionHandler2 = new 
TestVersionSelectionHandler2(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
                testUploadHandler = new TestUploadHandler(
                        CompletableFuture.completedFuture(restAddress),
                        mockGatewayRetriever,
@@ -198,6 +219,9 @@ public class RestServerEndpointITCase extends TestLogger {
                final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = Arrays.asList(
                        Tuple2.of(new TestHeaders(), testHandler),
                        Tuple2.of(TestUploadHeaders.INSTANCE, 
testUploadHandler),
+                       Tuple2.of(testVersionHandler.getMessageHeaders(), 
testVersionHandler),
+                       
Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), 
testVersionSelectionHandler1),
+                       
Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), 
testVersionSelectionHandler2),
                        Tuple2.of(WebContentHandlerSpecification.getInstance(), 
staticFileServerHandler));
 
                serverEndpoint = new TestRestServerEndpoint(serverConfig, 
handlers);
@@ -415,6 +439,88 @@ public class RestServerEndpointITCase extends TestLogger {
                assertEquals("foobar", fileContents.trim());
        }
 
+       @Test
+       public void testVersioning() throws Exception {
+               CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse 
= restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionHeaders.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList()
+               );
+
+               unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+               CompletableFuture<EmptyResponseBody> specifiedVersionResponse = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionHeaders.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V1
+               );
+
+               specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+       }
+
+       @Test
+       public void testVersionSelection() throws Exception {
+               CompletableFuture<EmptyResponseBody> version1Response = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionSelectionHeaders1.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V0
+               );
+
+               try {
+                       version1Response.get(5, TimeUnit.SECONDS);
+                       fail();
+               } catch (ExecutionException ee) {
+                       RestClientException rce = (RestClientException) 
ee.getCause();
+                       assertEquals(HttpResponseStatus.OK, 
rce.getHttpResponseStatus());
+               }
+
+               CompletableFuture<EmptyResponseBody> version2Response = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionSelectionHeaders2.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V1
+               );
+
+               try {
+                       version2Response.get(5, TimeUnit.SECONDS);
+                       fail();
+               } catch (ExecutionException ee) {
+                       RestClientException rce = (RestClientException) 
ee.getCause();
+                       assertEquals(HttpResponseStatus.ACCEPTED, 
rce.getHttpResponseStatus());
+               }
+       }
+
+       @Test
+       public void testDefaultVersionRouting() throws Exception {
+               Assume.assumeFalse(
+                       "Ignoring SSL-enabled test to keep OkHttp usage 
simple.",
+                       config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+               OkHttpClient client = new OkHttpClient();
+
+               final Request request = new Request.Builder()
+                       .url(serverEndpoint.getRestBaseUrl() + 
TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+                       .build();
+
+               try (final Response response = 
client.newCall(request).execute()) {
+                       assertEquals(HttpResponseStatus.ACCEPTED.code(), 
response.code());
+               }
+       }
+
        private HttpURLConnection openHttpConnectionForUpload(final String 
boundary) throws IOException {
                final HttpURLConnection connection =
                        (HttpURLConnection) new 
URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ public class RestServerEndpointITCase extends TestLogger {
                }
        }
 
+       private static class TestVersionHandler extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionHandler(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       return 
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+               }
+       }
+
+       private enum TestVersionHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+               INSTANCE;
+
+               @Override
+               public Class<EmptyRequestBody> getRequestClass() {
+                       return EmptyRequestBody.class;
+               }
+
+               @Override
+               public HttpMethodWrapper getHttpMethod() {
+                       return HttpMethodWrapper.GET;
+               }
+
+               @Override
+               public String getTargetRestEndpointURL() {
+                       return "/test/versioning";
+               }
+
+               @Override
+               public Class<EmptyResponseBody> getResponseClass() {
+                       return EmptyResponseBody.class;
+               }
+
+               @Override
+               public HttpResponseStatus getResponseStatusCode() {
+                       return HttpResponseStatus.OK;
+               }
+
+               @Override
+               public String getDescription() {
+                       return null;
+               }
+
+               @Override
+               public EmptyMessageParameters getUnresolvedMessageParameters() {
+                       return EmptyMessageParameters.getInstance();
+               }
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V1);
+               }
+       }
+
+       private interface TestVersionSelectionHeadersBase extends 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+               @Override
+               default Class<EmptyRequestBody> getRequestClass() {
+                       return EmptyRequestBody.class;
+               }
+
+               @Override
+               default HttpMethodWrapper getHttpMethod() {
+                       return HttpMethodWrapper.GET;
+               }
+
+               @Override
+               default String getTargetRestEndpointURL() {
+                       return "/test/select-version";
+               }
+
+               @Override
+               default Class<EmptyResponseBody> getResponseClass() {
+                       return EmptyResponseBody.class;
+               }
+
+               @Override
+               default HttpResponseStatus getResponseStatusCode() {
+                       return HttpResponseStatus.OK;
+               }
+
+               @Override
+               default String getDescription() {
+                       return null;
+               }
+
+               @Override
+               default EmptyMessageParameters getUnresolvedMessageParameters() 
{
+                       return EmptyMessageParameters.getInstance();
+               }
+       }
+
+       private enum TestVersionSelectionHeaders1 implements 
TestVersionSelectionHeadersBase {
+               INSTANCE;
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V0);
+               }
+       }
+
+       private enum TestVersionSelectionHeaders2 implements 
TestVersionSelectionHeadersBase {
+               INSTANCE;
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V1);
+               }
+       }
+
+       private static class TestVersionSelectionHandler1 extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionSelectionHandler1(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       throw new RestHandlerException("test failure 1", 
HttpResponseStatus.OK);
+               }
+       }
+
+       private static class TestVersionSelectionHandler2 extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionSelectionHandler2(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       throw new RestHandlerException("test failure 2", 
HttpResponseStatus.ACCEPTED);
+               }
+       }
+
        private enum TestUploadHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
                INSTANCE;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 0000000..4f60da1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+       @Test
+       public void testGetLatest() {
+               Collection<RestAPIVersion> candidates = 
Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+               Assert.assertEquals(RestAPIVersion.V1, 
RestAPIVersion.getLatestVersion(candidates));
+       }
+
+       @Test
+       public void testSingleDefaultVersion() {
+               final List<RestAPIVersion> defaultVersions = 
Arrays.stream(RestAPIVersion.values())
+                       .filter(RestAPIVersion::isDefaultVersion)
+                       .collect(Collectors.toList());
+
+               Assert.assertEquals(
+                       "Only one RestAPIVersion should be marked as the 
default. Defaults: " + defaultVersions,
+                       1,
+                       defaultVersions.size());
+       }
+}

Reply via email to