scwhittle commented on code in PR #35120:
URL: https://github.com/apache/beam/pull/35120#discussion_r2216405301
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java:
##########
@@ -427,4 +427,16 @@ public Duration create(PipelineOptions options) {
           : Duration.ofMinutes(1);
     }
   }
+
+  /**
+   * The time limit (in minute) that an SDK worker allows for a PTransform operation before
+   * signaling the runner harness to restart the SDK worker.

Review Comment:
nit: clarify that zero means no time limit, e.g. "There is no time limit if the value is zero."

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -279,7 +279,15 @@ public static void main(
       ExecutorService executorService = options.as(ExecutorOptions.class).getScheduledExecutorService();
       ExecutionStateSampler executionStateSampler =
-          new ExecutionStateSampler(options, System::currentTimeMillis);
+          new ExecutionStateSampler(
+              options,
+              System::currentTimeMillis,
+              message -> {
+                System.err.println(
+                    "\n*** FATAL ERROR: Timeout occurred! Exiting JVM with status 1. ***");
+                System.err.println("Details: " + message);
+                System.exit(1);

Review Comment:
Instead of System.exit(), we could have a CompletableFuture that we notify and add to what is waited on below on line 423, something like:
```
if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) {
  CompletableFuture.anyOf(
          control.terminationFuture(), logging.terminationFuture(), samplerTerminationFuture)
      .get();
} else {
  CompletableFuture.anyOf(control.terminationFuture(), samplerTerminationFuture).get();
}
```
That will let us take the normal shutdown path, which has the benefit of flushing the logging handler and running the other clean-shutdown logic.

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -279,7 +279,15 @@ public static void main(
       ExecutorService executorService = options.as(ExecutorOptions.class).getScheduledExecutorService();
       ExecutionStateSampler executionStateSampler =
-          new ExecutionStateSampler(options, System::currentTimeMillis);
+          new ExecutionStateSampler(
+              options,
+              System::currentTimeMillis,
+              message -> {
+                System.err.println(

Review Comment:
Let's LOG as well.

##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java:
##########
@@ -722,4 +739,159 @@ public void testErrorState() throws Exception {
     tracker.reset();
     assertTrue(state1.error());
   }
+
+  @Test
+  public void testDefaultElementProcessingTimeoutMinutesNoExceptionThrown() throws Exception {

Review Comment:
nit: the "NoExceptionThrown" suffix is a little confusing since we're not throwing an exception. How about testDefaultElementProcessingHasNoTimeout, testUserSpecifiedElementProcessingTimeoutNotExceeded, and testUserSpecifiedElementProcessingTimeoutExceeded?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
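A minimal sketch of the "zero means no time limit" semantics requested for the new SdkHarnessOptions Javadoc. The class `ElementTimeoutCheck`, the method `exceedsTimeLimit`, and its parameters are hypothetical and not part of the PR; they only illustrate how a sampler could treat a zero (or negative) limit as disabled while comparing elapsed time in milliseconds against a limit given in minutes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the PR: illustrates the intended option semantics only.
final class ElementTimeoutCheck {
  private ElementTimeoutCheck() {}

  /**
   * Returns true when an operation has been in its current state for at least the configured
   * limit. A limit of zero (or any non-positive value) means there is no time limit.
   */
  static boolean exceedsTimeLimit(long millisInState, int timeoutMinutes) {
    if (timeoutMinutes <= 0) {
      return false; // zero disables the check entirely
    }
    return millisInState >= TimeUnit.MINUTES.toMillis(timeoutMinutes);
  }
}
```

Treating non-positive values as "disabled" keeps the default harmless even if a user passes a negative number; whether the real option should reject negative values instead is a separate design question for the PR.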
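A rough, self-contained sketch of the shutdown flow suggested in the FnHarness comments: the sampler's timeout callback logs the message and completes a termination future instead of calling `System.exit(1)`, and `main` waits on `CompletableFuture.anyOf(...)` so it falls through to the normal shutdown path. The names `SamplerShutdownSketch`, `timeoutHandler`, and `awaitTermination` are hypothetical; plain `CompletableFuture`s stand in for `control.terminationFuture()` and `logging.terminationFuture()`, and `java.util.logging` stands in for whatever logger the harness actually uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Hypothetical sketch of the review suggestion; not the PR's implementation.
final class SamplerShutdownSketch {
  private static final Logger LOG = Logger.getLogger(SamplerShutdownSketch.class.getName());

  private SamplerShutdownSketch() {}

  // Timeout callback: log the details, then signal termination instead of exiting the JVM.
  static Consumer<String> timeoutHandler(CompletableFuture<Object> samplerTerminationFuture) {
    return message -> {
      LOG.severe("Element processing timeout exceeded; shutting down SDK harness. Details: "
          + message);
      samplerTerminationFuture.complete(message);
    };
  }

  // Blocks until the first of the termination futures completes and returns its value.
  static Object awaitTermination(
      boolean enableLogViaFnApi,
      CompletableFuture<?> controlTermination,
      CompletableFuture<?> loggingTermination,
      CompletableFuture<?> samplerTermination)
      throws InterruptedException, ExecutionException {
    if (enableLogViaFnApi) {
      return CompletableFuture.anyOf(controlTermination, loggingTermination, samplerTermination)
          .get();
    }
    return CompletableFuture.anyOf(controlTermination, samplerTermination).get();
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<Object> control = new CompletableFuture<>();
    CompletableFuture<Object> logging = new CompletableFuture<>();
    CompletableFuture<Object> sampler = new CompletableFuture<>();
    Consumer<String> onTimeout = timeoutHandler(sampler);

    // Simulate the sampler detecting a stuck PTransform shortly after startup.
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    timer.schedule(
        () -> onTimeout.accept("simulated stuck PTransform"), 100, TimeUnit.MILLISECONDS);

    Object reason = awaitTermination(true, control, logging, sampler);
    timer.shutdown();
    // The normal shutdown path (flushing the logging handler, closing channels) would run here.
    System.out.println("Harness shutting down: " + reason);
  }
}
```

Completing a future rather than exiting from inside the callback means the code after `get()` runs on the main thread, so the same cleanup applies whether the control stream closed, the logging stream closed, or the sampler hit its time limit.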