dmvk commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r791168425



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
##########
@@ -55,14 +57,16 @@ public MiniDispatcher(
             RpcService rpcService,
             DispatcherId fencingToken,
             DispatcherServices dispatcherServices,
-            JobGraph jobGraph,
+            @Nullable JobGraph jobGraph,
+            @Nullable JobResult recoveredDirtyJob,
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             JobClusterEntrypoint.ExecutionMode executionMode)
             throws Exception {
         super(
                 rpcService,
                 fencingToken,
-                Collections.singleton(jobGraph),
+                CollectionUtil.fromNullable(jobGraph),

Review comment:
       ```suggestion
                   CollectionUtil.ofNullable(jobGraph),
   ```
    to be consistent with other Java APIs (e.g., `Optional.ofNullable`).
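
    For illustration, a minimal sketch of what such a nullable-to-singleton helper could look like, assuming a shape that mirrors `Optional.ofNullable` (the actual method added in this PR may differ):

    ```java
    import java.util.Collections;
    import java.util.Set;

    import javax.annotation.Nullable;

    // Hypothetical sketch only; not the actual CollectionUtil implementation in this PR.
    public final class CollectionUtilSketch {

        private CollectionUtilSketch() {}

        /** Returns an empty set for null, a singleton set otherwise, mirroring Optional.ofNullable. */
        public static <T> Set<T> ofNullable(@Nullable T element) {
            return element == null ? Collections.emptySet() : Collections.singleton(element);
        }
    }
    ```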

##########
File path: flink-clients/src/test/java/org/apache/flink/client/testjar/ErrorHandlingSubmissionJob.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.testjar;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.util.FlinkException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@code ErrorHandlingSubmissionJob} provides a factory method for creating a {@link
+ * PackagedProgram} that monitors the job submission within the job's {@code main} method.
+ */
+public class ErrorHandlingSubmissionJob {
+
+    private static final AtomicReference<Exception> SUBMISSION_EXCEPTION = new AtomicReference<>();
+
+    /**
+     * {@code ExpectedFailingInMainException} is the failure being thrown when executing the {@code
+     * main} method of this {@code ErrorHandlingSubmissionJob}.
+     */
+    public static class ExpectedFailingInMainException extends Exception {

Review comment:
       `ExpectedFailingInMainException` seems to be unused.

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +157,67 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
         }
     }
 
+    @Test
+    public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+
+        // having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap
+        // implementation fail to submit the job
+        final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        jobResultStore.createDirtyResult(
+                new JobResultEntry(
+                        new JobResult.Builder()
+                                .jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID)
+                                .applicationStatus(ApplicationStatus.SUCCEEDED)
+                                .netRuntime(1)
+                                .build()));
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
+
+                    @Override
+                    public JobResultStore getJobResultStore() {
+                        return jobResultStore;
+                    }
+                };
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+                                        clusterConfiguration.getConfiguration(),
+                                        ErrorHandlingSubmissionJob.createPackagedProgram()));
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // the cluster should shut down automatically once the application completes
+            awaitClusterStopped(cluster, deadline);
+        }
+
+        FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
+                .as(
+                        "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
+                .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+
+        assertThat(
+                        jobResultStore.hasDirtyJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isTrue();
+        assertThat(
+                        jobResultStore.hasCleanJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isFalse();

Review comment:
       Should we test that we've performed an actual cleanup? Basically we should transition from dirty to clean state here before the cluster is allowed to shut down.
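
    A rough sketch of what asserting that transition could look like at the end of the test body, assuming we simply poll the JobResultStore against the existing deadline (the polling style here is illustrative, not the actual test utility):

    ```java
    // Sketch only: wait for the dirty entry to be marked clean before the final assertions.
    while (!jobResultStore.hasCleanJobResultEntry(ApplicationDispatcherBootstrap.ZERO_JOB_ID)) {
        if (!deadline.hasTimeLeft()) {
            throw new AssertionError("The dirty JobResult was never cleaned up before shutdown.");
        }
        Thread.sleep(50);
    }
    assertThat(jobResultStore.hasDirtyJobResultEntry(ApplicationDispatcherBootstrap.ZERO_JOB_ID))
            .isFalse();
    ```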

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
##########
@@ -38,13 +40,21 @@ public MiniDispatcher createDispatcher(
             RpcService rpcService,
             DispatcherId fencingToken,
             Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobResults,
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
-            PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
+            PartialDispatcherServicesWithJobPersistenceComponents
+                    partialDispatcherServicesWithJobPersistenceComponents)
             throws Exception {
-        final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs);
+        final Optional<JobGraph> recoveredJobGraph = CollectionUtil.atMostOnce(recoveredJobs);
+        final Optional<JobResult> recoveredDirtyJob =
+                CollectionUtil.atMostOnce(recoveredDirtyJobResults);
+
+        Preconditions.checkArgument(
+                recoveredJobGraph.isPresent() != recoveredDirtyJob.isPresent(),
+                "Either the JobGraph or the recovered JobResult needs to be 
specified.");
 

Review comment:
       Should we reuse the existing primitives here instead of introducing new ones?
   ```suggestion
           @Nullable
           final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs, null);
           @Nullable
           final JobResult jobResult = Iterables.getOnlyElement(recoveredDirtyJobResults, null);
           Preconditions.checkArgument(
                   jobGraph != null ^ jobResult != null,
                   "Either the JobGraph or the recovered JobResult needs to be specified.");

   ```
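
    As a side note on the suggested primitive: Guava's two-argument `Iterables.getOnlyElement(iterable, defaultValue)` returns the default for an empty iterable but still throws `IllegalArgumentException` when more than one element is present, so the at-most-one invariant stays enforced. A small standalone demonstration:

    ```java
    import com.google.common.collect.Iterables;

    import java.util.Arrays;
    import java.util.Collections;

    public class GetOnlyElementDemo {
        public static void main(String[] args) {
            // Empty input: the default (null) is returned instead of throwing.
            System.out.println(Iterables.getOnlyElement(Collections.<String>emptyList(), null));
            // Exactly one element: that element is returned.
            System.out.println(Iterables.getOnlyElement(Collections.singletonList("job"), null));
            // More than one element still fails fast.
            try {
                Iterables.getOnlyElement(Arrays.asList("a", "b"), null);
            } catch (IllegalArgumentException expected) {
                System.out.println("multiple elements rejected: " + expected.getMessage());
            }
        }
    }
    ```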

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
 
         try {
             jobGraph =
-                    jobGraphRetriever.retrieveJobGraph(
-                            partialDispatcherServices.getConfiguration());
+                    Preconditions.checkNotNull(
+                            jobGraphRetriever.retrieveJobGraph(
+                                    partialDispatcherServices.getConfiguration()));
         } catch (FlinkException e) {
            throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
         }
 
+        final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);
+
+        final Optional<JobResult> recoveredDirtyJobResultOptional =
+                extractDirtyJobResult(recoveredDirtyJobResults, jobGraph);
+        final Optional<JobGraph> jobGraphOptional =
+                getJobGraphBasedOnDirtyJobResults(jobGraph, recoveredDirtyJobResults);
+
+        final DefaultDispatcherGatewayServiceFactory defaultDispatcherServiceFactory =
                new DefaultDispatcherGatewayServiceFactory(
                        JobDispatcherFactory.INSTANCE, rpcService, partialDispatcherServices);
 
         return new JobDispatcherLeaderProcessFactory(
-                defaultDispatcherServiceFactory, jobGraph, fatalErrorHandler);
+                defaultDispatcherServiceFactory,
+                jobGraphOptional.orElse(null),
+                recoveredDirtyJobResultOptional.orElse(null),
+                jobResultStore,
+                fatalErrorHandler);
     }
 
     public static JobDispatcherLeaderProcessFactoryFactory create(
             JobGraphRetriever jobGraphRetriever) {
         return new JobDispatcherLeaderProcessFactoryFactory(jobGraphRetriever);
     }
+
+    private static Collection<JobResult> getDirtyJobResults(JobResultStore jobResultStore) {
+        try {
+            return jobResultStore.getDirtyResults();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(
+                    "Could not retrieve the JobResults of dirty jobs from the 
underlying JobResultStore.",
+                    e);
+        }
+    }
+
+    private static Optional<JobResult> extractDirtyJobResult(
+            Collection<JobResult> dirtyJobResults, JobGraph jobGraph) {
+        Optional<JobResult> actualDirtyJobResult = Optional.empty();
+        for (JobResult dirtyJobResult : dirtyJobResults) {
+            if (dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
+                actualDirtyJobResult = Optional.of(dirtyJobResult);
+            } else {
+                LOG.warn(
+                        "Unexpected dirty JobResultStore entry: Job '{}' is 
listed as dirty, isn't part of this single-job cluster, though.",

Review comment:
       Should we allow this (a dirty entry for a job that isn't part of this single-job cluster)?
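
    If the answer is no, one alternative would be to fail fast instead of logging a warning; a hedged sketch, not what the PR currently does:

    ```java
    // Sketch only: reject an unexpected dirty entry instead of warning about it.
    if (!dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
        throw new IllegalStateException(
                String.format(
                        "Unexpected dirty JobResultStore entry for job '%s' in a single-job cluster that runs job '%s'.",
                        dirtyJobResult.getJobId(), jobGraph.getJobID()));
    }
    ```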

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
 
         try {
             jobGraph =
-                    jobGraphRetriever.retrieveJobGraph(
-                            partialDispatcherServices.getConfiguration());
+                    Preconditions.checkNotNull(
+                            jobGraphRetriever.retrieveJobGraph(
+                                    partialDispatcherServices.getConfiguration()));
         } catch (FlinkException e) {
            throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
         }
 
+        final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);

Review comment:
       It seems that assuming there is at most a single dirty result per job cluster might simplify things.
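
    Under that assumption, the retrieval could collapse into a single lookup along the lines of the earlier `Iterables.getOnlyElement` suggestion (sketch only, not the PR's current code; IOException handling omitted):

    ```java
    // Sketch only: with at most one dirty result per job cluster, the collection handling
    // reduces to picking the only element (or null if there is none).
    @Nullable
    final JobResult recoveredDirtyJobResult =
            Iterables.getOnlyElement(jobResultStore.getDirtyResults(), null);
    ```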

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
 
         try {
             jobGraph =
-                    jobGraphRetriever.retrieveJobGraph(
-                            partialDispatcherServices.getConfiguration());
+                    Preconditions.checkNotNull(
+                            jobGraphRetriever.retrieveJobGraph(
+                                    partialDispatcherServices.getConfiguration()));
         } catch (FlinkException e) {
            throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
         }
 
+        final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);
+
+        final Optional<JobResult> recoveredDirtyJobResultOptional =

Review comment:
       Nit: I'd try to stick with the `maybeXXX` naming convention for optionals.
   ```suggestion
           final Optional<JobResult> maybeDirtyJobResult =
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
