[
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511230#comment-16511230
]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195107887
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
---
@@ -71,20 +108,192 @@ public void testSerializationFailureHandling() throws
Exception {
@Test
public void testSuccessfulJobSubmission() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ DispatcherGateway mockGateway = new
JobGraphCapturingMockGateway();
GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
mock(GatewayRetriever.class);
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ new Configuration());
JobGraph job = new JobGraph("testjob");
JobSubmitRequestBody request = new JobSubmitRequestBody(job);
handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance()), mockGateway)
.get();
}
+
+ @Test
+ public void testJarHandling() throws Exception {
+ final String jarName = "jar";
+
+ JobGraphCapturingMockGateway jobGraphCapturingMockGateway = new
JobGraphCapturingMockGateway();
+ GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
mock(GatewayRetriever.class);
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+
CompletableFuture.completedFuture("http://localhost:1234"),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ new Configuration());
+
+ Path tmp = TMP_FOLDER.newFolder().toPath();
+ Path clientStorageDirectory =
Files.createDirectory(tmp.resolve("client-storage-directory"));
+ Path serverStorageDirectory =
Files.createDirectory(tmp.resolve("server-storage-directory"));
+
+ Path jar = Paths.get(jarName);
+ Files.createFile(clientStorageDirectory.resolve(jar));
+ Files.createFile(serverStorageDirectory.resolve(jar));
+
+ JobGraph job = new JobGraph("testjob");
+ job.addJar(new org.apache.flink.core.fs.Path(jar.toUri()));
+ JobSubmitRequestBody serializedJobGraphBody = new
JobSubmitRequestBody(job);
+ JobSubmitRequestBody request = new
JobSubmitRequestBody(serializedJobGraphBody.serializedJobGraph,
Collections.singletonList(serverStorageDirectory.resolve(jar)),
Collections.emptyList(), serverStorageDirectory);
+
+ handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance()), jobGraphCapturingMockGateway)
+ .get();
+
+ JobGraph submittedJobGraph =
jobGraphCapturingMockGateway.jobGraph;
+ List<org.apache.flink.core.fs.Path> userJars =
submittedJobGraph.getUserJars();
+
+ // ensure we haven't changed the total number of jars
+ Assert.assertEquals(1, userJars.size());
+
+ // this entry should be changed, a replacement jar exists in
the server storage directory
+ Assert.assertEquals(new
org.apache.flink.core.fs.Path(serverStorageDirectory.resolve(jar).toUri()),
userJars.get(0));
--- End diff --
I think updating `JobGraph#userJars` and `JobGraph#userArtifacts` is not
really necessary. Maybe we should even mark them `transient` in order to
emphasize that they won't be transmitted. Given that, I think we don't have to
do these tests.
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an
> optional list of jar files, that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)