durgaprasadml commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3329106881


##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java:
##########
@@ -609,6 +609,54 @@ public void 
testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     // If the onSuccessMatcher were invoked, it would have crashed here with 
AssertionError
   }
 
+  @Test
+  public void testRunStreamingJobEarlySuccess() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobEarlyFailure() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.CANCELLED);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.CANCELLED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+    expectedException.expect(AssertionError.class);
+    runner.run(p, mockRunner);
+  }

Review Comment:
   Thanks for the detailed review! I've addressed the robustness and async validation concerns by:
   - hardening the monitoring loop with retry-safe exception handling
   - preventing redundant metrics polling after assertion state is known
   - retrying transient cancellation failures
   - adding Mockito timeout verification for async cancellation behavior in streaming tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to