mynameborat commented on code in PR #25031:
URL: https://github.com/apache/beam/pull/25031#discussion_r1072605173


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
       }
     }
 
+    private void closeBundle() throws Exception {
+      long bundleProcessingTimeout = pipelineOptions.getBundleProcessingTimeout();
+      if (bundleProcessingTimeout < 0) {
+        // RemoteBundle close blocks until all results are received
+        remoteBundle.close();
+      } else {
+        CompletableFuture<Void> future =
+            CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    remoteBundle.close();
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);

Review Comment:
   Why not rethrow the original exception instead of wrapping it in a
`RuntimeException`?
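
   For context, the lambda passed to `CompletableFuture.runAsync` is a `Runnable`, so it cannot throw a checked exception directly. One possible way to avoid the extra `RuntimeException` layer is to complete a future by hand with the original throwable. This is only a sketch: `CloseAsyncSketch`, `Closer`, and `closeAsync` are hypothetical names, and `Closer` stands in for `RemoteBundle`.

```java
import java.util.concurrent.CompletableFuture;

public class CloseAsyncSketch {
  /** Hypothetical stand-in for RemoteBundle; only close() matters here. */
  public interface Closer {
    void close() throws Exception;
  }

  /** Runs close() on the common pool and reports any failure without re-wrapping it. */
  public static CompletableFuture<Void> closeAsync(Closer closer) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    CompletableFuture.runAsync(
        () -> {
          try {
            closer.close();
            result.complete(null);
          } catch (Throwable t) {
            // Hand the original throwable to the future as-is.
            result.completeExceptionally(t);
          }
        });
    return result;
  }
}
```

   With this shape, `get()` on the returned future throws an `ExecutionException` whose cause is the exception that `close()` threw, with no intermediate `RuntimeException`.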



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java:
##########
@@ -29,4 +30,11 @@
   String getFsTokenPath();
 
   void setFsTokenPath(String path);
+
+  @Description(
+      "Wait if necessary for completing a remote bundle processing for at most the given time (in milliseconds). if the value of timeout is negative, wait forever until the bundle processing is completed. Used only in portable mode.")
+  @Default.Long(-1)
+  long getBundleProcessingTimeout();
+
+  void setBundleProcessingTimeout(long timeoutMs);

Review Comment:
   Why is this specific to portable mode? Since we support bundling in
classic mode, we should support this in both modes: users can also run logic
in classic mode that can time out (e.g., async processing).



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java:
##########
@@ -29,4 +30,11 @@
   String getFsTokenPath();
 
   void setFsTokenPath(String path);
+
+  @Description(
+      "Wait if necessary for completing a remote bundle processing for at most the given time (in milliseconds). if the value of timeout is negative, wait forever until the bundle processing is completed. Used only in portable mode.")
+  @Default.Long(-1)
+  long getBundleProcessingTimeout();
+
+  void setBundleProcessingTimeout(long timeoutMs);

Review Comment:
   We can start by supporting the timeout in portable mode in this PR and
leave the classic mode implementation for another PR.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
       }
     }
 
+    private void closeBundle() throws Exception {
+      long bundleProcessingTimeout = pipelineOptions.getBundleProcessingTimeout();
+      if (bundleProcessingTimeout < 0) {
+        // RemoteBundle close blocks until all results are received
+        remoteBundle.close();
+      } else {
+        CompletableFuture<Void> future =
+            CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    remoteBundle.close();
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);
+                  }
+                });
+        future.get(bundleProcessingTimeout, TimeUnit.MILLISECONDS);

Review Comment:
   This can throw `TimeoutException` or `ExecutionException`, and in the case
of the latter it masks the original cause.
   I'd prefer to bubble up the original cause so that we know what went wrong
when the close fails.
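
   A rough sketch of what unwrapping could look like, assuming a hypothetical `Closer` interface in place of `RemoteBundle` and a hypothetical `closeWithTimeout` helper (neither is from the PR). The lambda wraps the checked exception in `CompletionException`, which `CompletableFuture` does not wrap again, and the caller rethrows the cause of the `ExecutionException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class BundleCloseTimeoutSketch {
  /** Hypothetical stand-in for RemoteBundle; only close() matters here. */
  public interface Closer {
    void close() throws Exception;
  }

  /** Closes with an optional timeout; a negative timeout blocks until close() returns. */
  public static void closeWithTimeout(Closer closer, long timeoutMs) throws Exception {
    if (timeoutMs < 0) {
      closer.close();
      return;
    }
    CompletableFuture<Void> future =
        CompletableFuture.runAsync(
            () -> {
              try {
                closer.close();
              } catch (Exception e) {
                // CompletionException is passed through by CompletableFuture unchanged.
                throw new CompletionException(e);
              }
            });
    try {
      // A TimeoutException from get() propagates to the caller unchanged.
      future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause; // bubble up what close() actually threw
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw e;
    }
  }
}
```

   Note that on timeout the asynchronous `close()` call is still running in the background; whether it should be cancelled or left alone is a separate decision that this sketch does not address.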


