Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r843515669
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState
jobReachedTerminalState(ExecutionGraphInfo executionGr
terminalJobStatus);
}
- archiveExecutionGraph(executionGraphInfo);
+ storeExecutionGraphInfo(executionGraphInfo);
if (terminalJobStatus.isGloballyTerminalState()) {
- final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up
was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
-
executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the
JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as
pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
- }
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ // do not create an archive for suspended jobs, as this would
eventually lead to
+ // multiple archive attempts which we currently do not support
+ CompletableFuture<Acknowledge> archiveFuture =
+ archiveExecutionGraph(executionGraphInfo);
+
+ registerCleanupInJobResultStore(executionGraphInfo);
+
+ return archiveFuture.thenApplyAsync(ignored ->
CleanupJobState.GLOBAL);
+ } else {
+ return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
+ }
}
- private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
+ private void storeExecutionGraphInfo(ExecutionGraphInfo
executionGraphInfo) {
Review Comment:
Thank you for pointing this out. I've renamed `storeExecutionGraphInfo` to
`writeToExecutionGraphInfoStore`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo
executionGraphInfo) {
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
+ }
+
+ private CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo) {
// do not create an archive for suspended jobs, as this would
eventually lead to multiple
// archive attempts which we currently do not support
- if
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
{
- final CompletableFuture<Acknowledge> executionGraphFuture =
-
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
- executionGraphFuture.whenComplete(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- log.info(
- "Could not archive completed job {}({}) to
the history server.",
-
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
- throwable);
- }
- });
+ final CompletableFuture<Acknowledge> executionGraphFuture =
+ FutureUtils.orTimeout(
+
historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+
configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+ TimeUnit.MILLISECONDS,
+ getMainThreadExecutor());
+
+ return executionGraphFuture.handle(
+ (Acknowledge ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ log.info(
+ "Could not archive completed job {}({}) to the
history server.",
+
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+ throwable);
+ }
+ return Acknowledge.get();
+ });
+ }
+
+ private void registerCleanupInJobResultStore(ExecutionGraphInfo
executionGraphInfo) {
Review Comment:
Thank you for pointing this out. This will guarantee that the function is
called correctly.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo
executionGraphInfo) {
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
+ }
+
+ private CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo) {
// do not create an archive for suspended jobs, as this would
eventually lead to multiple
// archive attempts which we currently do not support
Review Comment:
Yes, you're right. Sorry for being careless. I've removed the comments.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
+ @Test(timeout = 5000L)
Review Comment:
Thank you for pointing this out. I've removed all the timeouts I added in
this test.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
+ @Test(timeout = 5000L)
+ public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+ final Configuration configuration = new Configuration();
+
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+ final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+ try {
+ final TestingHistoryServerArchivist archivist =
+ new TestingHistoryServerArchivist(ioExecutor, 50L);
+ final TestingJobMasterServiceLeadershipRunnerFactory
testingJobManagerRunnerFactory =
+ new TestingJobMasterServiceLeadershipRunnerFactory(0);
+ final TestingDispatcher.Builder testingDispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(archivist)
+ .setConfiguration(configuration);
+ startDispatcher(testingDispatcherBuilder,
testingJobManagerRunnerFactory);
+
+ submitJobAndWait();
+ final TestingJobManagerRunner testingJobManagerRunner =
+
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ finishJob(testingJobManagerRunner);
+
+ globalCleanupFuture.join();
+ dispatcher.getJobTerminationFuture(jobId,
Time.milliseconds(1000L)).join();
+
+ assertTrue(archivist.isArchived());
Review Comment:
Thank you for providing a better solution here. I've replaced the
`Thread.sleep` with a `CompletableFuture`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -186,6 +193,9 @@ private void startDispatcher(
// because cleaning it will trigger the
closeAsync latch
// provided by TestingJobManagerRunner
.withLocallyCleanableResource(jobManagerRunnerRegistry)
+ .withGloballyCleanableResource(
+
DispatcherResourceCleanerFactory.ofLocalResource(
+ jobManagerRunnerRegistry))
Review Comment:
Thank you for pointing this out! I added this cleanup because
`Dispatcher#getJobTerminationFuture` used to throw an exception if the job was
still registered in `jobManagerRunnerRegistry`. Since a new method without this
check has been added, this cleanup is no longer needed and should be removed.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
+ @Test(timeout = 5000L)
+ public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+ final Configuration configuration = new Configuration();
+
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+ final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+ try {
+ final TestingHistoryServerArchivist archivist =
+ new TestingHistoryServerArchivist(ioExecutor, 50L);
+ final TestingJobMasterServiceLeadershipRunnerFactory
testingJobManagerRunnerFactory =
+ new TestingJobMasterServiceLeadershipRunnerFactory(0);
+ final TestingDispatcher.Builder testingDispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(archivist)
+ .setConfiguration(configuration);
+ startDispatcher(testingDispatcherBuilder,
testingJobManagerRunnerFactory);
+
+ submitJobAndWait();
+ final TestingJobManagerRunner testingJobManagerRunner =
+
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ finishJob(testingJobManagerRunner);
+
+ globalCleanupFuture.join();
+ dispatcher.getJobTerminationFuture(jobId,
Time.milliseconds(1000L)).join();
+
+ assertTrue(archivist.isArchived());
+ } finally {
+ ioExecutor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000L)
+ public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+ final Configuration configuration = new Configuration();
+
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 10L);
+
+ final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+ try {
+ final TestingHistoryServerArchivist archivist =
+ new TestingHistoryServerArchivist(ioExecutor, 1000L);
+ final TestingJobMasterServiceLeadershipRunnerFactory
testingJobManagerRunnerFactory =
+ new TestingJobMasterServiceLeadershipRunnerFactory(0);
+ final TestingDispatcher.Builder testingDispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(archivist)
+ .setConfiguration(configuration);
+ startDispatcher(testingDispatcherBuilder,
testingJobManagerRunnerFactory);
+
+ submitJobAndWait();
+ final TestingJobManagerRunner testingJobManagerRunner =
+
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ finishJob(testingJobManagerRunner);
+
+ globalCleanupFuture.join();
+ dispatcher.getJobTerminationFuture(jobId,
Time.milliseconds(1000L)).join();
+
+ assertFalse(archivist.isArchived());
+ } finally {
+ ioExecutor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000L)
Review Comment:
Removed the timeout here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo
executionGraphInfo) {
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
+ }
+
+ private CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo) {
// do not create an archive for suspended jobs, as this would
eventually lead to multiple
// archive attempts which we currently do not support
- if
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
{
- final CompletableFuture<Acknowledge> executionGraphFuture =
-
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
- executionGraphFuture.whenComplete(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- log.info(
- "Could not archive completed job {}({}) to
the history server.",
-
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
- throwable);
- }
- });
+ final CompletableFuture<Acknowledge> executionGraphFuture =
+ FutureUtils.orTimeout(
+
historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+
configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+ TimeUnit.MILLISECONDS,
+ getMainThreadExecutor());
+
+ return executionGraphFuture.handle(
+ (Acknowledge ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ log.info(
+ "Could not archive completed job {}({}) to the
history server.",
+
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+ throwable);
+ }
+ return Acknowledge.get();
+ });
+ }
+
+ private void registerCleanupInJobResultStore(ExecutionGraphInfo
executionGraphInfo) {
Review Comment:
Thank you for suggesting a better name. I've renamed it accordingly.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java:
##########
@@ -129,10 +129,12 @@ public MiniDispatcher(
}
@Override
- protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo
executionGraphInfo) {
+ protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+ ExecutionGraphInfo executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
- final CleanupJobState cleanupHAState =
super.jobReachedTerminalState(executionGraphInfo);
+ final CompletableFuture<CleanupJobState> cleanupHAState =
Review Comment:
I'm not sure about this. Originally, the `shutdownFuture` was completed
without waiting for `jobReachedTerminalState` to complete. Furthermore, as we
discussed in
https://github.com/apache/flink/pull/19275#issuecomment-1085618588, shouldn't
`shutdownFuture` wait for the completion of all `jobTerminationFutures`
instead of the future returned by `jobReachedTerminalState`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
+ @Test(timeout = 5000L)
+ public void testArchiveSuccessfullyWithinTimeout() throws Exception {
Review Comment:
Thank you for pointing this out. I've tried to clean up the duplicated code
in these tests.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo
executionGraphInfo) {
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
+ }
+
+ private CompletableFuture<Acknowledge> archiveExecutionGraph(
Review Comment:
Here I've renamed `archiveExecutionGraph` to
`archiveExecutionGraphToHistoryServer`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
}
+ @Test(timeout = 5000L)
+ public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+ final Configuration configuration = new Configuration();
+
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+ final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+ try {
+ final TestingHistoryServerArchivist archivist =
+ new TestingHistoryServerArchivist(ioExecutor, 50L);
+ final TestingJobMasterServiceLeadershipRunnerFactory
testingJobManagerRunnerFactory =
+ new TestingJobMasterServiceLeadershipRunnerFactory(0);
+ final TestingDispatcher.Builder testingDispatcherBuilder =
+ createTestingDispatcherBuilder()
+ .setHistoryServerArchivist(archivist)
+ .setConfiguration(configuration);
+ startDispatcher(testingDispatcherBuilder,
testingJobManagerRunnerFactory);
+
+ submitJobAndWait();
+ final TestingJobManagerRunner testingJobManagerRunner =
+
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ finishJob(testingJobManagerRunner);
+
+ globalCleanupFuture.join();
+ dispatcher.getJobTerminationFuture(jobId,
Time.milliseconds(1000L)).join();
+
+ assertTrue(archivist.isArchived());
+ } finally {
+ ioExecutor.shutdownNow();
+ }
+ }
+
+ @Test(timeout = 5000L)
Review Comment:
Removed the timeout here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1238,22 @@ private void jobMasterFailed(JobID jobId, Throwable
cause) {
getMainThreadExecutor());
}
- CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+ private CompletableFuture<Void> getTerminatedJobTerminationFuture(JobID
jobId) {
Review Comment:
How about `getJobTerminationFutureOrThrowIfJobIsRunning`, since it throws a
`DispatcherException` for a running job?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -762,4 +870,40 @@ public JobManagerRunner createJobManagerRunner(
throw testException;
}
}
+
+ private class TestingHistoryServerArchivist implements
HistoryServerArchivist {
+
+ private final Executor ioExecutor;
+ private final long sleepMills;
+
+ private boolean archived;
+
+ public TestingHistoryServerArchivist(Executor ioExecutor, long
sleepMills) {
+ this.ioExecutor = ioExecutor;
+ this.sleepMills = sleepMills;
+ this.archived = false;
+ }
+
+ public boolean isArchived() {
+ return archived;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> archiveExecutionGraph(
+ ExecutionGraphInfo executionGraphInfo) {
+ return CompletableFuture.runAsync(
Review Comment:
Replaced.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState
jobReachedTerminalState(ExecutionGraphInfo executionGr
terminalJobStatus);
}
- archiveExecutionGraph(executionGraphInfo);
+ storeExecutionGraphInfo(executionGraphInfo);
if (terminalJobStatus.isGloballyTerminalState()) {
- final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up
was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
-
executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the
JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as
pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
- }
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ // do not create an archive for suspended jobs, as this would
eventually lead to
+ // multiple archive attempts which we currently do not support
+ CompletableFuture<Acknowledge> archiveFuture =
+ archiveExecutionGraph(executionGraphInfo);
+
+ registerCleanupInJobResultStore(executionGraphInfo);
Review Comment:
Could we chain the registration by moving it into the `thenApplyAsync` callback?
I put it here on purpose, though. If archiving takes a long time and the
cluster is killed by the user or by the external resource provider, the
registration will already have been done, so the cleanup will still be executed
once the job is resumed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]