dmvk commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r791168425
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
##########
@@ -55,14 +57,16 @@ public MiniDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
DispatcherServices dispatcherServices,
- JobGraph jobGraph,
+ @Nullable JobGraph jobGraph,
+ @Nullable JobResult recoveredDirtyJob,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
JobClusterEntrypoint.ExecutionMode executionMode)
throws Exception {
super(
rpcService,
fencingToken,
- Collections.singleton(jobGraph),
+ CollectionUtil.fromNullable(jobGraph),
Review comment:
```suggestion
CollectionUtil.ofNullable(jobGraph),
```
to be consistent with other Java APIs (e.g. `Optional.ofNullable`)
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/testjar/ErrorHandlingSubmissionJob.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.testjar;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.util.FlinkException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@code ErrorHandlingSubmissionJob} provides a factory method for creating a
{@link
+ * PackagedProgram} that monitors the job submission within the job's {@code
main} method.
+ */
+public class ErrorHandlingSubmissionJob {
+
+ private static final AtomicReference<Exception> SUBMISSION_EXCEPTION = new
AtomicReference<>();
+
+ /**
+ * {@code ExpectedFailingInMainException} is the failure being thrown when
executing the {@code
+ * main} method of this {@code ErrorHandlingSubmissionJob}.
+ */
+ public static class ExpectedFailingInMainException extends Exception {
Review comment:
unused
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +157,67 @@ public void
testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
}
}
+ @Test
+ public void testDirtyJobResultRecoveryInApplicationMode() throws Exception
{
+ final Deadline deadline = Deadline.fromNow(TIMEOUT);
+ final Configuration configuration = new Configuration();
+ configuration.set(HighAvailabilityOptions.HA_MODE,
HighAvailabilityMode.ZOOKEEPER.name());
+ configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+ configuration.set(ClientOptions.CLIENT_RETRY_PERIOD,
Duration.ofMillis(100));
+ final TestingMiniClusterConfiguration clusterConfiguration =
+ TestingMiniClusterConfiguration.newBuilder()
+ .setConfiguration(configuration)
+ .build();
+
+ // having a dirty entry in the JobResultStore should make the
ApplicationDispatcherBootstrap
+ // implementation fail to submit the job
+ final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ jobResultStore.createDirtyResult(
+ new JobResultEntry(
+ new JobResult.Builder()
+
.jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID)
+ .applicationStatus(ApplicationStatus.SUCCEEDED)
+ .netRuntime(1)
+ .build()));
+ final EmbeddedHaServicesWithLeadershipControl haServices =
+ new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
+
+ @Override
+ public JobResultStore getJobResultStore() {
+ return jobResultStore;
+ }
+ };
+
+ final TestingMiniCluster.Builder clusterBuilder =
+ TestingMiniCluster.newBuilder(clusterConfiguration)
+ .setHighAvailabilityServicesSupplier(() -> haServices)
+ .setDispatcherResourceManagerComponentFactorySupplier(
+
createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+
clusterConfiguration.getConfiguration(),
+
ErrorHandlingSubmissionJob.createPackagedProgram()));
+ try (final MiniCluster cluster = clusterBuilder.build()) {
+ // start mini cluster and submit the job
+ cluster.start();
+
+ // the cluster should shut down automatically once the application
completes
+ awaitClusterStopped(cluster, deadline);
+ }
+
+
FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
+ .as(
+ "The job's main method shouldn't have been succeeded
due to a DuplicateJobSubmissionException.")
+
.hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+
+ assertThat(
+ jobResultStore.hasDirtyJobResultEntry(
+ ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+ .isTrue();
+ assertThat(
+ jobResultStore.hasCleanJobResultEntry(
+ ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+ .isFalse();
Review comment:
Should we test that we've performed an actual cleanup? Basically we
should transition from dirty to clean state here before the cluster is allowed
to shut down.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
##########
@@ -38,13 +40,21 @@ public MiniDispatcher createDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
+ Collection<JobResult> recoveredDirtyJobResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
- PartialDispatcherServicesWithJobGraphStore
partialDispatcherServicesWithJobGraphStore)
+ PartialDispatcherServicesWithJobPersistenceComponents
+ partialDispatcherServicesWithJobPersistenceComponents)
throws Exception {
- final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs);
+ final Optional<JobGraph> recoveredJobGraph =
CollectionUtil.atMostOnce(recoveredJobs);
+ final Optional<JobResult> recoveredDirtyJob =
+ CollectionUtil.atMostOnce(recoveredDirtyJobResults);
+
+ Preconditions.checkArgument(
+ recoveredJobGraph.isPresent() != recoveredDirtyJob.isPresent(),
+ "Either the JobGraph or the recovered JobResult needs to be
specified.");
Review comment:
Should we reuse the existing primitives here instead of introducing new
ones?
```suggestion
@Nullable
final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs,
null);
@Nullable
final JobResult jobResult =
Iterables.getOnlyElement(recoveredDirtyJobResults, null);
Preconditions.checkArgument(
jobGraph != null ^ jobResult != null,
"Either the JobGraph or the recovered JobResult needs to be
specified.");
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
try {
jobGraph =
- jobGraphRetriever.retrieveJobGraph(
- partialDispatcherServices.getConfiguration());
+ Preconditions.checkNotNull(
+ jobGraphRetriever.retrieveJobGraph(
+
partialDispatcherServices.getConfiguration()));
} catch (FlinkException e) {
throw new FlinkRuntimeException("Could not retrieve the
JobGraph.", e);
}
+ final JobResultStore jobResultStore =
jobPersistenceComponentFactory.createJobResultStore();
+ final Collection<JobResult> recoveredDirtyJobResults =
getDirtyJobResults(jobResultStore);
+
+ final Optional<JobResult> recoveredDirtyJobResultOptional =
+ extractDirtyJobResult(recoveredDirtyJobResults, jobGraph);
+ final Optional<JobGraph> jobGraphOptional =
+ getJobGraphBasedOnDirtyJobResults(jobGraph,
recoveredDirtyJobResults);
+
final DefaultDispatcherGatewayServiceFactory
defaultDispatcherServiceFactory =
new DefaultDispatcherGatewayServiceFactory(
JobDispatcherFactory.INSTANCE, rpcService,
partialDispatcherServices);
return new JobDispatcherLeaderProcessFactory(
- defaultDispatcherServiceFactory, jobGraph, fatalErrorHandler);
+ defaultDispatcherServiceFactory,
+ jobGraphOptional.orElse(null),
+ recoveredDirtyJobResultOptional.orElse(null),
+ jobResultStore,
+ fatalErrorHandler);
}
public static JobDispatcherLeaderProcessFactoryFactory create(
JobGraphRetriever jobGraphRetriever) {
return new JobDispatcherLeaderProcessFactoryFactory(jobGraphRetriever);
}
+
+ private static Collection<JobResult> getDirtyJobResults(JobResultStore
jobResultStore) {
+ try {
+ return jobResultStore.getDirtyResults();
+ } catch (IOException e) {
+ throw new FlinkRuntimeException(
+ "Could not retrieve the JobResults of dirty jobs from the
underlying JobResultStore.",
+ e);
+ }
+ }
+
+ private static Optional<JobResult> extractDirtyJobResult(
+ Collection<JobResult> dirtyJobResults, JobGraph jobGraph) {
+ Optional<JobResult> actualDirtyJobResult = Optional.empty();
+ for (JobResult dirtyJobResult : dirtyJobResults) {
+ if (dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
+ actualDirtyJobResult = Optional.of(dirtyJobResult);
+ } else {
+ LOG.warn(
+ "Unexpected dirty JobResultStore entry: Job '{}' is
listed as dirty, isn't part of this single-job cluster, though.",
Review comment:
should we allow this?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
try {
jobGraph =
- jobGraphRetriever.retrieveJobGraph(
- partialDispatcherServices.getConfiguration());
+ Preconditions.checkNotNull(
+ jobGraphRetriever.retrieveJobGraph(
+
partialDispatcherServices.getConfiguration()));
} catch (FlinkException e) {
throw new FlinkRuntimeException("Could not retrieve the
JobGraph.", e);
}
+ final JobResultStore jobResultStore =
jobPersistenceComponentFactory.createJobResultStore();
+ final Collection<JobResult> recoveredDirtyJobResults =
getDirtyJobResults(jobResultStore);
Review comment:
It seems that assuming there is at most a single dirty result per
job cluster might simplify things.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
try {
jobGraph =
- jobGraphRetriever.retrieveJobGraph(
- partialDispatcherServices.getConfiguration());
+ Preconditions.checkNotNull(
+ jobGraphRetriever.retrieveJobGraph(
+
partialDispatcherServices.getConfiguration()));
} catch (FlinkException e) {
throw new FlinkRuntimeException("Could not retrieve the
JobGraph.", e);
}
+ final JobResultStore jobResultStore =
jobPersistenceComponentFactory.createJobResultStore();
+ final Collection<JobResult> recoveredDirtyJobResults =
getDirtyJobResults(jobResultStore);
+
+ final Optional<JobResult> recoveredDirtyJobResultOptional =
Review comment:
nit: I'd try to stick with the `maybeXXX` naming convention for optionals
```suggestion
final Optional<JobResult> maybeDirtyJobResult =
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]