This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 14ae6e5df54d0855e414270c5c107839d0ee1c76 Author: zentol <[email protected]> AuthorDate: Wed Aug 22 12:04:29 2018 +0200 [FLINK-7551][rest] Add versioning --- ...est_dispatcher.html => rest_v1_dispatcher.html} | 0 docs/monitoring/rest_api.md | 51 +++-- .../flink/docs/rest/RestAPIDocGenerator.java | 18 +- .../HistoryServerStaticFileServerHandler.java | 2 +- .../HistoryServerStaticFileServerHandlerTest.java | 8 +- .../org/apache/flink/runtime/rest/RestClient.java | 32 ++- .../flink/runtime/rest/RestServerEndpoint.java | 44 +++- .../rest/handler/RestHandlerSpecification.java | 13 ++ .../runtime/rest/versioning/RestAPIVersion.java | 98 ++++++++ .../apache/flink/runtime/rest/RestClientTest.java | 28 ++- .../runtime/rest/RestServerEndpointITCase.java | 251 +++++++++++++++++++++ .../rest/versioning/RestAPIVersionTest.java | 52 +++++ 12 files changed, 561 insertions(+), 36 deletions(-) diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html similarity index 100% rename from docs/_includes/generated/rest_dispatcher.html rename to docs/_includes/generated/rest_v1_dispatcher.html diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md index eefc8b9..85ec0ac 100644 --- a/docs/monitoring/rest_api.md +++ b/docs/monitoring/rest_api.md @@ -52,13 +52,26 @@ To add new requests, one needs to A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`. -## Available Requests +## API -### Dispatcher +The REST API is versioned, with specific versions being queryable by prefixing the url with the version prefix. Prefixes are always of the form `v[version_number]`. +For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`. -{% include generated/rest_dispatcher.html %} +If no version is specified Flink will default to the *oldest* version supporting the request. -## Legacy +Querying unsupported/non-existing versions will return a 404 error. + +<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below. + +<div class="codetabs" markdown="1"> + +<div data-lang="v1" markdown="1"> +#### Dispatcher + +{% include generated/rest_v1_dispatcher.html %} +</div> + +<div data-lang="legacy" markdown="1"> This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode). @@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/< - `/jars/:jarid/run` -### General +#### General **`/config`** @@ -126,7 +139,7 @@ Sample Result: } {% endhighlight %} -### Overview of Jobs +#### Overview of Jobs **`/jobs/overview`** @@ -163,7 +176,7 @@ Sample Result: } {% endhighlight %} -### Details of a Running or Completed Job +#### Details of a Running or Completed Job **`/jobs/<jobid>`** @@ -573,15 +586,15 @@ Sample Result: } {% endhighlight %} -### Job Cancellation +#### Job Cancellation -#### Cancel Job +##### Cancel Job `DELETE` request to **`/jobs/:jobid/cancel`**. Triggers job cancellation, result on success is `{}`. -#### Cancel Job with Savepoint +##### Cancel Job with Savepoint Triggers a savepoint and cancels the job after the savepoint succeeds. @@ -601,7 +614,7 @@ Sample Trigger Result: } {% endhighlight %} -##### Monitoring Progress +###### Monitoring Progress The progress of the cancellation has to be monitored by the user at @@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at The request ID is returned by the trigger result. -###### In-Progress +####### In-Progress {% highlight json %} { @@ -620,7 +633,7 @@ The request ID is returned by the trigger result. } {% endhighlight %} -###### Success +####### Success {% highlight json %} { @@ -632,7 +645,7 @@ The request ID is returned by the trigger result. The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint. -###### Failed +####### Failed {% highlight json %} { @@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u } {% endhighlight %} -### Submitting Programs +#### Submitting Programs It is possible to upload, run, and list Flink programs via the REST APIs and web frontend. -#### Upload a new JAR file +##### Upload a new JAR file Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file. Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default. @@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar" Content-Type: application/x-java-archive {% endhighlight %} -#### Run a Program (POST) +##### Run a Program (POST) Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `web.upload.dir`). @@ -688,3 +701,7 @@ Response: {% endhighlight %} {% top %} +</div> + +</div> + diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java index 82fdeec..4df1d6e 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessagePathParameter; import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; @@ -123,13 +124,24 @@ public class RestAPIDocGenerator { public static void main(String[] args) throws IOException { String outputDirectory = args[0]; - createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html")); + for (final RestAPIVersion apiVersion : RestAPIVersion.values()) { + if (apiVersion == RestAPIVersion.V0) { + // this version exists only for testing purposes + continue; + } + createHtmlFile( + new DocumentingDispatcherRestEndpoint(), + apiVersion, + Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html")); + } } - private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException { + private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException { StringBuilder html = new StringBuilder(); - List<MessageHeaders> specs = restEndpoint.getSpecs(); + List<MessageHeaders> specs = restEndpoint.getSpecs().stream() + .filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion)) + .collect(Collectors.toList()); specs.forEach(spec -> html.append(createHtmlEntry(spec))); Files.deleteIfExists(outputFile); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index 2042088..d8542d6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -164,7 +164,7 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa HandlerUtils.sendErrorResponse( ctx, request, - new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)), + new ErrorResponseBody("File not found."), NOT_FOUND, Collections.emptyMap()); return; diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java index b08504d..19a3d52 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import java.io.File; +import static org.hamcrest.CoreMatchers.containsString; + /** * Tests for the HistoryServerStaticFileServerHandler. */ @@ -56,7 +58,7 @@ public class HistoryServerStaticFileServerHandlerTest { try { // verify that 404 message is returned when requesting a non-existent file String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello"); - Assert.assertTrue(notFound404.contains("404 Not Found")); + Assert.assertThat(notFound404, containsString("not found")); // verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer // index_hs.html is injected @@ -71,12 +73,12 @@ public class HistoryServerStaticFileServerHandlerTest { File dir = new File(webDir, "dir.json"); dir.mkdirs(); String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir"); - Assert.assertTrue(dirNotFound404.contains("404 Not Found")); + Assert.assertTrue(dirNotFound404.contains("not found")); // verify that a 404 message is returned when requesting a file outside the webDir tmp.newFile("secret"); String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret"); - Assert.assertTrue(x.contains("404 Not Found")); + Assert.assertTrue(x.contains("not found")); } finally { webUI.shutdown(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 2e9de4c..a855749 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -85,6 +86,7 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE; @@ -173,6 +175,24 @@ public class RestClient { U messageParameters, R request, Collection<FileUpload> fileUploads) throws IOException { + return sendRequest( + targetAddress, + targetPort, + messageHeaders, + messageParameters, + request, + fileUploads, + RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions())); + } + + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request, + Collection<FileUpload> fileUploads, + RestAPIVersion apiVersion) throws IOException { Preconditions.checkNotNull(targetAddress); Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); Preconditions.checkNotNull(messageHeaders); @@ -181,7 +201,17 @@ public class RestClient { Preconditions.checkNotNull(fileUploads); Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved."); - String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); + if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) { + throw new IllegalArgumentException(String.format( + "The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.", + apiVersion, + messageHeaders.getHttpMethod(), + messageHeaders.getTargetRestEndpointURL(), + messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(",")))); + } + + String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL(); + String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters); LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl); // serialize payload diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index e836e35..28af072 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.rest.handler.router.RouterHandler; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.Preconditions; @@ -144,8 +145,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync { RestHandlerUrlComparator.INSTANCE); handlers.forEach(handler -> { - log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL()); - registerHandler(router, handler); + registerHandler(router, handler, log); }); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @@ -364,22 +364,37 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync { } } - private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) { - switch (specificationHandler.f0.getHttpMethod()) { + private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) { + final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL(); + // setup versioned urls + for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) { + final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL; + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL); + registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + if (supportedVersion.isDefaultVersion()) { + // setup unversioned url for convenience and backwards compatibility + log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL); + registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1); + } + } + } + + private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) { + switch (httpMethod) { case GET: - router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); + router.addGet(handlerURL, handler); break; case POST: - router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); + router.addPost(handlerURL, handler); break; case DELETE: - router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); + router.addDelete(handlerURL, handler); break; case PATCH: - router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); + router.addPatch(handlerURL, handler); break; default: - throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.'); + throw new RuntimeException("Unsupported http method: " + httpMethod + '.'); } } @@ -437,13 +452,22 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync { private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator(); + private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator(); + static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator(); @Override public int compare( Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1, Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) { - return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL()); + final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL()); + if (urlComparisonResult != 0) { + return urlComparisonResult; + } else { + return API_VERSION_ORDER.compare( + Collections.min(o1.f0.getSupportedAPIVersions()), + Collections.min(o2.f0.getSupportedAPIVersions())); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java index 4ebcd49..6561679 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java @@ -19,6 +19,10 @@ package org.apache.flink.runtime.rest.handler; import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; + +import java.util.Collection; +import java.util.Collections; /** * Rest handler interface which all rest handler implementation have to implement. @@ -38,4 +42,13 @@ public interface RestHandlerSpecification { * @return endpoint url that this request should be sent to */ String getTargetRestEndpointURL(); + + /** + * Returns the supported API versions that this request supports. + * + * @return Collection of supported API versions + */ + default Collection<RestAPIVersion> getSupportedAPIVersions() { + return Collections.singleton(RestAPIVersion.V1); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java new file mode 100644 index 0000000..d630563 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.versioning; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; + +/** + * An enum for all versions of the REST API. + * + * <p>REST API versions are global and thus apply to every REST component. + * + * <p>Changes that must result in an API version increment include but are not limited to: + * - modification of a handler url + * - addition of new mandatory parameters + * - removal of a handler/request + * - modifications to request/response bodies (excluding additions) + */ +public enum RestAPIVersion { + V0(0, false), // strictly for testing purposes + V1(1, true); + + private final int versionNumber; + + private final boolean isDefaultVersion; + + RestAPIVersion(int versionNumber, boolean isDefaultVersion) { + this.versionNumber = versionNumber; + this.isDefaultVersion = isDefaultVersion; + } + + /** + * Returns the URL version prefix (e.g. "v1") for this version. + * + * @return URL version prefix + */ + public String getURLVersionPrefix() { + return name().toLowerCase(); + } + + /** + * Returns whether this version is the default REST API version. + * + * @return whether this version is the default + */ + public boolean isDefaultVersion() { + return isDefaultVersion; + } + + /** + * Converts the given URL version prefix (e.g "v1") to a {@link RestAPIVersion}. + * + * @param prefix prefix to converted + * @return REST API version matching the prefix + * @throws IllegalArgumentException if the prefix doesn't match any version + */ + public static RestAPIVersion fromURLVersionPrefix(String prefix) { + return valueOf(prefix.toUpperCase()); + } + + /** + * Returns the latest version from the given collection. + * + * @param versions possible candidates + * @return latest version + */ + public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) { + return Collections.max(versions, new RestAPIVersionComparator()); + } + + /** + * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e. oldest to latest. + */ + public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> { + + @Override + public int compare(RestAPIVersion o1, RestAPIVersion o2) { + return Integer.compare(o1.versionNumber, o2.versionNumber); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index 209f2d1..22cd6f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -25,14 +25,18 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -45,12 +49,13 @@ import static org.junit.Assert.assertThat; */ public class RestClientTest extends TestLogger { + private static final String unroutableIp = "10.255.255.1"; + @Test public void testConnectionTimeout() throws Exception { final Configuration config = new Configuration(); config.setLong(RestOptions.CONNECTION_TIMEOUT, 1); final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor()); - final String unroutableIp = "10.255.255.1"; try { restClient.sendRequest( unroutableIp, @@ -66,6 +71,27 @@ public class RestClientTest extends TestLogger { } } + @Test + public void testInvalidVersionRejection() throws Exception { + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor()); + + try { + CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList(), + RestAPIVersion.V0 + ); + Assert.fail("The request should have been rejected due to a version mismatch."); + } catch (IllegalArgumentException e) { + // expected + } + + } + private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 31f78e3..b017610 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.rest.messages.MessageQueryParameter; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.util.RestClientException; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -56,8 +57,12 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.apache.commons.io.IOUtils; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -85,6 +90,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -184,6 +190,21 @@ public class RestServerEndpointITCase extends TestLogger { mockGatewayRetriever, RpcUtils.INF_TIMEOUT); + TestVersionHandler testVersionHandler = new TestVersionHandler( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + TestVersionSelectionHandler1 testVersionSelectionHandler1 = new TestVersionSelectionHandler1( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + TestVersionSelectionHandler2 testVersionSelectionHandler2 = new TestVersionSelectionHandler2( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + testUploadHandler = new TestUploadHandler( CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, @@ -198,6 +219,9 @@ public class RestServerEndpointITCase extends TestLogger { final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList( Tuple2.of(new TestHeaders(), testHandler), Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler), + Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler), + Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), testVersionSelectionHandler1), + Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), testVersionSelectionHandler2), Tuple2.of(WebContentHandlerSpecification.getInstance(), staticFileServerHandler)); serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers); @@ -415,6 +439,88 @@ public class RestServerEndpointITCase extends TestLogger { assertEquals("foobar", fileContents.trim()); } + @Test + public void testVersioning() throws Exception { + CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse = restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + TestVersionHeaders.INSTANCE, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList() + ); + + unspecifiedVersionResponse.get(5, TimeUnit.SECONDS); + + CompletableFuture<EmptyResponseBody> specifiedVersionResponse = restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + TestVersionHeaders.INSTANCE, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList(), + RestAPIVersion.V1 + ); + + specifiedVersionResponse.get(5, TimeUnit.SECONDS); + } + + @Test + public void testVersionSelection() throws Exception { + CompletableFuture<EmptyResponseBody> version1Response = restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + TestVersionSelectionHeaders1.INSTANCE, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList(), + RestAPIVersion.V0 + ); + + try { + version1Response.get(5, TimeUnit.SECONDS); + fail(); + } catch (ExecutionException ee) { + RestClientException rce = (RestClientException) ee.getCause(); + assertEquals(HttpResponseStatus.OK, rce.getHttpResponseStatus()); + } + + CompletableFuture<EmptyResponseBody> version2Response = restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + TestVersionSelectionHeaders2.INSTANCE, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList(), + RestAPIVersion.V1 + ); + + try { + version2Response.get(5, TimeUnit.SECONDS); + fail(); + } catch (ExecutionException ee) { + RestClientException rce = (RestClientException) ee.getCause(); + assertEquals(HttpResponseStatus.ACCEPTED, rce.getHttpResponseStatus()); + } + } + + @Test + public void testDefaultVersionRouting() throws Exception { + Assume.assumeFalse( + "Ignoring SSL-enabled test to keep OkHttp usage simple.", + config.getBoolean(SecurityOptions.SSL_REST_ENABLED)); + + OkHttpClient client = new OkHttpClient(); + + final Request request = new Request.Builder() + .url(serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL()) + .build(); + + try (final Response response = client.newCall(request).execute()) { + assertEquals(HttpResponseStatus.ACCEPTED.code(), response.code()); + } + } + private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException { final HttpURLConnection connection = (HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection(); @@ -697,6 +803,151 @@ public class RestServerEndpointITCase extends TestLogger { } } + private static class TestVersionHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + + private TestVersionHandler( + final CompletableFuture<String> localRestAddress, + final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final Time timeout) { + super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionHeaders.INSTANCE); + } + + @Override + protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + } + } + + private enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + INSTANCE; + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/test/versioning"; + } + + @Override + public Class<EmptyResponseBody> getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return null; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + @Override + public Collection<RestAPIVersion> getSupportedAPIVersions() { + return Collections.singleton(RestAPIVersion.V1); + } + } + + private interface TestVersionSelectionHeadersBase extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + + @Override + default Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + default HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + default String getTargetRestEndpointURL() { + return "/test/select-version"; + } + + @Override + default Class<EmptyResponseBody> getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + default HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + default String getDescription() { + return null; + } + + @Override + default EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + } + + private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase { + INSTANCE; + + @Override + public Collection<RestAPIVersion> getSupportedAPIVersions() { + return Collections.singleton(RestAPIVersion.V0); + } + } + + private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase { + INSTANCE; + + @Override + public Collection<RestAPIVersion> getSupportedAPIVersions() { + return Collections.singleton(RestAPIVersion.V1); + } + } + + private static class TestVersionSelectionHandler1 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + + private TestVersionSelectionHandler1( + final CompletableFuture<String> localRestAddress, + final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final Time timeout) { + super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE); + } + + @Override + protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + throw new RestHandlerException("test failure 1", HttpResponseStatus.OK); + } + } + + private static class TestVersionSelectionHandler2 extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + + private TestVersionSelectionHandler2( + final CompletableFuture<String> localRestAddress, + final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final Time timeout) { + super(localRestAddress, leaderRetriever, timeout, Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE); + } + + @Override + protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + throw new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED); + } + } + private enum TestUploadHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { INSTANCE; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java new file mode 100644 index 0000000..4f60da1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.versioning; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Tests for {@link RestAPIVersion}. + */ +public class RestAPIVersionTest extends TestLogger { + @Test + public void testGetLatest() { + Collection<RestAPIVersion> candidates = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1); + Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates)); + } + + @Test + public void testSingleDefaultVersion() { + final List<RestAPIVersion> defaultVersions = Arrays.stream(RestAPIVersion.values()) + .filter(RestAPIVersion::isDefaultVersion) + .collect(Collectors.toList()); + + Assert.assertEquals( + "Only one RestAPIVersion should be marked as the default. Defaults: " + defaultVersions, + 1, + defaultVersions.size()); + } +}
