Husky Zeng created FLINK-19154:
----------------------------------

             Summary: Always  clean up HA data when application completion
                 Key: FLINK-19154
                 URL: https://issues.apache.org/jira/browse/FLINK-19154
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.11.1
         Environment: Run a stand-alone cluster that runs a single job (if you 
are familiar with the way Ververica Platform runs Flink jobs, we use a very 
similar approach). It runs Flink 1.11.1 straight from the official docker image.
            Reporter: Husky Zeng


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-metadata-deleted-by-Flink-after-ZK-connection-issues-td37937.html

As described in this mail thread, when the application completes with an 
unknown throwable, the program catches and ignores it, which ultimately leads 
to the HA data being cleaned up.

``
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAndShutdownClusterAsync

    CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
            final DispatcherGateway dispatcher,
            final ScheduledExecutor scheduledExecutor) {

        applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);

        return applicationCompletionFuture
                .handle((r, t) -> {
                    final ApplicationStatus applicationStatus;
                    if (t != null) {

                        final Optional<JobCancellationException> cancellationException =
                                ExceptionUtils.findThrowable(t, JobCancellationException.class);

                        if (cancellationException.isPresent()) {
                            // this means the Flink Job was cancelled
                            applicationStatus = ApplicationStatus.CANCELED;
                        } else if (t instanceof CancellationException) {
                            // this means that the future was cancelled
                            applicationStatus = ApplicationStatus.UNKNOWN;
                        } else {
                            applicationStatus = ApplicationStatus.FAILED;
                        }

                        LOG.warn("Application {}: ", applicationStatus, t);
                    } else {
                        applicationStatus = ApplicationStatus.SUCCEEDED;
                        LOG.info("Application completed SUCCESSFULLY");
                    }
                    // NOTE (reporter): whatever the throwable is, it is ignored from here on
                    return dispatcher.shutDownCluster(applicationStatus);
                })
                .thenCompose(Function.identity());
    }

org.apache.flink.runtime.dispatcher.Dispatcher#shutDownCluster(org.apache.flink.runtime.clusterframework.ApplicationStatus)


    @Override
    public CompletableFuture<Acknowledge> shutDownCluster(final ApplicationStatus applicationStatus) {
        // NOTE (reporter): only complete() is called, never completeExceptionally()
        shutDownFuture.complete(applicationStatus);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }


org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster


    private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
        synchronized (lock) {

            initializeServices(configuration, pluginManager);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =
                    createDispatcherResourceManagerComponentFactory(configuration);

            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                    configuration,
                    ioExecutor,
                    commonRpcService,
                    haServices,
                    blobServer,
                    heartbeatServices,
                    metricRegistry,
                    archivedExecutionGraphStore,
                    new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                    this);

            // NOTE (reporter): the throwable is always null here
            clusterComponent.getShutDownFuture().whenComplete(
                    (ApplicationStatus applicationStatus, Throwable throwable) -> {
                        if (throwable != null) {
                            shutDownAsync(
                                    ApplicationStatus.UNKNOWN,
                                    ExceptionUtils.stringifyException(throwable),
                                    false);
                        } else {
                            // This is the general shutdown path. If a separate more specific shutdown was
                            // already triggered, this will do nothing
                            shutDownAsync(
                                    applicationStatus,
                                    null,
                                    true);
                        }
                    });
        }
    }
``
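
To illustrate the first step of the chain quoted above, here is a minimal, 
self-contained sketch using only java.util.concurrent. It is not Flink code: 
the class name HandleSwallowSketch, the method toStatus and the status strings 
are hypothetical stand-ins for the dispatcher bootstrap and ApplicationStatus. 
It only shows that handle() turns an exceptionally completed future into a 
normally completed one, so later stages no longer see the throwable.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the handle() stage in ApplicationDispatcherBootstrap.
final class HandleSwallowSketch {

    /** Maps any outcome, success or failure, to a status string. */
    static CompletableFuture<String> toStatus(CompletableFuture<Void> applicationCompletion) {
        return applicationCompletion.handle((result, throwable) ->
                throwable != null ? "FAILED" : "SUCCEEDED");
    }

    public static void main(String[] args) {
        // Simulate an application that finished with an unknown error.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("unknown error"));

        CompletableFuture<String> status = toStatus(failed);

        // The resulting future completed normally; the throwable is gone.
        System.out.println(status.isCompletedExceptionally()); // false
        System.out.println(status.join());                     // FAILED
    }
}
```

After this stage the failure is only visible as a status value, which is why 
the later whenComplete callback cannot take its exceptional branch.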


So if we change the code to propagate the failure, for example by calling 
shutDownFuture.completeExceptionally(t) when there is an unknown error, the 
HA data will not be cleaned up on failure.
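
The difference between the two completion modes can be sketched outside of 
Flink as follows. This is an illustration under assumptions, not a patch: the 
class ShutdownFutureSketch, its Status enum and its methods are hypothetical, 
and it assumes (as the excerpt above suggests) that the last argument of 
shutDownAsync decides whether HA data is cleaned up.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the shutdown future; not part of Flink.
final class ShutdownFutureSketch {

    /** Stand-in for Flink's ApplicationStatus. */
    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Mimics the whenComplete callback in runCluster and reports which branch ran. */
    static String observe(CompletableFuture<Status> shutDownFuture) {
        StringBuilder branch = new StringBuilder();
        // The future is already completed, so the callback runs synchronously here.
        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                branch.append("exceptional: cleanupHaData=false");
            } else {
                branch.append("normal(").append(status).append("): cleanupHaData=true");
            }
        });
        return branch.toString();
    }

    /** Behavior described in the report: the error is reduced to a status value. */
    static String currentBehavior(Throwable applicationError) {
        CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
        shutDownFuture.complete(Status.FAILED);
        return observe(shutDownFuture);
    }

    /** Proposed behavior: the error is propagated through the future. */
    static String proposedBehavior(Throwable applicationError) {
        CompletableFuture<Status> shutDownFuture = new CompletableFuture<>();
        shutDownFuture.completeExceptionally(applicationError);
        return observe(shutDownFuture);
    }

    public static void main(String[] args) {
        Throwable error = new IllegalStateException("unknown error");
        System.out.println(currentBehavior(error));  // normal(FAILED): cleanupHaData=true
        System.out.println(proposedBehavior(error)); // exceptional: cleanupHaData=false
    }
}
```

Whether an unknown error should skip HA cleanup in every case is a design 
question for the actual fix; the sketch only shows that the exceptional branch 
becomes reachable once completeExceptionally is used.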

By the way, this is the first time I have submitted an issue. If there are any 
mistakes, please tell me. I am very glad to do something for the community.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
