alnzng commented on code in PR #25031:
URL: https://github.com/apache/beam/pull/25031#discussion_r1072645256


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java:
##########
@@ -29,4 +30,11 @@
   String getFsTokenPath();
 
   void setFsTokenPath(String path);
+
+  @Description(
+      "Wait if necessary for completing a remote bundle processing for at most the given time "
+          + "(in milliseconds). if the value of timeout is negative, wait forever until the "
+          + "bundle processing is completed. Used only in portable mode.")
+  @Default.Long(-1)
+  long getBundleProcessingTimeout();
+
+  void setBundleProcessingTimeout(long timeoutMs);

Review Comment:
   Good suggestion. I think we can move this option to `SamzaPipelineOptions`
and note in its description that it is supported only in portable mode.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
       }
     }
 
+    private void closeBundle() throws Exception {
+      long bundleProcessingTimeout = pipelineOptions.getBundleProcessingTimeout();
+      if (bundleProcessingTimeout < 0) {
+        // RemoteBundle close blocks until all results are received
+        remoteBundle.close();
+      } else {
+        CompletableFuture<Void> future =
+            CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    remoteBundle.close();
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);
+                  }
+                });
+        future.get(bundleProcessingTimeout, TimeUnit.MILLISECONDS);

Review Comment:
   I didn't do that for two reasons:
   1. `remoteBundle.close()` is declared to throw `Exception` anyway, so it
seems we can't narrow the method signature to concrete exceptions
(e.g. `TimeoutException`).
   2. The current `finishBundle()` implementation simply catches all
exceptions without distinguishing between concrete types.
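
To illustrate the two points above, here is a minimal, self-contained sketch. It is not the PR's code: `CloseWithTimeoutSketch`, `Closer`, `closeWithTimeout` and `describeOutcome` are hypothetical names, and `Closer` only stands in for a resource whose `close()` declares `throws Exception`. It shows that a method declared with `throws Exception` still surfaces `TimeoutException` and `ExecutionException` at runtime, so a caller could tell them apart if it ever needed to, while a catch-all handler treats them the same.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CloseWithTimeoutSketch {

  /** Hypothetical stand-in for a resource whose close() declares a checked Exception. */
  public interface Closer {
    void close() throws Exception;
  }

  /** Mirrors the shape of the reviewed method: negative timeout means block until done. */
  static void closeWithTimeout(Closer closer, long timeoutMs) throws Exception {
    if (timeoutMs < 0) {
      closer.close();
      return;
    }
    CompletableFuture<Void> future =
        CompletableFuture.runAsync(
            () -> {
              try {
                closer.close();
              } catch (Exception e) {
                // Runnable.run() cannot throw checked exceptions, so wrap them.
                throw new RuntimeException(e);
              }
            });
    // Throws TimeoutException if close() has not finished within timeoutMs.
    future.get(timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** A caller that chooses to distinguish the concrete exception types. */
  static String describeOutcome(Closer closer, long timeoutMs) {
    try {
      closeWithTimeout(closer, timeoutMs);
      return "completed";
    } catch (TimeoutException e) {
      return "timed out";
    } catch (ExecutionException e) {
      return "failed";
    } catch (Exception e) {
      // Reached for the blocking path (negative timeout) or on interruption.
      return "error";
    }
  }

  public static void main(String[] args) {
    System.out.println(describeOutcome(() -> {}, 1000));
    System.out.println(describeOutcome(() -> Thread.sleep(500), 50));
  }
}
```

Note that on timeout the asynchronous task keeps running in the background; the sketch does not attempt to cancel or interrupt it.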
   
   



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
       }
     }
 
+    private void closeBundle() throws Exception {
+      long bundleProcessingTimeout = pipelineOptions.getBundleProcessingTimeout();
+      if (bundleProcessingTimeout < 0) {
+        // RemoteBundle close blocks until all results are received
+        remoteBundle.close();
+      } else {
+        CompletableFuture<Void> future =
+            CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    remoteBundle.close();
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);

Review Comment:
   The `close()` method throws the checked exception `Exception`, so we have to
handle it explicitly inside the Java lambda expression.
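
As a small illustration of why the wrapping is needed, the sketch below uses hypothetical names (`UncheckedLambdaSketch`, `ThrowingRunnable`, `unchecked`, `failureOf`) that are not part of the PR. `CompletableFuture.runAsync` takes a `Runnable`, whose `run()` declares no checked exceptions, so a lambda calling a method that throws `Exception` must catch and rethrow it unchecked. The original exception is still reachable through the cause chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UncheckedLambdaSketch {

  /** Hypothetical functional interface for a body that may throw a checked exception. */
  @FunctionalInterface
  public interface ThrowingRunnable {
    void run() throws Exception;
  }

  /** Adapts a throwing body to Runnable by wrapping any exception in RuntimeException. */
  static Runnable unchecked(ThrowingRunnable body) {
    return () -> {
      try {
        body.run();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }

  /** Runs the body asynchronously and returns the original exception, or null on success. */
  static Throwable failureOf(ThrowingRunnable body) {
    try {
      CompletableFuture.runAsync(unchecked(body)).join();
      return null;
    } catch (CompletionException e) {
      // join() reports failures as CompletionException; its cause is the
      // RuntimeException wrapper, whose cause is the exception thrown by body.
      return e.getCause().getCause();
    }
  }

  public static void main(String[] args) {
    Throwable t = failureOf(() -> { throw new java.io.IOException("boom"); });
    System.out.println(t);
  }
}
```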



