tillrohrmann commented on a change in pull request #18220:
URL: https://github.com/apache/flink/pull/18220#discussion_r800081613



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client
+ * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it
+ * will create a dedicated yarn client wrapper.

Review comment:
       ```suggestion
    * will create a dedicated yarn client.
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Wrapper class for {@link YarnClient}. */
+@Internal
+public class YarnClientWrapper implements AutoCloseable {
+    private YarnClient yarnClient;
+    private boolean allowToStop;
+
+    public YarnClientWrapper(YarnClient yarnClient, boolean allowToStop) {
+        this.yarnClient = yarnClient;
+        this.allowToStop = allowToStop;
+    }
+
+    public ApplicationReport getApplicationReport(ApplicationId appId)
+            throws YarnException, IOException {
+        return yarnClient.getApplicationReport(appId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (allowToStop) {
+            yarnClient.close();
+        }
+    }
+
+    public boolean isClosed() {
+        return yarnClient.isInState(Service.STATE.STOPPED);
+    }
+
+    public static YarnClientWrapper of(YarnClient yarnClient, boolean allowToStop) {
+        return new YarnClientWrapper(yarnClient, allowToStop);
+    }
+
+    public static YarnClientWrapper of(YarnConfiguration yarnConfiguration, boolean allowToStop) {
+        final YarnClient newlyCreatedYarnClient = YarnClient.createYarnClient();
+        newlyCreatedYarnClient.init(yarnConfiguration);
+        newlyCreatedYarnClient.start();
+        return of(newlyCreatedYarnClient, allowToStop);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        YarnClientWrapper that = (YarnClientWrapper) o;
+        return allowToStop == that.allowToStop && yarnClient.equals(that.yarnClient);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(yarnClient, allowToStop);
+    }

Review comment:
       For what is this needed?

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Wrapper class for {@link YarnClient}. */
+@Internal
+public class YarnClientWrapper implements AutoCloseable {
+    private YarnClient yarnClient;
+    private boolean allowToStop;
+
+    public YarnClientWrapper(YarnClient yarnClient, boolean allowToStop) {
+        this.yarnClient = yarnClient;
+        this.allowToStop = allowToStop;
+    }
+
+    public ApplicationReport getApplicationReport(ApplicationId appId)
+            throws YarnException, IOException {
+        return yarnClient.getApplicationReport(appId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (allowToStop) {
+            yarnClient.close();
+        }
+    }
+
+    public boolean isClosed() {
+        return yarnClient.isInState(Service.STATE.STOPPED);
+    }
+
+    public static YarnClientWrapper of(YarnClient yarnClient, boolean allowToStop) {

Review comment:
       Instead of passing boolean flags, I'd suggest providing properly named factory methods.
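
       For illustration, such factories could look roughly like this inside `YarnClientWrapper` (the names `fromExternal`/`fromNewlyCreated` are placeholders, not part of the PR):

```java
// Sketch only: named factories instead of a boolean flag; names are illustrative.
public static YarnClientWrapper fromExternal(YarnClient externalYarnClient) {
    // An externally owned client must not be stopped when this wrapper is closed.
    return new YarnClientWrapper(externalYarnClient, false);
}

public static YarnClientWrapper fromNewlyCreated(YarnConfiguration yarnConfiguration) {
    // A client created here is owned by the wrapper and may be stopped in close().
    final YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(yarnConfiguration);
    yarnClient.start();
    return new YarnClientWrapper(yarnClient, true);
}
```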

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Wrapper class for {@link YarnClient}. */
+@Internal
+public class YarnClientWrapper implements AutoCloseable {

Review comment:
       Instead of this wrapper, you could use Flink's `Reference` class. It provides exactly what we need here. Something that needs to be closed is `owned` and something that should not be closed is `borrowed`.
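
       A rough sketch of how that could look, assuming the `Reference.owned`/`Reference.borrowed` factories plus `deref()` and `isOwned()` on `org.apache.flink.util.Reference` (please double-check the exact API); `yarnConfiguration`, `externalYarnClient` and `appId` are taken as given:

```java
// Ownership is encoded in the Reference itself instead of a boolean flag.
final YarnClient dedicatedClient = YarnClient.createYarnClient();
dedicatedClient.init(yarnConfiguration);
dedicatedClient.start();

final Reference<YarnClient> owned = Reference.owned(dedicatedClient);          // may be stopped
final Reference<YarnClient> borrowed = Reference.borrowed(externalYarnClient); // must not be stopped

// Both kinds of references are used the same way.
final ApplicationReport report = owned.deref().getApplicationReport(appId);

// Only a client we own is stopped when cleaning up.
if (owned.isOwned()) {
    owned.deref().stop();
}
```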

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetriever.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+/** Factory for getting {@link org.apache.hadoop.yarn.client.api.YarnClient}. */
+@Internal
+public interface YarnClientRetriever {
+    YarnClientWrapper getYarnClient() throws FlinkException;

Review comment:
       JavaDocs are missing. It would be especially interesting to know when a `FlinkException` is being thrown.
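
       One possible shape of the missing JavaDoc; the `@throws` description is inferred from the implementation and should be adjusted to the actual failure conditions:

```java
/**
 * Returns a {@link YarnClientWrapper} that can be used to query the YARN cluster, wrapping either
 * the externally provided yarn client or a dedicated one created from the yarn configuration.
 *
 * @return a usable {@link YarnClientWrapper}
 * @throws FlinkException if no usable yarn client is available and a dedicated one cannot be created
 */
YarnClientWrapper getYarnClient() throws FlinkException;
```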

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client
+ * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it
+ * will create a dedicated yarn client wrapper.
+ */
+@Internal
+public final class YarnClientRetrieverImpl implements YarnClientRetriever {
+    private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class);
+
+    @Nullable private final YarnClientWrapper externalYarnClient;
+    private final YarnConfiguration yarnConfiguration;
+    @Nullable private YarnClientWrapper dedicatedYarnClient;
+
+    private YarnClientRetrieverImpl(
+            @Nullable YarnClientWrapper externalCreatedYarnClient,

Review comment:
       ```suggestion
               @Nullable YarnClientWrapper externallyCreatedYarnClient,
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client
+ * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it
+ * will create a dedicated yarn client wrapper.
+ */
+@Internal
+public final class YarnClientRetrieverImpl implements YarnClientRetriever {
+    private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class);
+
+    @Nullable private final YarnClientWrapper externalYarnClient;
+    private final YarnConfiguration yarnConfiguration;
+    @Nullable private YarnClientWrapper dedicatedYarnClient;
+
+    private YarnClientRetrieverImpl(
+            @Nullable YarnClientWrapper externalCreatedYarnClient,
+            YarnConfiguration yarnConfiguration) {
+        this.externalYarnClient = externalCreatedYarnClient;
+        this.yarnConfiguration = yarnConfiguration;
+    }
+
+    @Override
+    public YarnClientWrapper getYarnClient() throws FlinkException {
+
+        if (externalYarnClient != null && !externalYarnClient.isClosed()) {

Review comment:
       This check could be refactored into `isRunning(externalYarnClient)` and then be reused for the next if condition as well.
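
       A minimal sketch of the suggested helper; the second usage assumes the `dedicatedYarnClient` branch that follows in the unquoted part of the method:

```java
private static boolean isRunning(@Nullable YarnClientWrapper yarnClient) {
    return yarnClient != null && !yarnClient.isClosed();
}

// Usage in getYarnClient():
if (isRunning(externalYarnClient)) {
    return externalYarnClient;
}
if (isRunning(dedicatedYarnClient)) {
    return dedicatedYarnClient;
}
```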

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false);
+
+        try {
+            waitTillTargetState(
+                    yarnClientWrapper,
+                    appId,
+                    YarnApplicationState.ACCEPTED,
+                    YarnApplicationState.RUNNING);
+        } catch (IOException e) {
+            throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+        }
+
+        // since deployment was successful, remove the hook
+        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+
+        return ApplicationReportProviderImpl.of(
+                YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId);
+    }
+
+    public static ApplicationReport waitTillTargetState(
+            YarnClientWrapper yarnClient, ApplicationId appId, YarnApplicationState... targetStates)

Review comment:
       ```suggestion
            YarnClient yarnClient, ApplicationId appId, YarnApplicationState... targetStates)
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false);
+
+        try {
+            waitTillTargetState(
+                    yarnClientWrapper,
+                    appId,
+                    YarnApplicationState.ACCEPTED,
+                    YarnApplicationState.RUNNING);
+        } catch (IOException e) {
+            throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+        }
+
+        // since deployment was successful, remove the hook
+        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+
+        return ApplicationReportProviderImpl.of(
+                YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId);
+    }
+
+    public static ApplicationReport waitTillTargetState(

Review comment:
       ```suggestion
       public static ApplicationReport waitUntilTargetState(
   ```

##########
File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
##########
@@ -141,54 +184,70 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolea
                         ? createYarnClusterDescriptor(configuration)
                        : createYarnClusterDescriptorWithoutLibDir(configuration)) {
 
-            final int masterMemory =
-                    yarnClusterDescriptor
-                            .getFlinkConfiguration()
-                            .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
-                            .getMebiBytes();
-            final ClusterSpecification clusterSpecification =
-                    new ClusterSpecification.ClusterSpecificationBuilder()
-                            .setMasterMemoryMB(masterMemory)
-                            .setTaskManagerMemoryMB(1024)
-                            .setSlotsPerTaskManager(1)
-                            .createClusterSpecification();
-
-            File testingJar =
-                    TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
-
-            jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
-            try (ClusterClient<ApplicationId> clusterClient =
-                    yarnClusterDescriptor
-                            .deployJobCluster(clusterSpecification, jobGraph, false)
-                            .getClusterClient()) {
-
-                for (DistributedCache.DistributedCacheEntry entry :
-                        jobGraph.getUserArtifacts().values()) {
-                    assertTrue(
-                            String.format(
-                                    "The user artifacts(%s) should be remote or uploaded to remote filesystem.",
-                                    entry.filePath),
-                            Utils.isRemotePath(entry.filePath));
-                }
-
-                ApplicationId applicationId = clusterClient.getClusterId();
-
-                final CompletableFuture<JobResult> jobResultCompletableFuture =
-                        clusterClient.requestJobResult(jobGraph.getJobID());
-
-                final JobResult jobResult = jobResultCompletableFuture.get();
-
-                assertThat(jobResult, is(notNullValue()));
-                assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
-
-                checkStagingDirectory(configuration, applicationId);
-
-                waitApplicationFinishedElseKillIt(
-                        applicationId,
-                        yarnAppTerminateTimeout,
-                        yarnClusterDescriptor,
-                        sleepIntervalInMS);
+            ClusterClientProvider<ApplicationId> clusterClientProvider =
+                    deployJobCluster(yarnClusterDescriptor, jobGraph);
+
+            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
+
+            checkApplicationFinished(clusterClient, yarnClusterDescriptor, jobGraph, configuration);
+        }
+    }
+
+    private ClusterClientProvider<ApplicationId> deployJobCluster(
+            YarnClusterDescriptor yarnClusterDescriptor, JobGraph jobGraph)
+            throws ClusterDeploymentException {
+        final int masterMemory =
+                yarnClusterDescriptor
+                        .getFlinkConfiguration()
+                        .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+                        .getMebiBytes();
+        final ClusterSpecification clusterSpecification =
+                new ClusterSpecification.ClusterSpecificationBuilder()
+                        .setMasterMemoryMB(masterMemory)
+                        .setTaskManagerMemoryMB(1024)
+                        .setSlotsPerTaskManager(1)
+                        .createClusterSpecification();
+
+        File testingJar = TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
+
+        jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+        return yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, false);
+    }
+
+    private void checkApplicationFinished(

Review comment:
       ```suggestion
       private void waitUntilApplicationFinishes(
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false);
+
+        try {
+            waitTillTargetState(
+                    yarnClientWrapper,
+                    appId,
+                    YarnApplicationState.ACCEPTED,
+                    YarnApplicationState.RUNNING);
+        } catch (IOException e) {
+            throw new YarnDeploymentException("Failed to deploy the cluster.", e);
+        }
+
+        // since deployment was successful, remove the hook
+        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+
+        return ApplicationReportProviderImpl.of(
+                YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId);

Review comment:
       What if the `YarnApplicationState` is `RUNNING`? Couldn't we then create a finished `ApplicationReportProvider`? Then we wouldn't have to poll again.
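
       For illustration, such a short-circuit could look roughly like this, assuming the report returned by `waitTillTargetState` is captured and a hypothetical `ApplicationReportProviderImpl.completed(...)` factory is added (exception handling as in the PR omitted):

```java
final ApplicationReport report =
        waitTillTargetState(
                yarnClientWrapper,
                appId,
                YarnApplicationState.ACCEPTED,
                YarnApplicationState.RUNNING);

// If the application is already RUNNING, hand out the report directly instead of polling again.
if (report.getYarnApplicationState() == YarnApplicationState.RUNNING) {
    return ApplicationReportProviderImpl.completed(report);
}
return ApplicationReportProviderImpl.of(
        YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId);
```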

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationReportProviderImpl.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/** The implementation of {@link ApplicationReportProvider}. */
+@Internal
+public class ApplicationReportProviderImpl implements ApplicationReportProvider {
+    private final YarnClientRetriever yarnClientRetriever;
+    private final ApplicationId appId;
+
+    private ApplicationReportProviderImpl(
+            YarnClientRetriever yarnClientRetriever, ApplicationId applicationId) {
+        this.yarnClientRetriever = yarnClientRetriever;
+        this.appId = applicationId;
+    }
+
+    @Override
+    public ApplicationReport waitTillSubmissionFinish() throws FlinkException {
+        try (final YarnClientWrapper yarnClient = yarnClientRetriever.getYarnClient()) {
+            return YarnClusterDescriptor.waitTillTargetState(
+                    yarnClient, appId, YarnApplicationState.RUNNING);
+        } catch (YarnException | IOException e) {
+            throw new FlinkException(
+                    "Errors on getting YARN application report. Maybe application has finished.",
+                    e);
+        } catch (InterruptedException interruptedException) {

Review comment:
       In case of `InterruptedException` you should set the interrupted flag via: `Thread.currentThread().interrupt();`
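
       For example (the wrapping `FlinkException` and its message are illustrative, since the catch body is not shown in the diff):

```java
} catch (InterruptedException interruptedException) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new FlinkException(
            "Interrupted while waiting for the YARN application report.", interruptedException);
}
```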

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetriever.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+/** Factory for getting {@link org.apache.hadoop.yarn.client.api.YarnClient}. */

Review comment:
       Maybe say that it returns a `YarnClientWrapper`.
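
       One possible wording, just as a sketch:

```java
/** Retriever for a {@link YarnClientWrapper} that gives access to a {@link org.apache.hadoop.yarn.client.api.YarnClient}. */
```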

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationReportProvider.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
+/** Factory for getting {@link ApplicationReport}. */
+@Internal
+public interface ApplicationReportProvider {
+    /**
+     * Waiting for the job to be running and then return the latest {@link ApplicationReport}. And
+     * when Yarn client is closed or encountering any connection error, it will throw {@link
+     * FlinkException}.
+     */
+    ApplicationReport waitTillSubmissionFinish() throws FlinkException;

Review comment:
       ```suggestion
       ApplicationReport waitUntilSubmissionFinishes() throws FlinkException;
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client
+ * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it
+ * will create a dedicated yarn client wrapper.
+ */
+@Internal
+public final class YarnClientRetrieverImpl implements YarnClientRetriever {
+    private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class);
+
+    @Nullable private final YarnClientWrapper externalYarnClient;
+    private final YarnConfiguration yarnConfiguration;
+    @Nullable private YarnClientWrapper dedicatedYarnClient;
+
+    private YarnClientRetrieverImpl(
+            @Nullable YarnClientWrapper externalCreatedYarnClient,

Review comment:
       I would suggest not providing a `YarnClientWrapper` but a `YarnClient` that is then wrapped as an external `YarnClient` in the constructor.
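
       A sketch of that constructor shape, using the wrapper's current `of(YarnClient, boolean)` factory (this would change again if the factories are reworked as suggested above):

```java
private YarnClientRetrieverImpl(
        @Nullable YarnClient externallyCreatedYarnClient, YarnConfiguration yarnConfiguration) {
    // Wrap the raw client here so callers never have to deal with YarnClientWrapper directly.
    this.externalYarnClient =
            externallyCreatedYarnClient == null
                    ? null
                    : YarnClientWrapper.of(externallyCreatedYarnClient, false);
    this.yarnConfiguration = yarnConfiguration;
}
```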




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

