scwhittle commented on code in PR #35120:
URL: https://github.com/apache/beam/pull/35120#discussion_r2216405301


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java:
##########
@@ -427,4 +427,16 @@ public Duration create(PipelineOptions options) {
           : Duration.ofMinutes(1);
     }
   }
+
+  /**
+   * The time limit (in minute) that an SDK worker allows for a PTransform operation before
+   * signaling the runner harness to restart the SDK worker.

Review Comment:
   nit: clarify that zero means no time-limit
   
   "There is no time limit if the value is zero."



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -279,7 +279,15 @@ public static void main(
     ExecutorService executorService =
         options.as(ExecutorOptions.class).getScheduledExecutorService();
     ExecutionStateSampler executionStateSampler =
-        new ExecutionStateSampler(options, System::currentTimeMillis);
+        new ExecutionStateSampler(
+            options,
+            System::currentTimeMillis,
+            message -> {
+              System.err.println(
+                  "\n*** FATAL ERROR: Timeout occurred! Exiting JVM with status 1. ***");
+              System.err.println("Details: " + message);
+              System.exit(1);

Review Comment:
   Instead of calling System.exit(), we could have a CompletableFuture that we complete here, and add it to what is waited on below on line 423.
   
   something like:
   ```
         if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) {
           CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture(), samplerTerminationFuture).get();
         } else {
           CompletableFuture.anyOf(control.terminationFuture(), samplerTerminationFuture).get();
         }
   ```
   
   That lets us take the normal shutdown path, which has the benefit of flushing the logging handler and running the other clean-shutdown logic.
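
To make the idea concrete, here is a minimal, self-contained sketch of the pattern, not the actual FnHarness code. The class name `SamplerTerminationSketch` and the helpers `timeoutCallback` and `awaitAny` are hypothetical names used only for illustration, and `controlTermination` is a plain stand-in for `control.terminationFuture()`. It assumes the sampler's timeout callback is a `Consumer<String>`, as the lambda in the diff suggests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class SamplerTerminationSketch {

  // Hypothetical helper: builds a timeout callback that completes the future
  // with the timeout message instead of calling System.exit().
  static Consumer<String> timeoutCallback(CompletableFuture<String> samplerTerminationFuture) {
    return message -> samplerTerminationFuture.complete(message);
  }

  // Hypothetical helper: blocks until any one of the futures completes and
  // returns that future's result.
  static Object awaitAny(CompletableFuture<?>... futures) throws Exception {
    return CompletableFuture.anyOf(futures).get();
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for control.terminationFuture(); it never completes in this sketch.
    CompletableFuture<Object> controlTermination = new CompletableFuture<>();
    CompletableFuture<String> samplerTerminationFuture = new CompletableFuture<>();

    // The sampler would invoke this callback when the time limit is exceeded.
    Consumer<String> onTimeout = timeoutCallback(samplerTerminationFuture);
    onTimeout.accept("element processing timed out");

    // The main thread wakes up and can continue with the normal shutdown path
    // (flushing logs, closing clients) rather than exiting abruptly.
    Object reason = awaitAny(controlTermination, samplerTerminationFuture);
    System.out.println("Shutting down: " + reason);
  }
}
```

Because the callback only completes a future, the decision to exit stays with the thread that already owns shutdown, so the cleanup that follows the wait still runs.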



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -279,7 +279,15 @@ public static void main(
     ExecutorService executorService =
         options.as(ExecutorOptions.class).getScheduledExecutorService();
     ExecutionStateSampler executionStateSampler =
-        new ExecutionStateSampler(options, System::currentTimeMillis);
+        new ExecutionStateSampler(
+            options,
+            System::currentTimeMillis,
+            message -> {
+              System.err.println(

Review Comment:
   Let's LOG this as well, in addition to printing to System.err.



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java:
##########
@@ -722,4 +739,159 @@ public void testErrorState() throws Exception {
     tracker.reset();
     assertTrue(state1.error());
   }
+
+  @Test
+  public void testDefaultElementProcessingTimeoutMinutesNoExceptionThrown() throws Exception {

Review Comment:
   nit: the NoExceptionThrown suffix is a little confusing since we're not throwing an exception. How about testDefaultElementProcessingHasNoTimeout, testUserSpecifiedElementProcessingTimeoutNotExceeded, and testUserSpecifiedElementProcessingTimeoutExceeded?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
