eemario commented on code in PR #27324:
URL: https://github.com/apache/flink/pull/27324#discussion_r2630259275


##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java:
##########
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
+import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link AbstractApplication} designed for executing the 
user's {@code
+ * main()}.
+ *
+ * <p>This application functions similarly to {@link 
ApplicationDispatcherBootstrap} and can serve
+ * as a replacement for it in application mode.
+ */
+@Internal
+public class PackagedProgramApplication extends AbstractApplication {
+
+    @VisibleForTesting static final String FAILED_JOB_NAME = "(application 
driver)";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PackagedProgramApplication.class);
+
+    private static boolean isCanceledOrFailed(ApplicationStatus 
applicationStatus) {
+        return applicationStatus == ApplicationStatus.CANCELED
+                || applicationStatus == ApplicationStatus.FAILED;
+    }
+
+    private final PackagedProgram program;
+
+    private final Collection<JobID> recoveredJobIds;
+
+    private final Configuration configuration;
+
+    private final boolean handleFatalError;
+
+    private final boolean enforceSingleJobExecution;
+
+    private final boolean submitFailedJobOnApplicationError;
+
+    private final boolean shutDownOnFinish;
+
+    private transient CompletableFuture<Void> applicationCompletionFuture;
+
+    private transient ScheduledFuture<?> applicationExecutionTask;
+
+    private transient CompletableFuture<Acknowledge> finishApplicationFuture;
+
+    private transient boolean isDisposing = false;
+
+    public PackagedProgramApplication(
+            final ApplicationID applicationId,
+            final PackagedProgram program,
+            final Collection<JobID> recoveredJobIds,
+            final Configuration configuration,
+            final boolean handleFatalError,
+            final boolean enforceSingleJobExecution,
+            final boolean submitFailedJobOnApplicationError,
+            final boolean shutDownOnFinish) {
+        super(applicationId);
+        this.program = checkNotNull(program);
+        this.recoveredJobIds = checkNotNull(recoveredJobIds);
+        this.configuration = checkNotNull(configuration);
+        this.handleFatalError = handleFatalError;
+        this.enforceSingleJobExecution = enforceSingleJobExecution;
+        this.submitFailedJobOnApplicationError = 
submitFailedJobOnApplicationError;
+        this.shutDownOnFinish = shutDownOnFinish;
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> execute(
+            DispatcherGateway dispatcherGateway,
+            ScheduledExecutor scheduledExecutor,
+            Executor mainThreadExecutor,
+            FatalErrorHandler errorHandler) {
+        transitionToRunning();
+
+        final CompletableFuture<List<JobID>> applicationExecutionFuture = new 
CompletableFuture<>();
+        final Set<JobID> tolerateMissingResult = 
Collections.synchronizedSet(new HashSet<>());
+
+        // we need to hand in a future as return value because we need to get 
those JobIs out
+        // from the scheduled task that executes the user program
+        applicationExecutionTask =
+                scheduledExecutor.schedule(
+                        () ->
+                                runApplicationEntryPoint(
+                                        applicationExecutionFuture,
+                                        tolerateMissingResult,
+                                        dispatcherGateway,
+                                        scheduledExecutor),
+                        0L,
+                        TimeUnit.MILLISECONDS);
+
+        boolean decoupleApplicationStatusFromJobStatus =
+                
!configuration.get(DeploymentOptions.TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION);
+
+        if (decoupleApplicationStatusFromJobStatus) {
+            // when the application status is decoupled from the job status, 
we don't need to wait
+            // for the job results
+            applicationCompletionFuture = 
applicationExecutionFuture.thenApply(ignored -> null);
+            finishApplicationFuture =
+                    applicationCompletionFuture
+                            .handleAsync(
+                                    (ignored, t) -> {
+                                        if (t == null) {
+                                            LOG.info(
+                                                    "Application completed 
SUCCESSFULLY (decoupled from job results)");
+                                            return finishAsSucceeded(
+                                                    dispatcherGateway,
+                                                    scheduledExecutor,
+                                                    mainThreadExecutor,
+                                                    errorHandler);
+                                        }
+
+                                        return onApplicationCancelledOrFailed(
+                                                dispatcherGateway,
+                                                scheduledExecutor,
+                                                mainThreadExecutor,
+                                                errorHandler,
+                                                t);
+                                    },
+                                    mainThreadExecutor)
+                            .thenCompose(Function.identity());
+        } else {
+            applicationCompletionFuture =
+                    applicationExecutionFuture.thenCompose(
+                            jobIds ->
+                                    waitForJobResults(
+                                            dispatcherGateway,
+                                            jobIds,
+                                            tolerateMissingResult,
+                                            scheduledExecutor));
+
+            finishApplicationFuture =
+                    applicationCompletionFuture
+                            .handleAsync(
+                                    (ignored, t) -> {
+                                        if (t == null) {
+                                            LOG.info("Application completed 
SUCCESSFULLY");
+                                            transitionToFinished();
+                                            return maybeShutdownCluster(
+                                                    dispatcherGateway, 
ApplicationStatus.SUCCEEDED);
+                                        }
+
+                                        final Optional<ApplicationStatus> 
maybeApplicationStatus =
+                                                extractApplicationStatus(t);
+                                        if (maybeApplicationStatus.isPresent()
+                                                && isCanceledOrFailed(
+                                                        
maybeApplicationStatus.get())) {
+                                            // the exception is caused by job 
execution results
+                                            ApplicationStatus 
applicationStatus =
+                                                    
maybeApplicationStatus.get();
+                                            LOG.info("Application {}: ", 
applicationStatus, t);
+                                            if (applicationStatus == 
ApplicationStatus.CANCELED) {
+                                                transitionToCancelling();
+                                                return finishAsCanceled(
+                                                        dispatcherGateway,
+                                                        scheduledExecutor,
+                                                        mainThreadExecutor,
+                                                        errorHandler);
+
+                                            } else {
+                                                transitionToFailing();
+                                                return finishAsFailed(
+                                                        dispatcherGateway,
+                                                        scheduledExecutor,
+                                                        mainThreadExecutor,
+                                                        errorHandler);
+                                            }
+                                        }
+
+                                        return onApplicationCancelledOrFailed(
+                                                dispatcherGateway,
+                                                scheduledExecutor,
+                                                mainThreadExecutor,
+                                                errorHandler,
+                                                t);
+                                    },
+                                    mainThreadExecutor)
+                            .thenCompose(Function.identity());
+        }
+
+        // In Application Mode, the handleFatalError flag is set to true, and 
uncaught exceptions
+        // are handled by the errorHandler to trigger failover.
+        // In Session Mode, the handleFatalError flag may be set to false, 
leaving exceptions
+        // unhandled. This behavior may change in the future.
+        FutureUtils.handleUncaughtException(
+                finishApplicationFuture,
+                (t, e) -> {
+                    if (handleFatalError) {
+                        errorHandler.onFatalError(e);
+                    }
+                });
+
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
+    @Override
+    public void cancel() {
+        ApplicationState currentState = getApplicationStatus();
+        if (currentState == ApplicationState.CREATED) {
+            // nothing to cancel
+            transitionToCancelling();
+            transitionToCanceled();
+        } else if (currentState == ApplicationState.RUNNING) {
+            transitionToCancelling();
+            cancelFutures();
+        }
+    }
+
+    @Override
+    public void dispose() {
+        isDisposing = true;
+        cancelFutures();
+    }
+
+    private void cancelFutures() {
+        if (applicationExecutionTask != null) {
+            applicationExecutionTask.cancel(true);
+        }
+
+        if (applicationCompletionFuture != null) {
+            // applicationCompletionFuture.handleAsync will not block here
+            applicationCompletionFuture.cancel(true);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return program.getMainClassName();
+    }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getApplicationExecutionFuture() {
+        return applicationExecutionTask;
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Void> getApplicationCompletionFuture() {
+        return applicationCompletionFuture;
+    }
+
+    @VisibleForTesting
+    CompletableFuture<Acknowledge> getFinishApplicationFuture() {
+        return finishApplicationFuture;
+    }
+
+    private CompletableFuture<Acknowledge> onApplicationCancelledOrFailed(
+            final DispatcherGateway dispatcherGateway,
+            final ScheduledExecutor scheduledExecutor,
+            final Executor mainThreadExecutor,
+            final FatalErrorHandler errorHandler,
+            final Throwable t) {
+        if (t instanceof CancellationException) {
+            // the applicationCompletionFuture is canceled by cancel() or 
dispose()
+            if (isDisposing) {
+                LOG.warn("Application execution is cancelled when dispose.");

Review Comment:
   Updated in the previous pr https://github.com/apache/flink/pull/27307



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to