AyWa commented on code in PR #28068:
URL: https://github.com/apache/beam/pull/28068#discussion_r1314098646


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java:
##########
@@ -73,28 +79,41 @@
 public class FlinkPortableClientEntryPoint {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);
   private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";
-  private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(30);
-  private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(30);
+  // TODO: add environments variable or parameters
+  private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(90);
+  private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(90);
 
   private final String driverCmd;
+  private final String artifactStagingPath;
+  private final boolean cleanArtifactsPerJob;
   private FlinkJobServerDriver jobServer;
   private Thread jobServerThread;
   private DetachedJobInvokerFactory jobInvokerFactory;
   private int jobPort = 0; // pick any free port
 
-  public FlinkPortableClientEntryPoint(String driverCmd) {
+  public FlinkPortableClientEntryPoint(String driverCmd, String artifactStagingPath,
+                                       boolean cleanArtifactsPerJob) {
     Preconditions.checkState(
         !driverCmd.contains(JOB_ENDPOINT_FLAG),
         "Driver command must not contain " + JOB_ENDPOINT_FLAG);
     this.driverCmd = driverCmd;
+    this.cleanArtifactsPerJob = cleanArtifactsPerJob;
+    this.artifactStagingPath = artifactStagingPath;
   }
 
   /** Main method to be called standalone or by Flink (CLI or REST API). */
   public static void main(String[] args) throws Exception {
+    // TODO: Expose the fileSystem related options.
+    PipelineOptions options = PipelineOptionsFactory.create();
+    // Limiting gcs upload buffer to reduce memory usage while doing parallel artifact uploads.
+    options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024);

Review Comment:
   > Another concern is that `FileSystems.setDefaultPipelineOptions(options)` will affect all subsequent behavior until it is overwritten. So an approach that does not affect global state would be ideal.
   
   For now this is the same code that `JobServerDriver` has. The code is a bit duplicated, but the behavior should be the same.
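
To illustrate the concern about global state, here is a minimal, self-contained sketch. It does not use Beam at all: `GlobalDefaultDemo`, `Defaults`, `entryPointStartup`, and `laterCaller` are hypothetical names standing in for a process-wide registry and its callers. It only shows that a default set once at startup stays visible to any code that runs later in the same JVM.

```java
import java.util.concurrent.atomic.AtomicReference;

public class GlobalDefaultDemo {

  // Hypothetical stand-in for a process-wide registry; not a Beam API.
  static final class Defaults {
    private static final AtomicReference<Integer> UPLOAD_BUFFER_BYTES =
        new AtomicReference<>(null);

    private Defaults() {}

    // Overwrites the process-wide default; every later reader sees the new value.
    static void setUploadBufferBytes(Integer bytes) {
      UPLOAD_BUFFER_BYTES.set(bytes);
    }

    static Integer getUploadBufferBytes() {
      return UPLOAD_BUFFER_BYTES.get();
    }
  }

  // Mimics an entry point that configures the default once at startup.
  static void entryPointStartup() {
    Defaults.setUploadBufferBytes(1024 * 1024);
  }

  // Mimics unrelated code that runs later in the same JVM.
  static Integer laterCaller() {
    return Defaults.getUploadBufferBytes();
  }

  public static void main(String[] args) {
    entryPointStartup();
    // The value set at startup is still in effect for the later caller.
    System.out.println(laterCaller());
  }
}
```

Since the entry point and `JobServerDriver` apply the same setting, the shared default ends up identical either way, which is why the duplication should not change behavior.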



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
