dmvk commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r785912691
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -870,6 +871,7 @@ private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) {
     private void markJobAsClean(JobID jobId) {
         try {
             jobResultStore.markResultAsClean(jobId);
+            log.debug("Cleanup for job {} finished. Job was marked as clean.", jobId);
Review comment:
nit
```suggestion
log.debug("Cleanup for the job '{}' has finished. Job has been
marked as clean.", jobId);
```
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -943,15 +945,26 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
         archiveExecutionGraph(executionGraphInfo);
         if (terminalJobStatus.isGloballyTerminalState()) {
+            final JobID jobId = executionGraphInfo.getJobId();
             try {
-                jobResultStore.createDirtyResult(
-                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
+                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                    log.warn(
+                            "Job {} is already marked as clean but clean up was triggered again.",
+                            jobId);
+                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+                    jobResultStore.createDirtyResult(
+                            new JobResultEntry(
+                                    JobResult.createFrom(
+                                            executionGraphInfo.getArchivedExecutionGraph())));
+                    log.debug("Job {} marked as dirty. Cleanup will be triggered.", jobId);
+                }
             } catch (IOException e) {
-                log.error(
-                        "Could not un-register from high-availability services job {}."
-                                + "Other JobManager's may attempt to recover it and re-execute it.",
-                        executionGraphInfo.getJobId(),
-                        e);
+                fatalErrorHandler.onFatalError(
+                        new FlinkException(
+                                String.format(
+                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
Review comment:
Should we rephrase it a bit? I think if I get this exception as a user, I won't know what's going on.
Something along the lines of:
```
Failed to create a JobResultStore entry for the job '%s'. This is fatal as Flink cannot guarantee a proper cleanup lifecycle.
```
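For context, a sketch of how the rephrased message would read inside the surrounding `fatalErrorHandler` call from the diff (wording taken from the suggestion above; the final phrasing is up to the author):
```java
// Sketch only: same call structure as the diff, with the suggested message text.
fatalErrorHandler.onFatalError(
        new FlinkException(
                String.format(
                        "Failed to create a JobResultStore entry for the job '%s'. "
                                + "This is fatal as Flink cannot guarantee a proper cleanup lifecycle.",
                        jobId),
                e));
```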
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java
##########
@@ -20,65 +20,112 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.WithJobResult;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.Preconditions;
-import java.util.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
-/**
- * An implementation of the {@link JobResultStore} which only persists the data to an in-memory map.
- */
+/** A thread-safe in-memory implementation of the {@link JobResultStore}. */
 public class EmbeddedJobResultStore implements JobResultStore {
-    private final Map<JobID, JobResultEntry> inMemoryMap = new ConcurrentHashMap<>();
+    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedJobResultStore.class);
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Review comment:
👍
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -58,12 +62,26 @@ public DispatcherLeaderProcessFactory createFactory(
             throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
         }
+        final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults;
+        try {
+            recoveredDirtyJobResults = jobResultStore.getDirtyResults();
Review comment:
should we check that there is at most one result?
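For illustration, such a check could be as simple as the following sketch (using `org.apache.flink.util.Preconditions`; the message text is illustrative):
```java
import org.apache.flink.util.Preconditions;

// A JobDispatcherLeaderProcessFactoryFactory serves a single job, so more than
// one recovered dirty result would indicate an inconsistent JobResultStore.
Preconditions.checkState(
        recoveredDirtyJobResults.size() <= 1,
        "Expected at most one dirty job result for a single-job dispatcher, but found %s.",
        recoveredDirtyJobResults.size());
```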
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -868,6 +867,14 @@ private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) {
         blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
+    private void markJobAsClean(JobID jobId) {
+        try {
+            jobResultStore.markResultAsClean(jobId);
+        } catch (IOException e) {
+            log.warn("Could not properly mark job {} result as clean.", jobId, e);
Review comment:
Maybe we should just re-throw a runtime exception here for now, so we don't forget to fix it?
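Something like this sketch (the exception type and message are illustrative, not part of the PR):
```java
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.util.FlinkRuntimeException;

private void markJobAsClean(JobID jobId) {
    try {
        jobResultStore.markResultAsClean(jobId);
    } catch (IOException e) {
        // Re-throw instead of swallowing, so the failure stays visible until a
        // proper error-handling strategy is implemented.
        throw new FlinkRuntimeException(
                String.format("Could not mark the result of job %s as clean.", jobId), e);
    }
}
```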
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
##########
@@ -130,28 +146,56 @@ public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws
         final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture =
                 new CompletableFuture<>();
         dispatcherServiceFactory =
-                TestingDispatcherServiceFactory.newBuilder()
-                        .setCreateFunction(
-                                (fencingToken,
-                                        recoveredJobGraphs,
-                                        jobResults,
-                                        jobGraphStore,
-                                        jobResultStore) -> {
-                                    recoveredJobGraphsFuture.complete(recoveredJobGraphs);
-                                    return TestingDispatcherGatewayService.newBuilder().build();
-                                })
-                        .build();
+                createFactoryBasedOnJobGraphs(
+                        recoveredJobGraphs -> {
+                            recoveredJobGraphsFuture.complete(recoveredJobGraphs);
+                            return TestingDispatcherGatewayService.newBuilder().build();
+                        });
         try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                 createDispatcherLeaderProcess()) {
             dispatcherLeaderProcess.start();
-            assertThat(
-                    dispatcherLeaderProcess.getState(),
-                    is(SessionDispatcherLeaderProcess.State.RUNNING));
+            assertThat(recoveredJobGraphsFuture)
+                    .succeedsWithin(100, TimeUnit.MILLISECONDS)
Review comment:
OT: We should rethink using `succeedsWithin` with assertj; I've run into this multiple times already. We should introduce something along the lines of `FlinkAssertions.assertThatSucceeds(future)....` with an infinite timeout.
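A sketch of what such a helper could look like (the `FlinkAssertions` class and method name are hypothetical; they don't exist yet):
```java
import java.util.concurrent.CompletableFuture;

import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;

public final class FlinkAssertions {

    private FlinkAssertions() {}

    /** Asserts that the future completes successfully, without a hard-coded timeout. */
    public static <T> ObjectAssert<T> assertThatSucceeds(CompletableFuture<T> future) {
        // join() blocks until completion, so slow CI machines cannot cause flaky
        // timeouts; the test framework's global timeout still applies.
        return Assertions.assertThat(future.join());
    }
}
```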
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java
##########
@@ -34,14 +37,21 @@
     private final JobGraph jobGraph;
+    private final Collection<JobResult> recoveredDirtyJobResults;
+    private final JobResultStore jobResultStore;
+
     JobDispatcherLeaderProcess(
             UUID leaderSessionId,
             DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
             JobGraph jobGraph,
+            Collection<JobResult> recoveredDirtyJobResults,
Review comment:
Would it make sense to not have a collection here? We already do the same thing with the jobGraph (the recoveredJobs collection in other implementations).
```suggestion
            @Nullable JobResult recoveredDirtyJobResult,
```
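If the collection stays for now, a sketch of how the caller could collapse it to at most one entry (the helper name and message are illustrative, and this assumes the at-most-one check discussed for the factory above):
```java
import java.util.Collection;

import javax.annotation.Nullable;

import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.Preconditions;

// A JobDispatcherLeaderProcess handles a single job, mirroring how the single
// jobGraph is treated; more than one dirty result would be a bug.
@Nullable
static JobResult extractAtMostOneDirtyResult(Collection<JobResult> dirtyResults) {
    Preconditions.checkState(
            dirtyResults.size() <= 1,
            "Expected at most one recovered dirty job result, but found %s.",
            dirtyResults.size());
    return dirtyResults.isEmpty() ? null : dirtyResults.iterator().next();
}
```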
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -943,15 +945,26 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
         archiveExecutionGraph(executionGraphInfo);
         if (terminalJobStatus.isGloballyTerminalState()) {
+            final JobID jobId = executionGraphInfo.getJobId();
             try {
-                jobResultStore.createDirtyResult(
-                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
+                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                    log.warn(
Review comment:
Should this be logged as a warning (at all)? What does this warning mean from the user's perspective? Does the user need to take any action to address it?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java
##########
@@ -22,53 +22,88 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.NoSuchElementException;
+import java.util.Set;
 /**
- * A persistent storage mechanism for the results of successfully and unsuccessfully completed jobs.
+ * A storage for the results of globally terminated jobs. These results can have the following
+ * states:
+ *
+ * <ul>
+ *   <li>{@code dirty} - indicating that the corresponding job is not properly cleaned up, yet.
+ *   <li>{@code clean} - indicating that the cleanup of the corresponding job is performed and no
+ *       further actions need to be applied.
+ * </ul>
 */
 public interface JobResultStore {
Review comment:
```suggestion
@Internal
public interface JobResultStore {
```
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultEntry.java
##########
@@ -22,41 +22,18 @@
 import org.apache.flink.util.Preconditions;
 /**
- * An entry in a {@link JobResultStore} that couples a completed {@link JobResult} to a state that
- * represents whether the resources of that JobResult have been finalized ({@link
- * JobResultState#CLEAN}) or have yet to be finalized ({@link JobResultState#DIRTY}).
+ * {@code JobResultEntry} is the entity managed by the {@link JobResultStore}. It collects
+ * information about a globally terminated job (e.g. {@link JobResult}).
 */
-public class JobResultEntry {
+public class JobResultEntry implements WithJobResult {
Review comment:
Why do we need an extra `WithJobResult` interface?
##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java
##########
@@ -118,7 +120,12 @@ public ApplicationDispatcherGatewayServiceFactory(
         return DefaultDispatcherGatewayService.from(dispatcher);
     }
-    private List<JobID> getRecoveredJobIds(final Collection<JobGraph> recoveredJobs) {
-        return recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toList());
+    private Set<JobID> getRecoveredJobIds(
+            final Collection<JobGraph> recoveredJobs,
+            final Collection<JobResult> recoveredDirtyJobs) {
+        return Stream.concat(
+                        recoveredJobs.stream().map(JobGraph::getJobID),
+                        recoveredDirtyJobs.stream().map(JobResult::getJobId))
+                .collect(Collectors.toSet());
Review comment:
This part I'm not sure about. Could you document why we need to pass dirty jobs into ApplicationDispatcherBootstrap? Why is it not enough to just pass them into the dispatcher?
(I'm not saying it's incorrect, I just can't reason about it off the top of my head.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]