This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3cadc83d348 flink portable client configurations (#31188)
3cadc83d348 is described below
commit 3cadc83d348605d07837532701c0fb4de0feb116
Author: Marc hurabielle <[email protected]>
AuthorDate: Fri May 31 01:46:20 2024 +0900
flink portable client configurations (#31188)
* add support for FlinkJobServer configuration
* remove hardcoded timeout for FlinkPortableClient
---
.../flink/FlinkPortableClientEntryPoint.java | 47 +++++++++++++---------
.../runners/jobsubmission/JobServerDriver.java | 12 ++++++
2 files changed, 39 insertions(+), 20 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
index 47d3959ad18..8f0ecf3efbd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
@@ -73,14 +73,10 @@ import org.slf4j.LoggerFactory;
public class FlinkPortableClientEntryPoint {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);
private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";
- private static final Duration JOB_INVOCATION_TIMEOUT =
Duration.ofSeconds(30);
- private static final Duration JOB_SERVICE_STARTUP_TIMEOUT =
Duration.ofSeconds(30);
-
private final String driverCmd;
private FlinkJobServerDriver jobServer;
private Thread jobServerThread;
private DetachedJobInvokerFactory jobInvokerFactory;
- private int jobPort = 0; // pick any free port
public FlinkPortableClientEntryPoint(String driverCmd) {
Preconditions.checkState(
@@ -96,8 +92,8 @@ public class FlinkPortableClientEntryPoint {
FlinkPortableClientEntryPoint runner =
new FlinkPortableClientEntryPoint(configuration.driverCmd);
try {
- runner.startJobService();
- runner.runDriverProgram();
+ runner.startJobService(configuration);
+
runner.runDriverProgram(Duration.ofSeconds(configuration.jobInvocationTimeoutSeconds));
} catch (Exception e) {
throw new RuntimeException(String.format("Job %s failed.",
configuration.driverCmd), e);
} finally {
@@ -107,7 +103,8 @@ public class FlinkPortableClientEntryPoint {
LOG.info("Job submitted successfully.");
}
- private static class EntryPointConfiguration {
+ private static class EntryPointConfiguration
+ extends FlinkJobServerDriver.FlinkServerConfiguration {
@Option(
name = "--driver-cmd",
required = true,
@@ -115,6 +112,16 @@ public class FlinkPortableClientEntryPoint {
"Command that launches the Python driver program. "
+ "(The job service endpoint will be appended as
--job_endpoint=localhost:<port>.)")
private String driverCmd;
+
+ @Option(
+ name = "--job-service-startup-timeout-seconds",
+ usage = "Timeout for the job service start in seconds")
+ private long jobServiceStartupTimeoutSeconds = 30;
+
+ @Option(
+ name = "--job-invocation-timeout-seconds",
+ usage = "Timeout for the job submission in seconds")
+ private long jobInvocationTimeoutSeconds = 30;
}
private static EntryPointConfiguration parseArgs(String[] args) {
@@ -127,20 +134,20 @@ public class FlinkPortableClientEntryPoint {
parser.printUsage(System.err);
throw new IllegalArgumentException("Unable to parse command line
arguments.", e);
}
+ configuration.setPort(0);
+ configuration.setArtifactPort(0);
+ configuration.setExpansionPort(0);
return configuration;
}
- private void startJobService() throws Exception {
+ private void startJobService(EntryPointConfiguration configuration) throws
Exception {
jobInvokerFactory = new DetachedJobInvokerFactory();
- jobServer =
- FlinkJobServerDriver.fromConfig(
- FlinkJobServerDriver.parseArgs(
- new String[] {"--job-port=" + jobPort, "--artifact-port=0",
"--expansion-port=0"}),
- jobInvokerFactory);
+ jobServer = FlinkJobServerDriver.fromConfig(configuration,
jobInvokerFactory);
jobServerThread = new Thread(jobServer);
jobServerThread.start();
- Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT);
+ Deadline deadline =
+
Deadline.fromNow(Duration.ofSeconds(configuration.jobServiceStartupTimeoutSeconds));
while (jobServer.getJobServerUrl() == null && deadline.hasTimeLeft()) {
try {
Thread.sleep(500);
@@ -160,7 +167,7 @@ public class FlinkPortableClientEntryPoint {
}
}
- private void runDriverProgram() throws Exception {
+ private void runDriverProgram(Duration startTimeout) throws Exception {
ProcessManager processManager = ProcessManager.create();
String executable = "bash";
List<String> args =
@@ -177,7 +184,7 @@ public class FlinkPortableClientEntryPoint {
LOG.info("Started driver program");
// await effect of the driver program submitting the job
- jobInvokerFactory.executeDetachedJob();
+ jobInvokerFactory.executeDetachedJob(startTimeout);
} catch (Exception e) {
try {
processManager.stopProcess(processId);
@@ -247,13 +254,13 @@ public class FlinkPortableClientEntryPoint {
};
}
- private void executeDetachedJob() throws Exception {
- long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds();
- if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
+ private void executeDetachedJob(Duration startTimeout) throws Exception {
+ if (latch.await(startTimeout.getSeconds(), TimeUnit.SECONDS)) {
actualPipelineRunner.run(pipeline, jobInfo);
} else {
throw new TimeoutException(
- String.format("Timeout of %s seconds waiting for job submission.",
timeoutSeconds));
+ String.format(
+ "Timeout of %s seconds waiting for job submission.",
startTimeout.getSeconds()));
}
}
}
diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
index a342593b21f..885e54bfb66 100644
--- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
+++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java
@@ -119,14 +119,26 @@ public abstract class JobServerDriver implements Runnable {
return port;
}
+ public void setPort(int port) {
+ this.port = port;
+ }
+
public int getArtifactPort() {
return artifactPort;
}
+ public void setArtifactPort(int artifactPort) {
+ this.artifactPort = artifactPort;
+ }
+
public int getExpansionPort() {
return expansionPort;
}
+ public void setExpansionPort(int expansionPort) {
+ this.expansionPort = expansionPort;
+ }
+
public String getArtifactStagingPath() {
return artifactStagingPath;
}