durgaprasadml commented on code in PR #38753:
URL: https://github.com/apache/beam/pull/38753#discussion_r3334923597


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java:
##########
@@ -121,19 +121,28 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
     ErrorMonitorMessagesHandler messageHandler =
         new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
 
+    java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef =
+        new java.util.concurrent.atomic.AtomicReference<>(Optional.absent());
+
     if (options.isStreaming()) {
       if (options.isBlockOnRun()) {
-        jobSuccess = waitForStreamingJobTermination(job, messageHandler);
+        jobSuccess = waitForStreamingJobTermination(job, messageHandler, assertionsPassedRef);
       } else {
         jobSuccess = true;
       }
-      // No metrics in streaming
-      allAssertionsPassed = Optional.absent();
+      allAssertionsPassed = assertionsPassedRef.get();
+      if (!allAssertionsPassed.isPresent()) {
+        allAssertionsPassed = checkForPAssertSuccess(job);
+      }
     } else {
       jobSuccess = waitForBatchJobTermination(job, messageHandler);
       allAssertionsPassed = checkForPAssertSuccess(job);
     }
 
+    if (allAssertionsPassed.isPresent() && allAssertionsPassed.get()) {
+      jobSuccess = true;

Review Comment:
   Thanks for reviewing this.
   
   The intent here is not to mask arbitrary job failures, but to avoid treating 
expected post-assertion cancellation behavior as a test failure in streaming 
mode.
   
   For these ValidatesRunner streaming tests, the pipelines are intentionally 
cancelled once all PAssert metrics succeed, so the test does not wait for the 
full timeout window. Under Streaming Engine, the final observed job state is 
therefore often CANCELLED rather than DONE, even though all assertions have 
already completed successfully.
   
   The jobSuccess = true override is intended to apply only when all of the 
following hold:
   - streaming mode is enabled
   - PAssertSuccess metrics confirm all expected assertions completed 
successfully
   - and no PAssertFailure metrics were observed
   
   Non-assertion failures are still surfaced because:
   - the monitor thread continues watching for terminal failures and error 
conditions before assertion success is reached
   - failed assertions explicitly set the result to failure
   - transient cancellation retries do not suppress exceptions from failed 
assertions
   - infrastructure/runtime failures occurring before assertion completion 
still fail the test normally
   
   So the override is specifically intended to distinguish:
   - “all assertions passed, then the streaming job was intentionally cancelled”
   from
   - “the pipeline failed before assertions completed”
   
   That said, I agree the current logic should be more explicit, and better 
documented, about the distinction between intentional post-assertion 
cancellation and genuine runtime failure states. I’m happy to tighten the 
condition further if needed.
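   
   As a rough illustration of what a tighter condition could look like, here is 
a minimal sketch. It is not the code in this PR: the JobState enum, the class 
name, and isTestSuccess are hypothetical stand-ins, java.util.Optional is used 
in place of the Optional type in the diff, and treating a DONE job with no 
metrics as a success is an assumption made only for the example.

```java
import java.util.Optional;

/**
 * Hypothetical sketch, not the code in TestDataflowRunner. The JobState enum and
 * every name below are illustrative assumptions.
 */
final class PostAssertionCancellation {

  /** Simplified stand-in for the terminal job states discussed above. */
  enum JobState { DONE, CANCELLED, FAILED }

  private PostAssertionCancellation() {}

  /**
   * Decides whether a test run counts as successful.
   *
   * @param streaming whether the pipeline ran in streaming mode
   * @param finalState terminal state observed for the job
   * @param assertionsPassed empty if no PAssert metrics were available, otherwise
   *     whether all expected assertions succeeded with no failures observed
   */
  static boolean isTestSuccess(
      boolean streaming, JobState finalState, Optional<Boolean> assertionsPassed) {
    if (finalState == JobState.DONE) {
      // A job that finished on its own is judged by its assertions. With no
      // metrics available, the terminal state is the only signal (assumption).
      return assertionsPassed.orElse(true);
    }
    // Only a streaming job that ended CANCELLED after every assertion passed is
    // treated as a success. FAILED is never overridden, and neither is batch.
    return streaming
        && finalState == JobState.CANCELLED
        && assertionsPassed.orElse(false);
  }
}
```

   Keeping the streaming flag and the CANCELLED state in the same expression 
as the assertion result would make the override's scope visible at the point 
where jobSuccess is set, instead of relying on how allAssertionsPassed was 
populated earlier in the method.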


