[
https://issues.apache.org/jira/browse/FLINK-19154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Husky Zeng updated FLINK-19154:
-------------------------------
Description:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html
As this mail thread says, when the application completes with an unknown throwable, the
program catches and ignores it, which finally leads to cleaning up the HA data.
```
// catches every throwable but ignores it
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L162
// always uses shutDownFuture.complete(status), never shutDownFuture.completeExceptionally(t)
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L597
// the throwable will always be null
https://github.com/apache/flink/blob/6b9cdd41743edd24a929074d62a57b84e7b2dd97/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L243
```
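To illustrate the mechanics (a minimal standalone sketch for this report, not Flink code): a CompletableFuture that is only ever completed normally can never hand a non-null Throwable to its whenComplete callback, which is why the throwable seen in ClusterEntrypoint is always null today.
```java
import java.util.concurrent.CompletableFuture;

public class ShutDownFutureDemo {

    enum ApplicationStatus { SUCCEEDED, FAILED, UNKNOWN }

    public static void main(String[] args) {
        // Current behaviour: the shut-down future is only ever completed normally,
        // so the whenComplete callback always observes a null throwable.
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();
        shutDownFuture.whenComplete((status, throwable) ->
                System.out.println("status=" + status + ", throwable=" + throwable));
        shutDownFuture.complete(ApplicationStatus.FAILED);
        // prints: status=FAILED, throwable=null

        // If the future were completed exceptionally instead, the callback would
        // actually see the failure and could decide to keep the HA data.
        CompletableFuture<ApplicationStatus> failedFuture = new CompletableFuture<>();
        failedFuture.whenComplete((status, throwable) ->
                System.out.println("status=" + status + ", throwable=" + throwable));
        failedFuture.completeExceptionally(new RuntimeException("unknown error"));
        // prints: status=null, throwable=java.lang.RuntimeException: unknown error
    }
}
```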
So, if we use shutDownFuture.completeExceptionally(t) when the application fails because of
an unknown error, the HA data will not be cleaned up.
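As a rough sketch of that direction (hypothetical only; the real method name, signature, and call sites would have to be decided in the fix), the Dispatcher could be handed the failure cause and complete its shut-down future exceptionally, so that ClusterEntrypoint#runCluster takes its throwable != null branch and shuts down without cleaning up the HA data:
```java
// Hypothetical sketch, not the actual Flink API: assumes an overload of
// Dispatcher#shutDownCluster that also receives the failure cause.
public CompletableFuture<Acknowledge> shutDownCluster(
        final ApplicationStatus applicationStatus,
        @Nullable final Throwable cause) {
    if (cause != null) {
        // Propagate the failure: the entrypoint's whenComplete callback then sees a
        // non-null throwable and can skip the HA clean-up.
        shutDownFuture.completeExceptionally(cause);
    } else {
        shutDownFuture.complete(applicationStatus);
    }
    return CompletableFuture.completedFuture(Acknowledge.get());
}
```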
By the way, this is the first time I have submitted an issue; if anything is wrong,
please tell me. I am very glad to do something for the community, thanks.
was:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html
As this mail thread says, when the application completes with an unknown throwable, the
program catches and ignores it, which finally leads to cleaning up the HA data.
```
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAndShutdownClusterAsync

CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
        final DispatcherGateway dispatcher,
        final ScheduledExecutor scheduledExecutor) {

    applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);

    return applicationCompletionFuture
            .handle((r, t) -> {
                final ApplicationStatus applicationStatus;
                if (t != null) {
                    final Optional<JobCancellationException> cancellationException =
                            ExceptionUtils.findThrowable(t, JobCancellationException.class);
                    if (cancellationException.isPresent()) {
                        // this means the Flink Job was cancelled
                        applicationStatus = ApplicationStatus.CANCELED;
                    } else if (t instanceof CancellationException) {
                        // this means that the future was cancelled
                        applicationStatus = ApplicationStatus.UNKNOWN;
                    } else {
                        applicationStatus = ApplicationStatus.FAILED;
                    }
                    LOG.warn("Application {}: ", applicationStatus, t);
                } else {
                    applicationStatus = ApplicationStatus.SUCCEEDED;
                    LOG.info("Application completed SUCCESSFULLY");
                }
                *// note: whatever the throwable is, we will ignore it*
                *return dispatcher.shutDownCluster(applicationStatus);*
            })
            .thenCompose(Function.identity());
}
org.apache.flink.runtime.dispatcher.Dispatcher#shutDownCluster(org.apache.flink.runtime.clusterframework.ApplicationStatus)

@Override
public CompletableFuture<Acknowledge> shutDownCluster(final ApplicationStatus applicationStatus) {
    // only complete(), no completeExceptionally()
    *shutDownFuture.complete(applicationStatus);*
    return CompletableFuture.completedFuture(Acknowledge.get());
}
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);

        clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

        // the throwable will always be null
        *clusterComponent.getShutDownFuture().whenComplete*(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                                ApplicationStatus.UNKNOWN,
                                ExceptionUtils.stringifyException(throwable),
                                false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown
                        // was already triggered, this will do nothing
                        shutDownAsync(
                                applicationStatus,
                                null,
                                true);
                    }
                });
    }
}
```
So, if we change the code to call shutDownFuture.completeExceptionally(t) when there is an
unknown error, the HA data will not be cleaned up on such a failure.
By the way, this is the first time I have submitted an issue; if there is any fault,
please tell me. I am very glad to do something for the community.
> Always clean up HA data when application completion
> ----------------------------------------------------
>
> Key: FLINK-19154
> URL: https://issues.apache.org/jira/browse/FLINK-19154
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.11.1
> Environment: Run a stand-alone cluster that runs a single job (if you
> are familiar with the way Ververica Platform runs Flink jobs, we use a very
> similar approach). It runs Flink 1.11.1 straight from the official docker
> image.
> Reporter: Husky Zeng
> Priority: Major
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html
> As this mail thread says, when the application completes with an unknown throwable, the
> program catches and ignores it, which finally leads to cleaning up the HA data.
> ```
> // catches every throwable but ignores it
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L162
> // always uses shutDownFuture.complete(status), never shutDownFuture.completeExceptionally(t)
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L597
> // the throwable will always be null
> https://github.com/apache/flink/blob/6b9cdd41743edd24a929074d62a57b84e7b2dd97/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L243
> ```
> So, if we use shutDownFuture.completeExceptionally(t) when the application fails because
> of an unknown error, the HA data will not be cleaned up.
> By the way, this is the first time I have submitted an issue; if anything is wrong,
> please tell me. I am very glad to do something for the community, thanks.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)