tillrohrmann commented on a change in pull request #18220: URL: https://github.com/apache/flink/pull/18220#discussion_r800081613
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** + * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client + * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it + * will create a dedicated yarn client wrapper. Review comment: ```suggestion * will create a dedicated yarn client. ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; +import java.util.Objects; + +/** Wrapper class for {@link YarnClient}. */ +@Internal +public class YarnClientWrapper implements AutoCloseable { + private YarnClient yarnClient; + private boolean allowToStop; + + public YarnClientWrapper(YarnClient yarnClient, boolean allowToStop) { + this.yarnClient = yarnClient; + this.allowToStop = allowToStop; + } + + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return yarnClient.getApplicationReport(appId); + } + + @Override + public void close() throws Exception { + if (allowToStop) { + yarnClient.close(); + } + } + + public boolean isClosed() { + return yarnClient.isInState(Service.STATE.STOPPED); + } + + public static YarnClientWrapper of(YarnClient yarnClient, boolean allowToStop) { + return new YarnClientWrapper(yarnClient, allowToStop); + } + + public static YarnClientWrapper of(YarnConfiguration yarnConfiguration, boolean allowToStop) { + final YarnClient newlyCreatedYarnClient = YarnClient.createYarnClient(); + newlyCreatedYarnClient.init(yarnConfiguration); + newlyCreatedYarnClient.start(); + return of(newlyCreatedYarnClient, allowToStop); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + YarnClientWrapper that = (YarnClientWrapper) o; + return allowToStop == that.allowToStop && yarnClient.equals(that.yarnClient); + } + + @Override + public int hashCode() { + return Objects.hash(yarnClient, allowToStop); + } Review comment: For what is this needed? ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; +import java.util.Objects; + +/** Wrapper class for {@link YarnClient}. */ +@Internal +public class YarnClientWrapper implements AutoCloseable { + private YarnClient yarnClient; + private boolean allowToStop; + + public YarnClientWrapper(YarnClient yarnClient, boolean allowToStop) { + this.yarnClient = yarnClient; + this.allowToStop = allowToStop; + } + + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + return yarnClient.getApplicationReport(appId); + } + + @Override + public void close() throws Exception { + if (allowToStop) { + yarnClient.close(); + } + } + + public boolean isClosed() { + return yarnClient.isInState(Service.STATE.STOPPED); + } + + public static YarnClientWrapper of(YarnClient yarnClient, boolean allowToStop) { Review comment: Instead of passing boolean flags, I'd suggest to provide properly named factory methods. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientWrapper.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; +import java.util.Objects; + +/** Wrapper class for {@link YarnClient}. */ +@Internal +public class YarnClientWrapper implements AutoCloseable { Review comment: Instead of this wrapper, you could use Flink's `Reference` class. It exactly provides you what we need here. Something that needs to be closed is a `owned` and something that should not be closed is `borrowed`. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetriever.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +/** Factory for getting {@link org.apache.hadoop.yarn.client.api.YarnClient}. */ +@Internal +public interface YarnClientRetriever { + YarnClientWrapper getYarnClient() throws FlinkException; Review comment: JavaDocs are missing. It would be especially interesting to know when a `FlinkException` is being thrown. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** + * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client + * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it + * will create a dedicated yarn client wrapper. + */ +@Internal +public final class YarnClientRetrieverImpl implements YarnClientRetriever { + private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class); + + @Nullable private final YarnClientWrapper externalYarnClient; + private final YarnConfiguration yarnConfiguration; + @Nullable private YarnClientWrapper dedicatedYarnClient; + + private YarnClientRetrieverImpl( + @Nullable YarnClientWrapper externalCreatedYarnClient, Review comment: ```suggestion @Nullable YarnClientWrapper externallyCreatedYarnClient, ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** + * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client + * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it + * will create a dedicated yarn client wrapper. + */ +@Internal +public final class YarnClientRetrieverImpl implements YarnClientRetriever { + private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class); + + @Nullable private final YarnClientWrapper externalYarnClient; + private final YarnConfiguration yarnConfiguration; + @Nullable private YarnClientWrapper dedicatedYarnClient; + + private YarnClientRetrieverImpl( + @Nullable YarnClientWrapper externalCreatedYarnClient, + YarnConfiguration yarnConfiguration) { + this.externalYarnClient = externalCreatedYarnClient; + this.yarnConfiguration = yarnConfiguration; + } + + @Override + public YarnClientWrapper getYarnClient() throws FlinkException { + + if (externalYarnClient != null && !externalYarnClient.isClosed()) { Review comment: This check could be refactored into `isRunning(externalYarnClient)` and then be reused for the next if condition as well. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster( yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); + + YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false); + + try { + waitTillTargetState( + yarnClientWrapper, + appId, + YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING); + } catch (IOException e) { + throw new YarnDeploymentException("Failed to deploy the cluster.", e); + } + + // since deployment was successful, remove the hook + ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); + + return ApplicationReportProviderImpl.of( + YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId); + } + + public static ApplicationReport waitTillTargetState( + YarnClientWrapper yarnClient, ApplicationId appId, YarnApplicationState... targetStates) Review comment: ```suggestion YarnClient yarnClient, ApplicationId appId, YarnApplicationState... targetStates) ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster( yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); + + YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false); + + try { + waitTillTargetState( + yarnClientWrapper, + appId, + YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING); + } catch (IOException e) { + throw new YarnDeploymentException("Failed to deploy the cluster.", e); + } + + // since deployment was successful, remove the hook + ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); + + return ApplicationReportProviderImpl.of( + YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId); + } + + public static ApplicationReport waitTillTargetState( Review comment: ```suggestion public static ApplicationReport waitUntilTargetState( ``` ########## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java ########## @@ -141,54 +184,70 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolea ? createYarnClusterDescriptor(configuration) : createYarnClusterDescriptorWithoutLibDir(configuration)) { - final int masterMemory = - yarnClusterDescriptor - .getFlinkConfiguration() - .get(JobManagerOptions.TOTAL_PROCESS_MEMORY) - .getMebiBytes(); - final ClusterSpecification clusterSpecification = - new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(masterMemory) - .setTaskManagerMemoryMB(1024) - .setSlotsPerTaskManager(1) - .createClusterSpecification(); - - File testingJar = - TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests")); - - jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - try (ClusterClient<ApplicationId> clusterClient = - yarnClusterDescriptor - .deployJobCluster(clusterSpecification, jobGraph, false) - .getClusterClient()) { - - for (DistributedCache.DistributedCacheEntry entry : - jobGraph.getUserArtifacts().values()) { - assertTrue( - String.format( - "The user artifacts(%s) should be remote or uploaded to remote filesystem.", - entry.filePath), - Utils.isRemotePath(entry.filePath)); - } - - ApplicationId applicationId = clusterClient.getClusterId(); - - final CompletableFuture<JobResult> jobResultCompletableFuture = - clusterClient.requestJobResult(jobGraph.getJobID()); - - final JobResult jobResult = jobResultCompletableFuture.get(); - - assertThat(jobResult, is(notNullValue())); - assertThat(jobResult.getSerializedThrowable().isPresent(), is(false)); - - checkStagingDirectory(configuration, applicationId); - - waitApplicationFinishedElseKillIt( - applicationId, - yarnAppTerminateTimeout, - yarnClusterDescriptor, - sleepIntervalInMS); + ClusterClientProvider<ApplicationId> clusterClientProvider = + deployJobCluster(yarnClusterDescriptor, jobGraph); + + ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); + + checkApplicationFinished(clusterClient, yarnClusterDescriptor, jobGraph, configuration); + } + } + + private ClusterClientProvider<ApplicationId> deployJobCluster( + YarnClusterDescriptor yarnClusterDescriptor, JobGraph jobGraph) + throws ClusterDeploymentException { + final int masterMemory = + yarnClusterDescriptor + .getFlinkConfiguration() + .get(JobManagerOptions.TOTAL_PROCESS_MEMORY) + .getMebiBytes(); + final ClusterSpecification clusterSpecification = + new ClusterSpecification.ClusterSpecificationBuilder() + .setMasterMemoryMB(masterMemory) + .setTaskManagerMemoryMB(1024) + .setSlotsPerTaskManager(1) + .createClusterSpecification(); + + File testingJar = TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests")); + + jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); + return yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, false); + } + + private void checkApplicationFinished( Review comment: ```suggestion private void waitUntilApplicationFinishes( ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1195,53 +1189,78 @@ private ApplicationReport startAppMaster( yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); + + YarnClientWrapper yarnClientWrapper = YarnClientWrapper.of(yarnClient, false); + + try { + waitTillTargetState( + yarnClientWrapper, + appId, + YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING); + } catch (IOException e) { + throw new YarnDeploymentException("Failed to deploy the cluster.", e); + } + + // since deployment was successful, remove the hook + ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); + + return ApplicationReportProviderImpl.of( + YarnClientRetrieverImpl.from(yarnClientWrapper, yarnConfiguration), applicationId); Review comment: What if the `YarnApplicationState` is `RUNNING`? Couldn't we then create a finished `ApplicationReportProvider`? Then we wouldn't have to poll again. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationReportProviderImpl.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +/** The implementation of {@link ApplicationReportProvider}. */ +@Internal +public class ApplicationReportProviderImpl implements ApplicationReportProvider { + private final YarnClientRetriever yarnClientRetriever; + private final ApplicationId appId; + + private ApplicationReportProviderImpl( + YarnClientRetriever yarnClientRetriever, ApplicationId applicationId) { + this.yarnClientRetriever = yarnClientRetriever; + this.appId = applicationId; + } + + @Override + public ApplicationReport waitTillSubmissionFinish() throws FlinkException { + try (final YarnClientWrapper yarnClient = yarnClientRetriever.getYarnClient()) { + return YarnClusterDescriptor.waitTillTargetState( + yarnClient, appId, YarnApplicationState.RUNNING); + } catch (YarnException | IOException e) { + throw new FlinkException( + "Errors on getting YARN application report. Maybe application has finished.", + e); + } catch (InterruptedException interruptedException) { Review comment: In case of `InterruptedException` you should set the interrupted flag via: `Thread.currentThread().interrupt();` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetriever.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +/** Factory for getting {@link org.apache.hadoop.yarn.client.api.YarnClient}. */ Review comment: Maybe say that it returns a `YarnClientWrapper`. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationReportProvider.java ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.api.records.ApplicationReport; + +/** Factory for getting {@link ApplicationReport}. */ +@Internal +public interface ApplicationReportProvider { + /** + * Waiting for the job to be running and then return the latest {@link ApplicationReport}. And + * when Yarn client is closed or encountering any connection error, it will throw {@link + * FlinkException}. + */ + ApplicationReport waitTillSubmissionFinish() throws FlinkException; Review comment: ```suggestion ApplicationReport waitUntilSubmissionFinishes() throws FlinkException; ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClientRetrieverImpl.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.FlinkException; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** + * The implementation of {@link YarnClientRetriever} which is used to get a wrapper of yarn client + * for {@link ApplicationReportProviderImpl}. When external yarn client is closed or nullable, it + * will create a dedicated yarn client wrapper. + */ +@Internal +public final class YarnClientRetrieverImpl implements YarnClientRetriever { + private static final Logger LOG = LoggerFactory.getLogger(YarnClientRetrieverImpl.class); + + @Nullable private final YarnClientWrapper externalYarnClient; + private final YarnConfiguration yarnConfiguration; + @Nullable private YarnClientWrapper dedicatedYarnClient; + + private YarnClientRetrieverImpl( + @Nullable YarnClientWrapper externalCreatedYarnClient, Review comment: I would suggest to not provide a `YarnClientWrapper` but a `YarnClient` that is then wrapped as an external `YarnClient` in the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
