AHeise commented on a change in pull request #18043:
URL: https://github.com/apache/flink/pull/18043#discussion_r765007357



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -681,16 +725,20 @@ public void testDuplicateJobSubmissionWithRunningJobId() 
throws Throwable {
                                         FutureUtils.completedExceptionally(
                                                 
DuplicateJobSubmissionException.of(testJobID)));
         final CompletableFuture<Void> applicationFuture =
-                runApplication(dispatcherBuilder, configurationUnderTest, 1);
-        final ExecutionException executionException =
-                assertThrows(
-                        ExecutionException.class,
-                        () -> applicationFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS));
-        final Optional<DuplicateJobSubmissionException> maybeDuplicate =
-                ExceptionUtils.findThrowable(
-                        executionException, 
DuplicateJobSubmissionException.class);
-        assertTrue(maybeDuplicate.isPresent());
-        assertFalse(maybeDuplicate.get().isGloballyTerminated());
+                new TestApplicationDispatcherBootstrapBuilder()
+                        .setConfiguration(configurationUnderTest)
+                        .setDispatcherGateway(dispatcherBuilder.build())
+                        .build()
+                        .getApplicationCompletionFuture();
+        assertThatThrownBy(() -> applicationFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))

Review comment:
       Please move the AssertJ-related changes into a separate commit.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -55,24 +69,67 @@
     private final boolean suppressSysout;
 
     private final boolean enforceSingleJobExecution;
+    private final CheckpointConfig originalCheckpointConfig;
+    private final ExecutionConfig originalExecutionConfig;
+    private final Configuration originalConfiguration;
 
     private int jobCounter;
 
+    private final Collection<JobValidationError> errors;
+
+    private final boolean allowConfigurations;
+
     public StreamContextEnvironment(
             final PipelineExecutorServiceLoader executorServiceLoader,
             final Configuration configuration,
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout) {
+        this(
+                executorServiceLoader,
+                configuration,
+                userCodeClassLoader,
+                enforceSingleJobExecution,
+                suppressSysout,
+                true,
+                Collections.emptyList());
+    }
+
+    @Internal
+    public StreamContextEnvironment(
+            final PipelineExecutorServiceLoader executorServiceLoader,
+            final Configuration configuration,
+            final ClassLoader userCodeClassLoader,
+            final boolean enforceSingleJobExecution,
+            final boolean suppressSysout,
+            final boolean allowConfigurations,
+            final Collection<JobValidationError> errors) {
         super(executorServiceLoader, configuration, userCodeClassLoader);
         this.suppressSysout = suppressSysout;
         this.enforceSingleJobExecution = enforceSingleJobExecution;
-
+        this.allowConfigurations = allowConfigurations;
+        this.originalCheckpointConfig = new CheckpointConfig(checkpointCfg);
+        this.originalExecutionConfig = new ExecutionConfig(config);
+        this.originalConfiguration = configuration;
+        this.errors = errors;
         this.jobCounter = 0;
     }
 
     @Override
     public JobExecutionResult execute(StreamGraph streamGraph) throws 
Exception {
+        errors.addAll(checkNotAllowedConfigurations());
+        if (!errors.isEmpty()) {
+            // HACK: We shortcut the StreamGraph to jobgraph translation 
because we already
+            // know that the job needs to fail and can derive the jobId.
+
+            final JobID jobId =
+                    configuration
+                            
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
+                            .map(JobID::fromHexString)
+                            .orElse(new JobID());
+            final String jobName = streamGraph.getJobName();
+            throw new JobStartupFailedException(jobId, jobName, errors);
+        }

Review comment:
       Please move this logic into the check function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to