ryan-mbuashundip commented on code in PR #35120:
URL: https://github.com/apache/beam/pull/35120#discussion_r2136213340


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -97,6 +100,18 @@ public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
             : Integer.parseInt(samplingPeriodMills);
     this.clock = clock;
     this.activeStateTrackers = new HashSet<>();
+    this.lullTimeMinuteForRestart =
+        Math.max(
+            options.getPtransformTimeoutDuration(),
+            ExecutionStateSampler.MIN_LULL_TIME_MINUTE_FOR_RESTART);
+    if (options.getPtransformTimeoutDuration()
+        < ExecutionStateSampler.MIN_LULL_TIME_MINUTE_FOR_RESTART) {
+      LOG.info(
+          String.format(
+              "The user defined ptransformTimeoutDuration might be too small for "
+                  + "a pTransform operation and has been set to %d minutes",

Review Comment:
   Nit: "The user-defined ptransformTimeoutDuration ~~might be too small~~ is too short for a PTransform operation and has been..."



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -357,6 +377,19 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
         transitionsAtLastSample = transitionsAtThisSample;
       } else {
         long lullTimeMs = currentTimeMillis - lastTransitionTimeMillis.get();
+
+        try {

Review Comment:
   @scwhittle do you know?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -357,6 +377,19 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
         transitionsAtLastSample = transitionsAtThisSample;
       } else {
         long lullTimeMs = currentTimeMillis - lastTransitionTimeMillis.get();
+
+        try {
+          if (lullTimeMs > TimeUnit.MINUTES.toMillis(lullTimeMinuteForRestart)) {
+            throw new TimeoutException(
+                String.format(
+                    "The ptransform has been stuck for more than %d minutes, the SDK worker will"
+                        + " restart",
+                    lullTimeMinuteForRestart));
+          }
+        } catch (TimeoutException e) {
+          LOG.error(e.getMessage());

Review Comment:
   I believe the exception must go uncaught in the SDK for RunnerV2 to handle 
it; catching and logging it here swallows it. You can still catch the exception 
in the test, as you currently do. Or, @scwhittle, would you prefer using 
`System.exit(1)` here?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,6 +82,7 @@ public class ExecutionStateSampler {
           .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final int lullTimeMinuteForRestart;

Review Comment:
   Using minutes as the unit for the flag is fine, but I would prefer 
milliseconds in `ExecutionStateSampler.java`, for consistency with the other 
time variables in the file. The same goes for `MIN_LULL_TIME_MINUTE_FOR_RESTART`.
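
   Roughly what I have in mind, as a sketch only: convert the flag value once 
in the constructor and keep everything after that in milliseconds. The class 
name `LullThreshold`, the `*_MS_*` names, and the 10-minute minimum are 
placeholders I made up for illustration, not values from the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the relevant fields of ExecutionStateSampler.
final class LullThreshold {
  // Placeholder minimum (10 minutes is an assumption), stored in milliseconds.
  static final long MIN_LULL_TIME_MS_FOR_RESTART = TimeUnit.MINUTES.toMillis(10);

  private final long lullTimeMsForRestart;

  // The flag stays in minutes; it is converted exactly once, here.
  LullThreshold(int timeoutMinutesFromFlag) {
    this.lullTimeMsForRestart =
        Math.max(
            TimeUnit.MINUTES.toMillis(timeoutMinutesFromFlag), MIN_LULL_TIME_MS_FOR_RESTART);
  }

  long thresholdMs() {
    return lullTimeMsForRestart;
  }

  // The comparison at sample time needs no unit conversion.
  boolean exceeds(long lullTimeMs) {
    return lullTimeMs > lullTimeMsForRestart;
  }
}
```

   With this, the check in `takeSample` becomes a plain millisecond comparison, 
and only the log and exception messages need to convert back to minutes.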



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org