Husky Zeng created FLINK-19154:
----------------------------------
Summary: Always clean up HA data when application completion
Key: FLINK-19154
URL: https://issues.apache.org/jira/browse/FLINK-19154
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.11.1
Environment: Run a stand-alone cluster that runs a single job (if you
are familiar with the way Ververica Platform runs Flink jobs, we use a very
similar approach). It runs Flink 1.11.1 straight from the official docker image.
Reporter: Husky Zeng
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html
As this mail say , when the application completed with unknown throwable, the
program catch and ignore it , and finally leads to clean up HA data.
``
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAndShutdownClusterAsync
CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
final DispatcherGateway dispatcher,
final ScheduledExecutor scheduledExecutor) {
applicationCompletionFuture =
fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
return applicationCompletionFuture
.handle((r, t) -> {
final ApplicationStatus
applicationStatus;
if (t != null) {
final
Optional<JobCancellationException> cancellationException =
ExceptionUtils.findThrowable(t, JobCancellationException.class);
if
(cancellationException.isPresent()) {
// this means the Flink
Job was cancelled
applicationStatus =
ApplicationStatus.CANCELED;
} else if (t instanceof
CancellationException) {
// this means that the
future was cancelled
applicationStatus =
ApplicationStatus.UNKNOWN;
} else {
applicationStatus =
ApplicationStatus.FAILED;
}
LOG.warn("Application {}: ",
applicationStatus, t);
} else {
applicationStatus =
ApplicationStatus.SUCCEEDED;
LOG.info("Application completed
SUCCESSFULLY");
}
* // notes: whatever the throwable is,we
will ignore it,*
*return
dispatcher.shutDownCluster(applicationStatus);*
})
.thenCompose(Function.identity());
}
org.apache.flink.runtime.dispatcher.Dispatcher#shutDownCluster(org.apache.flink.runtime.clusterframework.ApplicationStatus)
@Override
public CompletableFuture<Acknowledge> shutDownCluster(final
ApplicationStatus applicationStatus) {
// only complete , no completeExceptionally
*shutDownFuture.complete(applicationStatus);*
return CompletableFuture.completedFuture(Acknowledge.get());
}
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
private void runCluster(Configuration configuration, PluginManager
pluginManager) throws Exception {
synchronized (lock) {
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS,
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT,
commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new
RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
// the throwable will always null
*clusterComponent.getShutDownFuture().whenComplete*(
(ApplicationStatus applicationStatus, Throwable
throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown
path. If a separate more specific shutdown was
// already triggered, this will
do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
``
So ,if we change code like this ,it will not clean up ha data when failed such
as shutDownFuture..completeExceptionally(t) when there is an unknown error.
By the way, this is the first time I submit an issue , if there are any fault ,
please told me. I am very glad to do something for the community.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)