[
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527831#comment-16527831
]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r199199366
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws
Exception {
@Test
public void testSuccessfulJobSubmission() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (OutputStream fileOut =
Files.newOutputStream(jobGraphFile)) {
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(fileOut)) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+ when(mockGateway.getHostname()).thenReturn("localhost");
+
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
mock(GatewayRetriever.class);
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ JobSubmitRequestBody request = new
JobSubmitRequestBody(jobGraphFile.getFileName().toString(),
Collections.emptyList(), Collections.emptyList());
- handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance(), Collections.emptyMap(),
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())),
mockGateway)
.get();
}
+
+ @Test
+ public void testRejectionOnCountMismatch() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (OutputStream fileOut =
Files.newOutputStream(jobGraphFile)) {
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(fileOut)) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ }
+ final Path countExceedingFile =
TEMPORARY_FOLDER.newFile().toPath();
+
+ DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+ when(mockGateway.getHostname()).thenReturn("localhost");
+
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+ when(mockGateway.submitJob(any(JobGraph.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
mock(GatewayRetriever.class);
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+
CompletableFuture.completedFuture("http://localhost:1234"),
+ mockGatewayRetriever,
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ JobSubmitRequestBody request = new
JobSubmitRequestBody(jobGraphFile.getFileName().toString(),
Collections.emptyList(), Collections.emptyList());
+
+ try {
+ handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance(), Collections.emptyMap(),
Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(),
countExceedingFile.toFile())), mockGateway)
+ .get();
+ } catch (Exception e) {
+ ExceptionUtils.findThrowable(e, candidate -> candidate
instanceof RestHandlerException && candidate.getMessage().contains("count"));
+ }
+ }
+
+ @Test
+ public void testFileHandling() throws Exception {
+ final String dcEntryName = "entry";
+
+ CompletableFuture<JobGraph> submittedJobGraphFuture = new
CompletableFuture<>();
+ DispatcherGateway dispatcherGateway = new
TestingDispatcherGateway.Builder()
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(submittedJobGraph -> {
+
submittedJobGraphFuture.complete(submittedJobGraph);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+
+ GatewayRetriever<DispatcherGateway> gatewayRetriever = new
TestGatewayRetriever(dispatcherGateway);
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+
CompletableFuture.completedFuture("http://localhost:1234"),
+ gatewayRetriever,
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path artifactFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ final JobGraph jobGraph = new JobGraph();
+ // the entry that should be updated
+ jobGraph.addUserArtifact(dcEntryName, new
DistributedCache.DistributedCacheEntry("random", false));
+ try (OutputStream fileOut =
Files.newOutputStream(jobGraphFile)) {
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(fileOut)) {
+ objectOut.writeObject(jobGraph);
+ }
+ }
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+
Collections.singletonList(jarFile.getFileName().toString()),
+ Collections.singleton(new
JobSubmitRequestBody.DistributedCacheFile(dcEntryName,
artifactFile.getFileName().toString())));
+
+ handler.handleRequest(new HandlerRequest<>(
+ request,
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Arrays.asList(jobGraphFile.toFile(),
jarFile.toFile(), artifactFile.toFile())), dispatcherGateway)
+ .get();
+
+ Assert.assertTrue("No JobGraph was submitted.",
submittedJobGraphFuture.isDone());
+ final JobGraph submittedJobGraph =
submittedJobGraphFuture.get();
+ Assert.assertEquals(1,
submittedJobGraph.getUserJarBlobKeys().size());
+ Assert.assertEquals(1,
submittedJobGraph.getUserArtifacts().size());
+
Assert.assertNotNull(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey);
--- End diff --
Just a side note: Hamcrest offers somewhat more expressive assertions, which
in many cases generate better failure messages. E.g.,
`assertThat(submittedJobGraph.getUserArtifacts(), hasSize(1))`.
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)