mynameborat commented on code in PR #25031:
URL: https://github.com/apache/beam/pull/25031#discussion_r1072605173
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
}
}
+ private void closeBundle() throws Exception {
+ long bundleProcessingTimeout =
pipelineOptions.getBundleProcessingTimeout();
+ if (bundleProcessingTimeout < 0) {
+ // RemoteBundle close blocks until all results are received
+ remoteBundle.close();
+ } else {
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ remoteBundle.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
Review Comment:
Why not throw the same exception instead of wrapping it up in
`RuntimeException`?
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java:
##########
@@ -29,4 +30,11 @@
String getFsTokenPath();
void setFsTokenPath(String path);
+
+ @Description(
+ "Wait if necessary for completing a remote bundle processing for at most
the given time (in milliseconds). if the value of timeout is negative, wait
forever until the bundle processing is completed. Used only in portable mode.")
+ @Default.Long(-1)
+ long getBundleProcessingTimeout();
+
+ void setBundleProcessingTimeout(long timeoutMs);
Review Comment:
Why is this specific to the portable mode? Since we support bundling in
classic mode, we should support this across the two modes given users can
perform logic within classic mode as well that can timeout (e.g., async
processing)
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java:
##########
@@ -29,4 +30,11 @@
String getFsTokenPath();
void setFsTokenPath(String path);
+
+ @Description(
+ "Wait if necessary for completing a remote bundle processing for at most
the given time (in milliseconds). if the value of timeout is negative, wait
forever until the bundle processing is completed. Used only in portable mode.")
+ @Default.Long(-1)
+ long getBundleProcessingTimeout();
+
+ void setBundleProcessingTimeout(long timeoutMs);
Review Comment:
We can start with supporting the timeout in portable mode in this PR and
keep the classic mode implementation in another PR
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -439,6 +445,25 @@ public void finishBundle() {
}
}
+ private void closeBundle() throws Exception {
+ long bundleProcessingTimeout =
pipelineOptions.getBundleProcessingTimeout();
+ if (bundleProcessingTimeout < 0) {
+ // RemoteBundle close blocks until all results are received
+ remoteBundle.close();
+ } else {
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ remoteBundle.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ future.get(bundleProcessingTimeout, TimeUnit.MILLISECONDS);
Review Comment:
This would throw `TimeoutException`/`ExecutionException` and mask the
original cause in case of latter.
I'd preferably bubble up the original cause so that we know what is wrong in
case of failure.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]