gemini-code-assist[bot] commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3329102223
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +609,54 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
   }
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
Review Comment:

In `testRunStreamingJobEarlySuccess`, `mockJob.waitUntilFinish` is mocked to
return `State.CANCELLED` immediately. Because of this, the main thread does not
block and may finish executing before the background `CancelOnError` thread
even runs or calls `cancel()`. The test currently passes because the main
thread falls back to calling `checkForPAssertSuccess(job)` synchronously.

To ensure that the asynchronous early-termination logic is actually being
executed and that `job.cancel()` is called by the background thread, we should
use Mockito's `timeout` verification.
```java
  @Test
  public void testRunStreamingJobEarlySuccess() throws Exception {
    options.setStreaming(true);
    Pipeline p = TestPipeline.create(options);
    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
    PAssert.that(pc).containsInAnyOrder(1, 2, 3);

    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
    when(mockJob.getState()).thenReturn(State.CANCELLED);
    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.CANCELLED);
    when(mockJob.getProjectId()).thenReturn("test-project");
    when(mockJob.getJobId()).thenReturn("test-job");

    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
    runner.run(p, mockRunner);

    // Wait up to 5 seconds for the background thread to call cancel().
    Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
  }
```
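
To illustrate why a bounded wait matters here, the following is a minimal
plain-Java sketch of the same race, without Mockito or Beam. `FakeJob`,
`awaitCancel`, and `runAndAwaitCancel` are hypothetical names introduced only
for this example; the latch-based wait plays roughly the role that
`Mockito.timeout` plays in the suggestion above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch only: a stand-in for the job/monitor interaction, not Beam code. */
final class BoundedWaitSketch {

  /** Hypothetical job that records whether cancel() was called. */
  static final class FakeJob {
    private final CountDownLatch cancelled = new CountDownLatch(1);

    void cancel() {
      cancelled.countDown();
    }

    /** Returns true if cancel() was observed within the timeout. */
    boolean awaitCancel(long timeoutMillis) {
      try {
        return cancelled.await(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /**
   * Starts a background monitor that cancels the job after a delay, then waits
   * at most timeoutMillis for that cancel to become visible.
   */
  static boolean runAndAwaitCancel(long monitorDelayMillis, long timeoutMillis) {
    FakeJob job = new FakeJob();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.submit(
          () -> {
            Thread.sleep(monitorDelayMillis); // the monitor may be scheduled late
            job.cancel();
            return null;
          });
      // The caller has nothing left to block on, so only a bounded wait can
      // reliably observe the asynchronous cancel.
      return job.awaitCancel(timeoutMillis);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runAndAwaitCancel(50, 5000));
  }
}
```

An immediate check right after starting the monitor would depend on thread
scheduling; the bounded wait makes the check deterministic as long as the
timeout comfortably exceeds the monitor's latency.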
##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +609,54 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
   }
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobEarlyFailure() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+    expectedException.expect(AssertionError.class);
+    runner.run(p, mockRunner);
+  }
Review Comment:

Similarly to `testRunStreamingJobEarlySuccess`,
`testRunStreamingJobEarlyFailure` should verify that `job.cancel()` is
asynchronously called when an assertion fails. Since we expect an
`AssertionError` to be thrown, we can use a try-catch block to capture the
exception and then perform the verification.
```java
  @Test
  public void testRunStreamingJobEarlyFailure() throws Exception {
    options.setStreaming(true);
    Pipeline p = TestPipeline.create(options);
    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
    PAssert.that(pc).containsInAnyOrder(1, 2, 3);

    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
    when(mockJob.getState()).thenReturn(State.CANCELLED);
    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
        .thenReturn(State.CANCELLED);
    when(mockJob.getProjectId()).thenReturn("test-project");
    when(mockJob.getJobId()).thenReturn("test-job");

    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

    when(mockClient.getJobMetrics(anyString()))
        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

    // Record the outcome in a flag: calling Assert.fail() inside the try block
    // would throw an AssertionError that the catch below would swallow.
    boolean thrown = false;
    try {
      runner.run(p, mockRunner);
    } catch (AssertionError expected) {
      thrown = true;
    }
    org.junit.Assert.assertTrue("Expected AssertionError to be thrown", thrown);

    // Wait up to 5 seconds for the background thread to call cancel().
    Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
  }
```
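
The reason for catching the error explicitly is that statements placed after
a throwing call never run, so a follow-up verification is only reachable if
the test itself handles the exception. A minimal plain-Java sketch of that
control flow follows; `runThatFails` and `failsAndCancels` are hypothetical
names for this example only and do not model the real `TestDataflowRunner`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: shows that verification after a throwing call needs a catch. */
final class VerifyAfterThrowSketch {

  /** Hypothetical run(): records a cancel, then fails like a failed assertion. */
  static void runThatFails(AtomicBoolean cancelled) {
    cancelled.set(true);
    throw new AssertionError("simulated failed assertion");
  }

  /** Returns true only if the error was thrown and the cancel was recorded. */
  static boolean failsAndCancels() {
    AtomicBoolean cancelled = new AtomicBoolean(false);
    boolean thrown = false;
    try {
      runThatFails(cancelled);
    } catch (AssertionError expected) {
      // Use a flag instead of failing inside the try block, because a failure
      // raised there would be an AssertionError and would land in this catch.
      thrown = true;
    }
    // Control reaches this point, so the side effect can still be checked.
    return thrown && cancelled.get();
  }

  public static void main(String[] args) {
    System.out.println(failsAndCancels());
  }
}
```

If the JUnit version in use provides `Assert.assertThrows` (4.13 and later),
that may be a tidier way to express the same thing.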
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -373,29 +384,66 @@ private static class CancelOnError implements Callable<Void> {
     private final DataflowPipelineJob job;
     private final ErrorMonitorMessagesHandler messageHandler;
-
-    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    private final TestDataflowRunner runner;
+    private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef;
+
+    public CancelOnError(
+        DataflowPipelineJob job,
+        ErrorMonitorMessagesHandler messageHandler,
+        TestDataflowRunner runner,
+        java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.runner = runner;
+      this.assertionsPassedRef = assertionsPassedRef;
     }
     @Override
     public Void call() throws Exception {
+      int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
+      int steps = 0;
       while (true) {
         State jobState = job.getState();
-        // If we see an error, cancel and note failure
-        if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
-          job.cancel();
-          LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        if (jobState.isTerminal()) {
           return null;
         }
-        if (jobState.isTerminal()) {
+        // Check metrics for early success/failure cancellation
+        if (steps % checkMetricsIntervalSteps == 0) {
+          Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
+          if (assertionsPassed.isPresent()) {
+            assertionsPassedRef.set(assertionsPassed);
+            if (assertionsPassed.get()) {
+              LOG.info(
+                  "All assertions passed for streaming job {}, cancelling job.",
+                  job.getJobId());
+              job.cancel();
+              return null;
+            } else {
+              LOG.info(
+                  "Found failed assertion for streaming job {}, cancelling job.",
+                  job.getJobId());
+              job.cancel();
+              return null;
+            }
+          }
+        }
Review Comment:

The metrics polling and job cancellation logic in `CancelOnError` is prone
to transient network or API failures (e.g., rate limits or temporary service
outages when calling `checkForPAssertSuccess` or `cancel`). If any exception is
thrown during these calls, the background thread will terminate silently,
preventing any further monitoring or early termination for the streaming
job.

To make this robust, we should:
1. Wrap the API calls in a try-catch block to prevent transient errors from
   killing the monitoring thread.
2. Avoid redundant metrics queries once we have already determined that
   assertions have passed/failed (i.e., if `assertionsPassedRef` is already
   set, we should just retry the cancellation directly).
```suggestion
        // Check metrics for early success/failure cancellation
        if (assertionsPassedRef.get().isPresent()) {
          try {
            LOG.info("Retrying cancellation for job {}...", job.getJobId());
            job.cancel();
            return null;
          } catch (Exception e) {
            LOG.warn("Failed to cancel job {}, will retry.", job.getJobId(), e);
          }
        } else if (steps % checkMetricsIntervalSteps == 0) {
          try {
            Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
            if (assertionsPassed.isPresent()) {
              assertionsPassedRef.set(assertionsPassed);
              LOG.info(
                  "{} assertion(s) for streaming job {}, cancelling job.",
                  assertionsPassed.get() ? "All passed" : "Found failed",
                  job.getJobId());
              try {
                job.cancel();
                return null;
              } catch (Exception e) {
                LOG.warn("Failed to cancel job {}, will retry.", job.getJobId(), e);
              }
            }
          } catch (Exception e) {
            LOG.warn("Failed to check assertions for job {}.", job.getJobId(), e);
          }
        }
```
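
For reference, the two points above can be sketched as a small standalone
loop. This is only an illustration under assumptions: `Job`,
`AssertionCheck`, `monitor`, and `demo` are hypothetical names, the real loop
would sleep between steps and log the caught exceptions, and `maxSteps` exists
only so the sketch terminates.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch only: a polling loop that survives transient failures. */
final class ResilientMonitorSketch {

  /** Hypothetical job handle; the real DataflowPipelineJob is not modeled. */
  interface Job {
    boolean isTerminal();

    void cancel() throws Exception;
  }

  /** Hypothetical metrics check; an empty result means "no verdict yet". */
  interface AssertionCheck {
    Optional<Boolean> check() throws Exception;
  }

  /** Polls until the job is terminal or a cancel succeeds; returns cancel attempts. */
  static int monitor(
      Job job,
      AssertionCheck check,
      AtomicReference<Optional<Boolean>> verdict,
      int checkEvery,
      int maxSteps) {
    int cancelAttempts = 0;
    for (int step = 0; step < maxSteps; step++) {
      if (job.isTerminal()) {
        return cancelAttempts;
      }
      try {
        // Query metrics only while no verdict has been recorded.
        if (!verdict.get().isPresent() && step % checkEvery == 0) {
          Optional<Boolean> result = check.check();
          if (result.isPresent()) {
            verdict.set(result);
          }
        }
        // Once a verdict exists, keep retrying the cancel until it succeeds.
        if (verdict.get().isPresent()) {
          cancelAttempts++;
          job.cancel();
          return cancelAttempts;
        }
      } catch (Exception e) {
        // Transient failure: a real implementation would log and continue.
      }
    }
    return cancelAttempts;
  }

  /** Simulates two failed metric checks and one failed cancel. */
  static int[] demo() {
    int[] checkCalls = {0};
    int[] cancelCalls = {0};
    Job job =
        new Job() {
          @Override
          public boolean isTerminal() {
            return false;
          }

          @Override
          public void cancel() throws Exception {
            if (++cancelCalls[0] == 1) {
              throw new Exception("simulated transient cancel failure");
            }
          }
        };
    AssertionCheck check =
        () -> {
          if (++checkCalls[0] <= 2) {
            throw new Exception("simulated transient metrics failure");
          }
          return Optional.of(true);
        };
    AtomicReference<Optional<Boolean>> verdict =
        new AtomicReference<>(Optional.<Boolean>empty());
    int attempts = monitor(job, check, verdict, 1, 10);
    return new int[] {attempts, checkCalls[0]};
  }
}
```

In `demo()`, the metrics check is queried three times in total and never again
after the verdict is recorded, while the cancel is attempted twice because the
first attempt fails.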
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.