zentol commented on a change in pull request #18416:
URL: https://github.com/apache/flink/pull/18416#discussion_r788642062



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -266,10 +271,22 @@ private void runApplicationEntryPoint(
             final Set<JobID> tolerateMissingResult,
             final DispatcherGateway dispatcherGateway,
             final ScheduledExecutor scheduledExecutor,
-            final boolean enforceSingleJobExecution) {
+            final boolean enforceSingleJobExecution,
+            final boolean submitFailedJobOnApplicationError) {
+        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
+            dispatcherGateway.submitFailedJob(
+                    ZERO_JOB_ID,
+                    FAILED_JOB_NAME,
+                    new IllegalStateException(
+                            String.format(
+                                    "Submission of failed job in case of an 
application error ('%s') is not supported in non-HA setups.",

Review comment:
       I'm confused. Submitting a failed job is not supported (for some 
reason), yet we are doing exactly that right here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
##########
@@ -230,6 +234,10 @@ public DispatcherId getFencingToken() {
         private BiFunction<JobID, String, CompletableFuture<String>>
                 stopWithSavepointAndGetLocationFunction;
 
+        public Builder() {

Review comment:
       Could this constructor be made (package-)private, so that `newBuilder()` is the only way to create the builder?

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/testjar/FailingJob.java
##########
@@ -0,0 +1,35 @@
+package org.apache.flink.client.testjar;

Review comment:
       The license header is missing from this new file.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -92,7 +94,7 @@ public void cleanup() {
     @Test
     public void testExceptionThrownWhenApplicationContainsNoJobs() throws 
Throwable {
         final TestingDispatcherGateway.Builder dispatcherBuilder =
-                new TestingDispatcherGateway.Builder()
+                TestingDispatcherGateway.newBuilder()

Review comment:
       Can we move these `newBuilder()` call-site changes to the previous commit?

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +153,57 @@ public void 
testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
         }
     }
 
+    @Test
+    public void testSubmitFailedJobOnApplicationError() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final JobID jobId = new JobID();
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, 
Duration.ofMillis(100));
+        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, 
false);
+        
configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, 
true);
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
jobId.toHexString());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+                                        
clusterConfiguration.getConfiguration(),
+                                        FailingJob.getProgram()));
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running

Review comment:
       This comment should read `// wait until job was submitted`, since the job in this test fails rather than runs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to