ryan-mbuashundip commented on code in PR #35120: URL: https://github.com/apache/beam/pull/35120#discussion_r2136213340

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -97,6 +100,18 @@ public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
             : Integer.parseInt(samplingPeriodMills);
     this.clock = clock;
     this.activeStateTrackers = new HashSet<>();
+    this.lullTimeMinuteForRestart =
+        Math.max(
+            options.getPtransformTimeoutDuration(),
+            ExecutionStateSampler.MIN_LULL_TIME_MINUTE_FOR_RESTART);
+    if (options.getPtransformTimeoutDuration()
+        < ExecutionStateSampler.MIN_LULL_TIME_MINUTE_FOR_RESTART) {
+      LOG.info(
+          String.format(
+              "The user defined ptransformTimeoutDuration might be too small for "
+                  + "a pTransform operation and has been set to %d minutes",

Review Comment:
   Nit: "The user-defined ptransformTimeoutDuration ~~might be too small~~ is too short for a PTransform operation and has been..."


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -357,6 +377,19 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
           transitionsAtLastSample = transitionsAtThisSample;
         } else {
           long lullTimeMs = currentTimeMillis - lastTransitionTimeMillis.get();
+
+          try {

Review Comment:
   @scwhittle do you know?


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -357,6 +377,19 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
           transitionsAtLastSample = transitionsAtThisSample;
         } else {
           long lullTimeMs = currentTimeMillis - lastTransitionTimeMillis.get();
+
+          try {
+            if (lullTimeMs > TimeUnit.MINUTES.toMillis(lullTimeMinuteForRestart)) {
+              throw new TimeoutException(
+                  String.format(
+                      "The ptransform has been stuck for more than %d minutes, the SDK worker will"
+                          + " restart",
+                      lullTimeMinuteForRestart));
+            }
+          } catch (TimeoutException e) {
+            LOG.error(e.getMessage());

Review Comment:
   I believe the exception must be uncaught in the SDK for it to be handled by RunnerV2. You can still catch the exception in the test like you are doing currently. Or @scwhittle do you prefer using `System.exit(1)` here?


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,6 +82,7 @@ public class ExecutionStateSampler {
           .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final int lullTimeMinuteForRestart;

Review Comment:
   Using minutes as the unit for the flag is fine but I prefer to use milliseconds in `ExecutionStateSampler.java` to be consistent with the other time variables in the file. Same goes for `MIN_LULL_TIME_MINUTE_FOR_RESTART`.
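
For illustration only, the last two comments above (let the exception propagate instead of catching it, and keep internal time values in milliseconds) could be sketched roughly as follows. This is not the PR's code: the class `LullTimeoutSketch`, the methods `resolveLullTimeMsForRestart` and `checkLull`, and the 10-minute floor are all hypothetical names and values chosen for the example. The actual minimum, and whether a checked exception can propagate from `takeSample`, are not shown in the diff above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of the review suggestions; not the actual ExecutionStateSampler code. */
final class LullTimeoutSketch {
  // Assumed floor for the restart threshold, kept in milliseconds like the
  // other time fields. The real minimum used by the PR is not shown above.
  static final long MIN_LULL_TIME_MS_FOR_RESTART = TimeUnit.MINUTES.toMillis(10);

  private LullTimeoutSketch() {}

  /** Converts the user-facing minutes flag to milliseconds once, clamping to the floor. */
  static long resolveLullTimeMsForRestart(int timeoutMinutes) {
    return Math.max(TimeUnit.MINUTES.toMillis(timeoutMinutes), MIN_LULL_TIME_MS_FOR_RESTART);
  }

  /** Throws without catching, so the caller decides how to react to the lull. */
  static void checkLull(long lullTimeMs, long lullTimeMsForRestart) throws TimeoutException {
    if (lullTimeMs > lullTimeMsForRestart) {
      throw new TimeoutException(
          String.format(
              "The PTransform has been stuck for more than %d minutes; the SDK worker will restart",
              TimeUnit.MILLISECONDS.toMinutes(lullTimeMsForRestart)));
    }
  }
}
```

With this shape, the minutes-to-milliseconds conversion happens once at construction time, and a test can still wrap `checkLull` in its own `try`/`catch` to assert on the `TimeoutException`.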