Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r199198055
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
---
@@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws
Exception {
@Test
public void testSuccessfulJobSubmission() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (OutputStream fileOut =
Files.newOutputStream(jobGraphFile)) {
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(fileOut)) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ }
+
DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+ when(mockGateway.getHostname()).thenReturn("localhost");
+
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
when(mockGateway.submitJob(any(JobGraph.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
mock(GatewayRetriever.class);
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ JobSubmitRequestBody request = new
JobSubmitRequestBody(jobGraphFile.getFileName().toString(),
Collections.emptyList(), Collections.emptyList());
- handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(request,
EmptyMessageParameters.getInstance(), Collections.emptyMap(),
Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())),
mockGateway)
.get();
}
+
+ @Test
+ public void testRejectionOnCountMismatch() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (OutputStream fileOut =
Files.newOutputStream(jobGraphFile)) {
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(fileOut)) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ }
+ final Path countExceedingFile =
TEMPORARY_FOLDER.newFile().toPath();
+
+ DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+ when(mockGateway.getHostname()).thenReturn("localhost");
+
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort()));
+ when(mockGateway.submitJob(any(JobGraph.class),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
--- End diff --
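This is duplicate code: the job graph file creation and the `DispatcherGateway` mock setup are the same as in `testSuccessfulJobSubmission`. Can we avoid it, e.g. by extracting the common setup into a shared helper?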
---