This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cadc83d348 flink portable client configurations (#31188)
3cadc83d348 is described below

commit 3cadc83d348605d07837532701c0fb4de0feb116
Author: Marc hurabielle <[email protected]>
AuthorDate: Fri May 31 01:46:20 2024 +0900

    flink portable client configurations (#31188)
    
    * add support for FlinkJobServer configuration
    
    * remove hardcoded timeout for FlinkPortableClient
---
 .../flink/FlinkPortableClientEntryPoint.java       | 47 +++++++++++++---------
 .../runners/jobsubmission/JobServerDriver.java     | 12 ++++++
 2 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
index 47d3959ad18..8f0ecf3efbd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
@@ -73,14 +73,10 @@ import org.slf4j.LoggerFactory;
 public class FlinkPortableClientEntryPoint {
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);
   private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";
-  private static final Duration JOB_INVOCATION_TIMEOUT = 
Duration.ofSeconds(30);
-  private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = 
Duration.ofSeconds(30);
-
   private final String driverCmd;
   private FlinkJobServerDriver jobServer;
   private Thread jobServerThread;
   private DetachedJobInvokerFactory jobInvokerFactory;
-  private int jobPort = 0; // pick any free port
 
   public FlinkPortableClientEntryPoint(String driverCmd) {
     Preconditions.checkState(
@@ -96,8 +92,8 @@ public class FlinkPortableClientEntryPoint {
     FlinkPortableClientEntryPoint runner =
         new FlinkPortableClientEntryPoint(configuration.driverCmd);
     try {
-      runner.startJobService();
-      runner.runDriverProgram();
+      runner.startJobService(configuration);
+      
runner.runDriverProgram(Duration.ofSeconds(configuration.jobInvocationTimeoutSeconds));
     } catch (Exception e) {
       throw new RuntimeException(String.format("Job %s failed.", 
configuration.driverCmd), e);
     } finally {
@@ -107,7 +103,8 @@ public class FlinkPortableClientEntryPoint {
     LOG.info("Job submitted successfully.");
   }
 
-  private static class EntryPointConfiguration {
+  private static class EntryPointConfiguration
+      extends FlinkJobServerDriver.FlinkServerConfiguration {
     @Option(
         name = "--driver-cmd",
         required = true,
@@ -115,6 +112,16 @@ public class FlinkPortableClientEntryPoint {
             "Command that launches the Python driver program. "
                 + "(The job service endpoint will be appended as 
--job_endpoint=localhost:<port>.)")
     private String driverCmd;
+
+    @Option(
+        name = "--job-service-startup-timeout-seconds",
+        usage = "Timeout for the job service start in seconds")
+    private long jobServiceStartupTimeoutSeconds = 30;
+
+    @Option(
+        name = "--job-invocation-timeout-seconds",
+        usage = "Timeout for the job submission in seconds")
+    private long jobInvocationTimeoutSeconds = 30;
   }
 
   private static EntryPointConfiguration parseArgs(String[] args) {
@@ -127,20 +134,20 @@ public class FlinkPortableClientEntryPoint {
       parser.printUsage(System.err);
       throw new IllegalArgumentException("Unable to parse command line 
arguments.", e);
     }
+    configuration.setPort(0);
+    configuration.setArtifactPort(0);
+    configuration.setExpansionPort(0);
     return configuration;
   }
 
-  private void startJobService() throws Exception {
+  private void startJobService(EntryPointConfiguration configuration) throws 
Exception {
     jobInvokerFactory = new DetachedJobInvokerFactory();
-    jobServer =
-        FlinkJobServerDriver.fromConfig(
-            FlinkJobServerDriver.parseArgs(
-                new String[] {"--job-port=" + jobPort, "--artifact-port=0", 
"--expansion-port=0"}),
-            jobInvokerFactory);
+    jobServer = FlinkJobServerDriver.fromConfig(configuration, 
jobInvokerFactory);
     jobServerThread = new Thread(jobServer);
     jobServerThread.start();
 
-    Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT);
+    Deadline deadline =
+        
Deadline.fromNow(Duration.ofSeconds(configuration.jobServiceStartupTimeoutSeconds));
     while (jobServer.getJobServerUrl() == null && deadline.hasTimeLeft()) {
       try {
         Thread.sleep(500);
@@ -160,7 +167,7 @@ public class FlinkPortableClientEntryPoint {
     }
   }
 
-  private void runDriverProgram() throws Exception {
+  private void runDriverProgram(Duration startTimeout) throws Exception {
     ProcessManager processManager = ProcessManager.create();
     String executable = "bash";
     List<String> args =
@@ -177,7 +184,7 @@ public class FlinkPortableClientEntryPoint {
       LOG.info("Started driver program");
 
       // await effect of the driver program submitting the job
-      jobInvokerFactory.executeDetachedJob();
+      jobInvokerFactory.executeDetachedJob(startTimeout);
     } catch (Exception e) {
       try {
         processManager.stopProcess(processId);
@@ -247,13 +254,13 @@ public class FlinkPortableClientEntryPoint {
       };
     }
 
-    private void executeDetachedJob() throws Exception {
-      long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds();
-      if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
+    private void executeDetachedJob(Duration startTimeout) throws Exception {
+      if (latch.await(startTimeout.getSeconds(), TimeUnit.SECONDS)) {
         actualPipelineRunner.run(pipeline, jobInfo);
       } else {
         throw new TimeoutException(
-            String.format("Timeout of %s seconds waiting for job submission.", 
timeoutSeconds));
+            String.format(
+                "Timeout of %s seconds waiting for job submission.", 
startTimeout.getSeconds()));
       }
     }
   }
diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
index a342593b21f..885e54bfb66 100644
--- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
+++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
@@ -119,14 +119,26 @@ public abstract class JobServerDriver implements Runnable {
       return port;
     }
 
+    public void setPort(int port) {
+      this.port = port;
+    }
+
     public int getArtifactPort() {
       return artifactPort;
     }
 
+    public void setArtifactPort(int artifactPort) {
+      this.artifactPort = artifactPort;
+    }
+
     public int getExpansionPort() {
       return expansionPort;
     }
 
+    public void setExpansionPort(int expansionPort) {
+      this.expansionPort = expansionPort;
+    }
+
     public String getArtifactStagingPath() {
       return artifactStagingPath;
     }

Reply via email to