tillrohrmann commented on a change in pull request #18220:
URL: https://github.com/apache/flink/pull/18220#discussion_r777382425



##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/AsyncClusterClientProvider.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/** Factory for {@link ClusterClient ClusterClients}. */
+@Internal
+public interface AsyncClusterClientProvider<T> extends ClusterClientProvider<T> {
+
+    CompletableFuture<ClusterClient<T>> getClusterClientAsync();
+
+    default ClusterClient<T> getClusterClient() {
+        try {
+            return getClusterClientAsync().get();

Review comment:
       ```suggestion
               return getClusterClientAsync().join();
   ```
   
   Then you don't have to catch checked exceptions: `join()` throws an unchecked `CompletionException` instead.
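
To make the difference concrete, here is a minimal sketch comparing the two calls. `JoinVsGet`, `viaGet`, and `viaJoin` are hypothetical names chosen for illustration and are not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper, not part of the PR: contrasts get() and join().
final class JoinVsGet {

    // get() declares checked exceptions, so the caller must catch or rethrow them.
    static String viaGet(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Future failed", e.getCause());
        }
    }

    // join() declares no checked exceptions; a failed future surfaces as an
    // unchecked CompletionException that wraps the original cause.
    static String viaJoin(CompletableFuture<String> future) {
        return future.join();
    }
}
```

Note that the failure does not disappear with `join()`; it is only reported through an unchecked exception, so callers that care about the cause still need to unwrap it.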

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1240,8 +1247,17 @@ private ApplicationReport startAppMaster(
             Thread.sleep(250);
         }
 
-        // since deployment was successful, remove the hook
-        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+        if (ensureAppDoRun) {
+            if (ClusterEntrypoint.ExecutionMode.DETACHED
+                    .name()
+                    .equals(
+                            flinkConfiguration.get(
+                                    ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE))) {
+                logDetachedClusterInformation(applicationId, LOG);
+            }

Review comment:
       Do we have to keep this logic here? It somehow does not seem to belong 
here.

##########
File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
##########
@@ -133,6 +140,38 @@ public void testPerJobWithArchive() throws Exception {
         runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
     }
 
+    @Test
+    public void testPerJobModeWithAsyncClusterClientProvider() throws Exception {
+        runTest(
+                () -> {
+                    Configuration configuration =
+                            createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.FIRST);
+                    JobGraph jobGraph = getTestingJobGraph();
+                    jobGraph.setJobType(JobType.STREAMING);
+                    try (final YarnClusterDescriptor yarnClusterDescriptor =
+                            createYarnClusterDescriptor(configuration)) {
+
+                        ClusterClientProvider<ApplicationId> clusterClientProvider =
+                                deployJobCluster(yarnClusterDescriptor, jobGraph);
+
+                        ApplicationId applicationId = yarnClusterDescriptor.getApplicationId();
+                        YarnClient yarnClient = getYarnClient();
+                        ApplicationReport report = yarnClient.getApplicationReport(applicationId);
+
+                        assertEquals(ACCEPTED, report.getYarnApplicationState());
+
+                        ClusterClient<ApplicationId> clusterClient =
+                                clusterClientProvider.getClusterClient();
+
+                        report = yarnClient.getApplicationReport(applicationId);
+                        assertEquals(RUNNING, report.getYarnApplicationState());
+
+                        checkApplicationFinished(
+                                clusterClient, yarnClusterDescriptor, jobGraph, configuration);
+                    }
+                });

Review comment:
       What exactly is this test testing that we aren't testing yet?

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnAsyncClusterClientProviderFactory.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.AsyncClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link YarnAsyncClusterClientProviderFactory} for a YARN cluster. */
+@Internal
+public class YarnAsyncClusterClientProviderFactory {
+
+    static AsyncClusterClientProvider from(

Review comment:
       Raw type usage: `AsyncClusterClientProvider` is missing its type argument.
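
As a rough illustration of why the raw type matters, the sketch below uses a hypothetical `Provider<T>` interface as a stand-in for the generic provider (it is not the Flink interface) and contrasts a raw return type with a parameterized one.

```java
// Hypothetical demo, not part of the PR.
final class RawTypeDemo {

    // Stand-in for a generic provider interface; an assumption for illustration.
    interface Provider<T> {
        T get();
    }

    // Raw return type: the element type is erased, so callers only see Object
    // and the compiler emits rawtypes/unchecked warnings.
    @SuppressWarnings("rawtypes")
    static Provider rawFrom(String id) {
        return () -> id;
    }

    // Parameterized return type: callers get a String without casting.
    static Provider<String> typedFrom(String id) {
        return () -> id;
    }
}
```

With the parameterized signature, `String id = RawTypeDemo.typedFrom("app").get();` compiles without a cast, whereas the raw variant only yields an `Object`.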

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,9 +1184,22 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        waitAppSubmission(yarnClient, appId, false);
+
+        // since deployment was successful, remove the hook
+        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+
+        return () -> waitAppSubmission(yarnClient, appId, true);

Review comment:
       Let's use concrete classes. This makes it a lot easier to test them 
properly.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnAsyncClusterClientProviderFactory.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.AsyncClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link YarnAsyncClusterClientProviderFactory} for a YARN cluster. */
+@Internal
+public class YarnAsyncClusterClientProviderFactory {
+
+    static AsyncClusterClientProvider from(
+            ApplicationReportProvider applicationReportProvider,
+            final Configuration flinkConfiguration) {
+        return () -> {
+            try {
+                ApplicationReport report = applicationReportProvider.waitSubmissionFinish();
+                return CompletableFuture.completedFuture(
+                        new RestClusterClient<ApplicationId>(
+                                flinkConfiguration, report.getApplicationId()));

Review comment:
       This is a blocking implementation of an asynchronous interface. I think 
this defeats the purpose of the `AsyncClusterClientProvider`.
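
A minimal sketch of the distinction, under the assumption that the slow lookup can be expressed as a `Supplier`. `AsyncLookup`, `blockingInDisguise`, and `trulyAsync` are hypothetical names and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR.
final class AsyncLookup {

    // Blocking in disguise: the slow call runs on the caller's thread, and the
    // future is created only after the result is already known.
    static <T> CompletableFuture<T> blockingInDisguise(Supplier<T> slowLookup) {
        return CompletableFuture.completedFuture(slowLookup.get());
    }

    // Asynchronous: the slow call runs on the given executor, and the caller
    // receives the (possibly still incomplete) future right away.
    static <T> CompletableFuture<T> trulyAsync(Supplier<T> slowLookup, Executor executor) {
        return CompletableFuture.supplyAsync(slowLookup, executor);
    }
}
```

Both methods have the same signature shape, which is why the blocking variant is easy to miss in review: only the thread that executes the lookup differs.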

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnAsyncClusterClientProviderFactory.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.program.AsyncClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link YarnAsyncClusterClientProviderFactory} for a YARN cluster. */
+@Internal
+public class YarnAsyncClusterClientProviderFactory {
+
+    static AsyncClusterClientProvider from(
+            ApplicationReportProvider applicationReportProvider,
+            final Configuration flinkConfiguration) {
+        return () -> {

Review comment:
       I think it is better not to rely too much on anonymous classes. Instead, 
I would suggest implementing a named class (admittedly, this is a bit more 
boilerplate, but it is more maintainable).

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,9 +1184,22 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        waitAppSubmission(yarnClient, appId, false);

Review comment:
       Instead of using booleans to encode the behaviour, can't we define a 
target state for which we want to wait? E.g. here I want to wait until we see 
the `ACCEPTED` state.
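
One possible shape for this, as a sketch only: `TargetStateWait`, `AppState`, and `waitUntil` are hypothetical, and `AppState` is a local stand-in rather than YARN's own state enum, so the block compiles on its own. A real version would sleep between polls, as the existing loop does.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR.
final class TargetStateWait {

    // Local stand-in for the application lifecycle, declared in lifecycle order.
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Polls the state source until the application reaches (or passes) the
    // target state, fails, or the poll budget is exhausted.
    static AppState waitUntil(Supplier<AppState> stateSource, AppState target, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            AppState current = stateSource.get();
            // Check terminal failure states first so they are never mistaken for progress.
            if (current == AppState.FAILED || current == AppState.KILLED) {
                throw new IllegalStateException("Application ended in state " + current);
            }
            // Because the constants are in lifecycle order, "at or past the
            // target" reduces to an ordinal comparison.
            if (current.ordinal() >= target.ordinal()) {
                return current;
            }
        }
        throw new IllegalStateException(
                "Target state " + target + " not reached after " + maxPolls + " polls");
    }
}
```

The call sites then read as `waitUntil(source, AppState.ACCEPTED, n)` and `waitUntil(source, AppState.RUNNING, n)`, which states the intent directly instead of hiding it behind `false` and `true`.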

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1195,9 +1184,22 @@ private ApplicationReport startAppMaster(
         yarnClient.submitApplication(appContext);
 
         LOG.info("Waiting for the cluster to be allocated");
+
+        waitAppSubmission(yarnClient, appId, false);
+
+        // since deployment was successful, remove the hook
+        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
+
+        return () -> waitAppSubmission(yarnClient, appId, true);

Review comment:
       What happens if this supplier is called after a long time and when the 
yarn application has been removed from the cluster? Won't this then fail with 
`"Failed to deploy the cluster."`? This seems wrong.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/AsyncClusterClientProvider.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/** Factory for {@link ClusterClient ClusterClients}. */
+@Internal
+public interface AsyncClusterClientProvider<T> extends ClusterClientProvider<T> {

Review comment:
       Given how this interface is used, I think we don't need it. We can also 
implement a `ClusterClientProvider` that resolves the application report in a 
blocking fashion.
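
A rough sketch of what such a blocking, named provider could look like. Everything here is hypothetical: `ClientProvider` is a local stand-in for the provider interface, the "report" and "client" are plain strings, and `ReportBackedClientProvider` is an illustrative name, so that the block stays self-contained.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch, not part of the PR.
final class BlockingProviderSketch {

    // Stand-in for a provider interface with a single blocking accessor.
    interface ClientProvider<C> {
        C getClient();
    }

    // Named implementation: it can be instantiated directly in a unit test
    // with a fake report source, unlike a lambda buried in a factory method.
    static final class ReportBackedClientProvider implements ClientProvider<String> {

        private final Supplier<String> reportSource;

        ReportBackedClientProvider(Supplier<String> reportSource) {
            this.reportSource = Objects.requireNonNull(reportSource);
        }

        @Override
        public String getClient() {
            // Blocks the caller until the report is available, then derives
            // the client from it. No future is involved.
            String applicationId = reportSource.get();
            return "client-for-" + applicationId;
        }
    }
}
```

The waiting is deferred until `getClient()` is called, which appears to be the behaviour the PR is after, without introducing a separate asynchronous interface.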




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
