Repository: flink Updated Branches: refs/heads/release-1.6 343614cd8 -> 1386f91b0
[FLINK-9811][test] Add test for jar handler interactions This closes #6311. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1386f91b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1386f91b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1386f91b Branch: refs/heads/release-1.6 Commit: 1386f91b0ad50ee4900e221581f19608d7217994 Parents: 4022063 Author: zentol <ches...@apache.org> Authored: Wed Jul 11 18:41:06 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Jul 18 09:52:19 2018 +0200 ---------------------------------------------------------------------- flink-runtime-web/pom.xml | 28 +++ .../handlers/JarDeleteMessageParameters.java | 2 +- .../webmonitor/handlers/JarListInfo.java | 6 +- .../handlers/JarPlanMessageParameters.java | 2 +- .../handlers/JarSubmissionITCase.java | 226 +++++++++++++++++++ .../webmonitor/handlers/utils/TestProgram.java | 31 +++ .../runtime/rest/messages/JobPlanInfo.java | 6 + .../flink/runtime/util/BlobServerResource.java | 65 ++++++ 8 files changed, 361 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index 837aadb..943a737 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -135,8 +135,36 @@ under the License. <goal>test-jar</goal> </goals> </execution> + <execution> + <id>test-program-jar</id> + <phase>process-test-classes</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <includes> + <include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include> + </includes> + <archive> + <manifest> + <mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.TestProgram</mainClass> + </manifest> + </archive> + <finalName>test-program</finalName> + </configuration> + </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <targetDir>${project.build.directory}</targetDir> + </systemPropertyVariables> + </configuration> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java index 9080409..2b70602 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java @@ -30,7 +30,7 @@ import java.util.Collections; */ public class JarDeleteMessageParameters extends MessageParameters { - private JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + public JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); @Override public Collection<MessagePathParameter<?>> getPathParameters() { http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java index 9168686..4d8d4c9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java @@ -41,7 +41,7 @@ public class JarListInfo implements ResponseBody { private String address; @JsonProperty(JAR_LIST_FIELD_FILES) - private List<JarFileInfo> jarFileList; + public List<JarFileInfo> jarFileList; @JsonCreator public JarListInfo( @@ -85,10 +85,10 @@ public class JarListInfo implements ResponseBody { public static final String JAR_FILE_FIELD_ENTRY = "entry"; @JsonProperty(JAR_FILE_FIELD_ID) - private String id; + public String id; @JsonProperty(JAR_FILE_FIELD_NAME) - private String name; + public String name; @JsonProperty(JAR_FILE_FIELD_UPLOADED) private long uploaded; http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java index 7dd9950..8599a2c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java @@ -31,7 +31,7 @@ import java.util.Collections; */ public class JarPlanMessageParameters extends MessageParameters { - private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java new file mode 100644 index 0000000..e47a38a --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.BlobServerResource; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.hamcrest.Matchers.containsString; + +/** + * Tests the entire lifecycle of a jar submission. + */ +public class JarSubmissionITCase extends TestLogger { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public final BlobServerResource blobServerResource = new BlobServerResource(); + + @BeforeClass + public static void checkOS() { + Assume.assumeFalse("This test fails on Windows due to unclosed JarFiles, see FLINK-9844.", OperatingSystem.isWindows()); + } + + @Test + public void testJarSubmission() throws Exception { + final TestingDispatcherGateway restfulGateway = new TestingDispatcherGateway.Builder() + .setBlobServerPort(blobServerResource.getBlobServerPort()) + .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get())) + .build(); + final JarHandlers handlers = new JarHandlers(temporaryFolder.newFolder().toPath(), restfulGateway); + final JarUploadHandler uploadHandler = handlers.uploadHandler; + final JarListHandler listHandler = handlers.listHandler; + final JarPlanHandler planHandler = handlers.planHandler; + final JarRunHandler runHandler = handlers.runHandler; + final JarDeleteHandler deleteHandler = handlers.deleteHandler; + + // targetDir property is set via surefire configuration + final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve("test-program.jar"); + final Path jar = Files.copy(originalJar, temporaryFolder.getRoot().toPath().resolve("test-program.jar")); + + final String storedJarPath = uploadJar(uploadHandler, jar, restfulGateway); + final String storedJarName = Paths.get(storedJarPath).getFileName().toString(); + + final JarListInfo postUploadListResponse = listJars(listHandler, restfulGateway); + Assert.assertEquals(1, postUploadListResponse.jarFileList.size()); + final JarListInfo.JarFileInfo listEntry = postUploadListResponse.jarFileList.iterator().next(); + Assert.assertEquals(jar.getFileName().toString(), listEntry.name); + Assert.assertEquals(storedJarName, listEntry.id); + + final JobPlanInfo planResponse = showPlan(planHandler, storedJarName, restfulGateway); + // we're only interested in the core functionality so checking for a small detail is sufficient + Assert.assertThat(planResponse.getJsonPlan(), containsString("TestProgram.java:29")); + + runJar(runHandler, storedJarName, restfulGateway); + + deleteJar(deleteHandler, storedJarName, restfulGateway); + + final JarListInfo postDeleteListResponse = listJars(listHandler, restfulGateway); + Assert.assertEquals(0, postDeleteListResponse.jarFileList.size()); + } + + private static String uploadJar(JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) throws Exception { + HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + EmptyMessageParameters.getInstance(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.singletonList(jar.toFile())); + final JarUploadResponseBody uploadResponse = handler.handleRequest(uploadRequest, restfulGateway) + .get(); + return uploadResponse.getFilename(); + } + + private static JarListInfo listJars(JarListHandler handler, RestfulGateway restfulGateway) throws Exception { + HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + EmptyMessageParameters.getInstance()); + return handler.handleRequest(listRequest, restfulGateway) + .get(); + } + + private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { + JarPlanMessageParameters planParameters = JarPlanHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + planParameters, + Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + return handler.handleRequest(planRequest, restfulGateway) + .get(); + } + + private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception { + final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<EmptyRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + runParameters, + Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + return handler.handleRequest(runRequest, restfulGateway) + .get(); + } + + private static void deleteJar(JarDeleteHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception { + JarDeleteMessageParameters deleteParameters = JarDeleteHeaders.getInstance().getUnresolvedMessageParameters(); + HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> deleteRequest = new HandlerRequest<>( + EmptyRequestBody.getInstance(), + deleteParameters, + Collections.singletonMap(deleteParameters.jarIdPathParameter.getKey(), jarName), + Collections.emptyMap(), + Collections.emptyList()); + handler.handleRequest(deleteRequest, restfulGateway) + .get(); + } + + private static class JarHandlers { + final JarUploadHandler uploadHandler; + final JarListHandler listHandler; + final JarPlanHandler planHandler; + final JarRunHandler runHandler; + final JarDeleteHandler deleteHandler; + + JarHandlers(final Path jarDir, final TestingDispatcherGateway restfulGateway) { + final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway); + final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345"); + final Time timeout = Time.seconds(10); + final Map<String, String> responseHeaders = Collections.emptyMap(); + final Executor executor = TestingUtils.defaultExecutor(); + + uploadHandler = new JarUploadHandler( + localAddressFuture, + gatewayRetriever, + timeout, + responseHeaders, + JarUploadHeaders.getInstance(), + jarDir, + executor); + + listHandler = new JarListHandler( + localAddressFuture, + gatewayRetriever, + timeout, + responseHeaders, + JarListHeaders.getInstance(), + jarDir.toFile(), + executor); + + planHandler = new JarPlanHandler( + localAddressFuture, + gatewayRetriever, + timeout, + responseHeaders, + JarPlanHeaders.getInstance(), + jarDir, + new Configuration(), + executor); + + runHandler = new JarRunHandler( + localAddressFuture, + gatewayRetriever, + timeout, + responseHeaders, + JarRunHeaders.getInstance(), + jarDir, + new Configuration(), + executor); + + deleteHandler = new JarDeleteHandler( + localAddressFuture, + gatewayRetriever, + timeout, + responseHeaders, + JarDeleteHeaders.getInstance(), + jarDir, + executor); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java new file mode 100644 index 0000000..19d4678 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.utils; + +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Simple test program. + */ +public class TestProgram { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello", "world").print(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java index 7263b36..965702d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -55,6 +56,11 @@ public class JobPlanInfo implements ResponseBody { this.jsonPlan = jsonPlan; } + @JsonIgnore + public String getJsonPlan() { + return jsonPlan.json; + } + @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/flink/blob/1386f91b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java new file mode 100644 index 0000000..080ecf8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; + +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A simple {@link ExternalResource} to be used by tests that require a {@link BlobServer}. + */ +public class BlobServerResource extends ExternalResource { + private static final Logger LOG = LoggerFactory.getLogger(BlobServerResource.class); + private final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private BlobServer blobServer; + + protected void before() throws Throwable { + temporaryFolder.create(); + + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(config, new VoidBlobStore()); + blobServer.start(); + } + + protected void after() { + temporaryFolder.delete(); + + try { + blobServer.close(); + } catch (IOException e) { + LOG.error("Exception while shutting down blob server.", e); + } + } + + public int getBlobServerPort() { + return blobServer.getPort(); + } +}