gemini-code-assist[bot] commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3395347334
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements
Callable<Void> {
private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;
-
- public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler
messageHandler) {
+ private final TestDataflowRunner runner;
+ private final
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+ assertionsPassedRef;
+
+ public CancelOnError(
+ DataflowPipelineJob job,
+ ErrorMonitorMessagesHandler messageHandler,
+ TestDataflowRunner runner,
+ java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
+ this.runner = runner;
+ this.assertionsPassedRef = assertionsPassedRef;
}
@Override
public Void call() throws Exception {
+ int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5
* 3s)
+ int steps = 0;
+ boolean cancellationPending = false;
while (true) {
Review Comment:

To measure the actual elapsed time of the job monitor loop, which is more
robust against slow API calls or network latency than counting iterations,
record the loop's start time with `System.currentTimeMillis()` before entering
the loop.
```java
@Override
public Void call() throws Exception {
  int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
  int steps = 0;
  boolean cancellationPending = false;
  long startTimeMillis = System.currentTimeMillis();
  while (true) {
```
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements
Callable<Void> {
private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;
-
- public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler
messageHandler) {
+ private final TestDataflowRunner runner;
+ private final
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+ assertionsPassedRef;
+
+ public CancelOnError(
+ DataflowPipelineJob job,
+ ErrorMonitorMessagesHandler messageHandler,
+ TestDataflowRunner runner,
+ java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
+ this.runner = runner;
+ this.assertionsPassedRef = assertionsPassedRef;
}
@Override
public Void call() throws Exception {
+ int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5
* 3s)
+ int steps = 0;
+ boolean cancellationPending = false;
while (true) {
- State jobState = job.getState();
-
- // If we see an error, cancel and note failure
- if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
- job.cancel();
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- return null;
- }
-
- if (jobState.isTerminal()) {
- return null;
+ try {
+ State jobState = job.getState();
+
+ if (jobState.isTerminal()) {
+ return null;
+ }
+
+ // Check if we should initiate cancellation based on metrics (only
if assertion state is
+ // not yet known)
+ if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+ if (runner.expectedNumberOfAssertions > 0 && steps %
checkMetricsIntervalSteps == 0) {
+ try {
+ Optional<Boolean> assertionsPassed =
runner.checkForPAssertSuccess(job);
+ if (assertionsPassed.isPresent()) {
+ assertionsPassedRef.set(assertionsPassed);
+ cancellationPending = true;
+ if (assertionsPassed.get()) {
+ LOG.info(
+ "All assertions passed for streaming job {},
cancelling job.",
+ job.getJobId());
+ } else {
+ LOG.info(
+ "Found failed assertion for streaming job {},
cancelling job.",
+ job.getJobId());
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Transient error polling metrics for job {}",
job.getJobId(), e);
+ }
+ }
+ }
+
+ // Check if we should initiate cancellation based on error logs
(only if not already
+ // cancellationPending)
+ if (!cancellationPending) {
+ long runningTimeMillis = steps * 3000L;
+ if (messageHandler.hasSeenError()
+ && (runningTimeMillis > 300000L ||
runner.expectedNumberOfAssertions == 0)) {
Review Comment:

Computing `runningTimeMillis` as `steps * 3000L` assumes that each loop
iteration takes exactly 3 seconds. However, calls to the Dataflow API (such as
`job.getState()` or `runner.checkForPAssertSuccess(job)`) can be slow or time
out, so the actual elapsed time can be much longer than `steps * 3000L`. As a
result, the 5-minute grace period (300000 ms) can last significantly longer in
practice. Use the actual elapsed time since the monitor started instead; this
relies on a `startTimeMillis` value recorded before the loop.
```suggestion
if (!cancellationPending) {
  long runningTimeMillis = System.currentTimeMillis() - startTimeMillis;
  if (messageHandler.hasSeenError()
      && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {
```
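
The difference between the two ways of measuring time can be illustrated with
a minimal, self-contained sketch. `ElapsedTimer` and the injected clock are
hypothetical names used only for illustration; they are not part of Beam or of
this PR. The clock is injected so the demo is deterministic; in the runner it
would presumably be `System::currentTimeMillis`.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Beam): elapsed time from an injectable millisecond clock. */
final class ElapsedTimer {
  private final LongSupplier clockMillis;
  private final long startMillis;

  ElapsedTimer(LongSupplier clockMillis) {
    this.clockMillis = clockMillis;
    this.startMillis = clockMillis.getAsLong(); // capture the start once, before the loop
  }

  long elapsedMillis() {
    return clockMillis.getAsLong() - startMillis;
  }

  public static void main(String[] args) {
    long[] now = {0L}; // fake clock so the demo does not depend on real time
    ElapsedTimer timer = new ElapsedTimer(() -> now[0]);
    int steps = 0;
    for (int i = 0; i < 10; i++) {
      now[0] += 30_000L; // assumed slow API call during this iteration
      now[0] += 3_000L; // nominal 3 s sleep between polls
      steps++;
    }
    System.out.println("step-based estimate: " + (steps * 3000L) + " ms");
    System.out.println("measured elapsed:    " + timer.elapsedMillis() + " ms");
  }
}
```

With these assumed delays the step-based estimate is 30000 ms while the
measured elapsed time is 330000 ms, so only the measured value crosses the
300000 ms threshold.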
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements
Callable<Void> {
private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;
-
- public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler
messageHandler) {
+ private final TestDataflowRunner runner;
+ private final
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+ assertionsPassedRef;
+
+ public CancelOnError(
+ DataflowPipelineJob job,
+ ErrorMonitorMessagesHandler messageHandler,
+ TestDataflowRunner runner,
+ java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
+ this.runner = runner;
+ this.assertionsPassedRef = assertionsPassedRef;
}
@Override
public Void call() throws Exception {
+ int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5
* 3s)
+ int steps = 0;
+ boolean cancellationPending = false;
while (true) {
- State jobState = job.getState();
-
- // If we see an error, cancel and note failure
- if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
- job.cancel();
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- return null;
- }
-
- if (jobState.isTerminal()) {
- return null;
+ try {
+ State jobState = job.getState();
+
+ if (jobState.isTerminal()) {
+ return null;
+ }
+
+ // Check if we should initiate cancellation based on metrics (only
if assertion state is
+ // not yet known)
+ if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+ if (runner.expectedNumberOfAssertions > 0 && steps %
checkMetricsIntervalSteps == 0) {
Review Comment:

At `steps = 0`, `steps % checkMetricsIntervalSteps == 0` is true, so the
runner makes a network call to check metrics on the very first iteration.
Because the job has only just been submitted, it is very unlikely to have
reported any metrics yet. Add a `steps > 0` condition to skip the first step
and avoid a redundant API call.
```suggestion
if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
  if (runner.expectedNumberOfAssertions > 0
      && steps > 0
      && steps % checkMetricsIntervalSteps == 0) {
```
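
The effect of the extra `steps > 0` guard can be checked in isolation with a
small sketch. `PollSchedule` and `shouldPoll` are hypothetical names introduced
here for illustration; the PR inlines the condition rather than extracting it.

```java
/** Hypothetical helper (not part of Beam): decides whether a metrics poll is due. */
final class PollSchedule {
  /** Returns true every intervalSteps iterations, but never on the very first one. */
  static boolean shouldPoll(int steps, int intervalSteps) {
    return steps > 0 && steps % intervalSteps == 0;
  }

  public static void main(String[] args) {
    for (int steps = 0; steps <= 10; steps++) {
      if (shouldPoll(steps, 5)) {
        System.out.println("poll at step " + steps);
      }
    }
  }
}
```

Run as written, this reports polls at steps 5 and 10 only; without the guard,
step 0 would also trigger a poll.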
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +391,91 @@ private static class CancelOnError implements
Callable<Void> {
private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;
-
- public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler
messageHandler) {
+ private final TestDataflowRunner runner;
+ private final
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
+ assertionsPassedRef;
+
+ public CancelOnError(
+ DataflowPipelineJob job,
+ ErrorMonitorMessagesHandler messageHandler,
+ TestDataflowRunner runner,
+ java.util.concurrent.atomic.AtomicReference<Optional<Boolean>>
assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
+ this.runner = runner;
+ this.assertionsPassedRef = assertionsPassedRef;
}
@Override
public Void call() throws Exception {
+ int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5
* 3s)
+ int steps = 0;
+ boolean cancellationPending = false;
while (true) {
- State jobState = job.getState();
-
- // If we see an error, cancel and note failure
- if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
- job.cancel();
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- return null;
- }
-
- if (jobState.isTerminal()) {
- return null;
+ try {
+ State jobState = job.getState();
+
+ if (jobState.isTerminal()) {
+ return null;
+ }
+
+ // Check if we should initiate cancellation based on metrics (only
if assertion state is
+ // not yet known)
+ if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
+ if (runner.expectedNumberOfAssertions > 0 && steps %
checkMetricsIntervalSteps == 0) {
+ try {
+ Optional<Boolean> assertionsPassed =
runner.checkForPAssertSuccess(job);
+ if (assertionsPassed.isPresent()) {
+ assertionsPassedRef.set(assertionsPassed);
+ cancellationPending = true;
+ if (assertionsPassed.get()) {
+ LOG.info(
+ "All assertions passed for streaming job {},
cancelling job.",
+ job.getJobId());
+ } else {
+ LOG.info(
+ "Found failed assertion for streaming job {},
cancelling job.",
+ job.getJobId());
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Transient error polling metrics for job {}",
job.getJobId(), e);
+ }
+ }
+ }
+
+ // Check if we should initiate cancellation based on error logs
(only if not already
+ // cancellationPending)
+ if (!cancellationPending) {
+ long runningTimeMillis = steps * 3000L;
+ if (messageHandler.hasSeenError()
+ && (runningTimeMillis > 300000L ||
runner.expectedNumberOfAssertions == 0)) {
+ LOG.info(
+ "Cancelling Dataflow job due to error messages seen: {}",
+ messageHandler.getErrorMessage());
+ cancellationPending = true;
+ }
+ }
+
+ // Perform or retry cancellation if cancellation is pending
+ if (cancellationPending) {
+ try {
+ job.cancel();
+ return null; // Successful cancellation
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to cancel Dataflow job {}. Will retry on next
iteration. Error",
+ job.getJobId(),
+ e);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Exception in streaming job monitor loop for job {}",
job.getJobId(), e);
}
Review Comment:

The `catch (Exception e)` inside the loop also catches any
`InterruptedException` thrown inside the `try` block. If that exception is
logged and swallowed without restoring the interrupted status or exiting, the
thread may not terminate when the task is cancelled. Check whether the caught
exception is an `InterruptedException`; if so, restore the interrupted status
and exit the loop.
```java
} catch (Exception e) {
  if (e instanceof InterruptedException) {
    Thread.currentThread().interrupt();
    return null;
  }
  LOG.warn("Exception in streaming job monitor loop for job {}", job.getJobId(), e);
}
```
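
As a standalone illustration of the same pattern, the sketch below runs a
polling loop that exits on interruption and restores the interrupt flag.
`InterruptAwareLoop` is a hypothetical class, and `Thread.sleep` stands in for
the real polling work; none of this is taken from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo (not part of Beam): a polling loop that honors interruption. */
final class InterruptAwareLoop {
  /** Loops until interrupted; returns whether the interrupt flag is set on exit. */
  static boolean runUntilInterrupted() {
    while (true) {
      try {
        Thread.sleep(50L); // stand-in for the polling work and the sleep between polls
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Throwing InterruptedException cleared the flag, so set it again.
          Thread.currentThread().interrupt();
          return Thread.currentThread().isInterrupted();
        }
        // Any other exception would be logged here and the loop would continue.
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicBoolean flagRestored = new AtomicBoolean(false);
    Thread worker = new Thread(() -> flagRestored.set(runUntilInterrupted()));
    worker.start();
    worker.interrupt();
    worker.join(2000L);
    System.out.println("exited=" + !worker.isAlive() + " flagRestored=" + flagRestored.get());
  }
}
```

Restoring the flag matters because code further up the call stack (for example
an executor shutting down) may rely on it to see that the thread was
interrupted.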
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +598,89 @@ public void
testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
// If the onSuccessMatcher were invoked, it would have crashed here with
AssertionError
}
+ @Test
+ public void testRunStreamingJobEarlySuccess() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ java.util.concurrent.CountDownLatch cancelLatch = new
java.util.concurrent.CountDownLatch(1);
+ try {
+ Mockito.doAnswer(
+ invocation -> {
+ cancelLatch.countDown();
+ return null;
+ })
+ .when(mockJob)
+ .cancel();
+ } catch (Exception e) {
+ // Ignore
+ }
+ when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
+ .thenAnswer(
+ invocation -> {
+ cancelLatch.await();
+ return State.CANCELLED;
+ });
Review Comment:

Calling `cancelLatch.await()` without a timeout can make the test hang
indefinitely if the job is never cancelled (e.g., due to a bug in the code
under test). Pass a timeout to `await()` so that the test fails fast instead of
hanging the CI build.
```suggestion
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
    .thenAnswer(
        invocation -> {
          if (!cancelLatch.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            throw new RuntimeException("Timeout waiting for job cancellation");
          }
          return State.CANCELLED;
        });
```
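
The bounded wait can also be shown without Mockito. In the sketch below,
`BoundedLatchWait` and `awaitOrFail` are hypothetical names chosen for
illustration; only `CountDownLatch.await(long, TimeUnit)` is the real JDK call
that the suggestion relies on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Beam): waits on a latch with an upper bound. */
final class BoundedLatchWait {
  /** Waits for the latch and throws instead of blocking forever. */
  static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
    try {
      // await(timeout, unit) returns false if the count did not reach zero in time.
      if (!latch.await(timeout, unit)) {
        throw new IllegalStateException("Timed out waiting for latch");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for latch", e);
    }
  }

  public static void main(String[] args) {
    CountDownLatch released = new CountDownLatch(1);
    released.countDown();
    awaitOrFail(released, 1, TimeUnit.SECONDS); // returns at once: count is already zero

    CountDownLatch neverReleased = new CountDownLatch(1);
    try {
      awaitOrFail(neverReleased, 100, TimeUnit.MILLISECONDS);
    } catch (IllegalStateException e) {
      System.out.println("failed fast: " + e.getMessage());
    }
  }
}
```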
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java:
##########
@@ -995,21 +998,51 @@ public ProcessContinuation process(
throws InterruptedException {
AtomicBoolean wasFinalized =
WAS_FINALIZED.computeIfAbsent(element, (unused) -> new
AtomicBoolean());
+
+ long currentAttempt = tracker.currentRestriction().getFrom();
+
+ // On subsequent attempts, the previous bundle has committed, so the
finalization
+ // callback should run. Poll wasFinalized with a timed wait to avoid
deadlocks
+ // on single-threaded executors.
+ if (currentAttempt > 0 && !wasFinalized.get()) {
+ long limitMs = 1000;
+ long start = System.currentTimeMillis();
+ while (!wasFinalized.get() && (System.currentTimeMillis() - start) <
limitMs) {
+ sleep(10L);
+ }
Review Comment:

`System.currentTimeMillis()` is not monotonic: it follows the wall clock and
can jump when the system clock is adjusted (e.g., by an NTP sync).
`System.nanoTime()` is the safer choice for measuring elapsed time in loops.
```suggestion
if (currentAttempt > 0 && !wasFinalized.get()) {
  long limitNs = 1_000_000_000L;
  long start = System.nanoTime();
  while (!wasFinalized.get() && (System.nanoTime() - start) < limitNs) {
    sleep(10L);
  }
```
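
A generic form of this timed poll, written as a minimal sketch, might look
like the following. `DeadlinePoller` and `pollUntil` are hypothetical names,
not Beam APIs. The elapsed time is compared as a difference
(`System.nanoTime() - start`), the form the `System.nanoTime()` Javadoc
recommends because the raw values can overflow.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not part of Beam): polls a condition against a monotonic deadline. */
final class DeadlinePoller {
  /** Returns true if the condition became true before the timeout elapsed. */
  static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
    long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    long start = System.nanoTime();
    while (!condition.getAsBoolean()) {
      // Compare the difference, not absolute values, so overflow is handled correctly.
      if (System.nanoTime() - start >= timeoutNanos) {
        return false;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return condition.getAsBoolean();
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println("already true: " + pollUntil(() -> true, 100L, 10L));
    System.out.println("never true:   " + pollUntil(() -> false, 50L, 10L));
  }
}
```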
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +598,89 @@ public void
testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
// If the onSuccessMatcher were invoked, it would have crashed here with
AssertionError
}
+ @Test
+ public void testRunStreamingJobEarlySuccess() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ java.util.concurrent.CountDownLatch cancelLatch = new
java.util.concurrent.CountDownLatch(1);
+ try {
+ Mockito.doAnswer(
+ invocation -> {
+ cancelLatch.countDown();
+ return null;
+ })
+ .when(mockJob)
+ .cancel();
+ } catch (Exception e) {
+ // Ignore
+ }
+ when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
+ .thenAnswer(
+ invocation -> {
+ cancelLatch.await();
+ return State.CANCELLED;
+ });
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
+ TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ runner.run(p, mockRunner);
+
+ Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
+ }
+
+ @Test
+ public void testRunStreamingJobEarlyFailure() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ java.util.concurrent.CountDownLatch cancelLatch = new
java.util.concurrent.CountDownLatch(1);
+ try {
+ Mockito.doAnswer(
+ invocation -> {
+ cancelLatch.countDown();
+ return null;
+ })
+ .when(mockJob)
+ .cancel();
+ } catch (Exception e) {
+ // Ignore
+ }
+ when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
+ .thenAnswer(
+ invocation -> {
+ cancelLatch.await();
+ return State.CANCELLED;
+ });
Review Comment:

Calling `cancelLatch.await()` without a timeout can make the test hang
indefinitely if the job is never cancelled. Pass a timeout to `await()` so that
the test fails fast instead of hanging the CI build.
```suggestion
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
    .thenAnswer(
        invocation -> {
          if (!cancelLatch.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            throw new RuntimeException("Timeout waiting for job cancellation");
          }
          return State.CANCELLED;
        });
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.