gemini-code-assist[bot] commented on code in PR #36170:
URL: https://github.com/apache/beam/pull/36170#discussion_r2352028858


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserverTest.java:
##########
@@ -113,6 +161,37 @@ public void testReset_usesNewDelegate()
     verify(secondObserver).onNext(eq(2));
   }
 
+  @Test
+  public void testOnNext_streamCancelledException_onErrorThrows() throws Exception {
+    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+    StreamObserverCancelledException streamObserverCancelledException =
+        new StreamObserverCancelledException("Test error");
+    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+    observer.reset(spiedDelegate);
+    observer.onNext(1);
+
+    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+    assertThrows(
+        ResettableThrowingStreamObserver.StreamClosedException.class,
+        () -> observer.onError(new Exception()));
+  }
+
+  @Test
+  public void testOnNext_streamCancelledException_onCompletedThrows() throws Exception {
+    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+    StreamObserverCancelledException streamObserverCancelledException =
+        new StreamObserverCancelledException("Test error");
+    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+    observer.reset(spiedDelegate);
+    observer.onNext(1);
+
+    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+    assertThrows(
+        ResettableThrowingStreamObserver.StreamClosedException.class, observer::onCompleted);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The two new tests, `testOnNext_streamCancelledException_onErrorThrows` and `testOnNext_streamCancelledException_onCompletedThrows`, share nearly all of their setup code and check closely related outcomes: after `onNext` fails with a `StreamObserverCancelledException`, the stream is closed, so later calls to `onError` and `onCompleted` throw `StreamClosedException`.
   
   To reduce duplication, these two tests could be combined into a single test case that verifies both behaviors on the closed stream. This would make the test suite more concise and easier to maintain.
   
   ```suggestion
     @Test
     public void testOnNext_streamCancelledException_closesStream() throws Exception {
       ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
       TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
       StreamObserverCancelledException streamObserverCancelledException =
           new StreamObserverCancelledException("Test error");
       doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
       observer.reset(spiedDelegate);
       observer.onNext(1);
   
       verify(spiedDelegate).onError(eq(streamObserverCancelledException));
       assertThrows(
           ResettableThrowingStreamObserver.StreamClosedException.class,
           () -> observer.onError(new Exception()));
       assertThrows(
           ResettableThrowingStreamObserver.StreamClosedException.class, observer::onCompleted);
     }
   ```
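
If keeping the two tests separate is preferred (a combined test stops at its first failed assertion, so a failure in the `onError` check would hide the `onCompleted` result), the duplication could instead be moved into a shared setup helper. The following is only a self-contained sketch of that shape, not Beam code: `ToyObserver`, `CancelledException`, `observerClosedByCancellation`, and `throwsStreamClosed` are hypothetical stand-ins, and the toy observer models only the behavior the tests in this diff assert.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedSetupSketch {
  // Hypothetical stand-ins for the exception types named in the diff.
  static class StreamClosedException extends Exception {
    StreamClosedException(String message) {
      super(message);
    }
  }

  static class CancelledException extends RuntimeException {
    CancelledException(String message) {
      super(message);
    }
  }

  // Toy observer (an assumption, not the real class): it closes itself when
  // the simulated delegate fails in onNext, and rejects later calls.
  static class ToyObserver {
    final List<Throwable> delegateErrors = new ArrayList<>();
    private final RuntimeException failOnNext;
    private boolean closed = false;

    ToyObserver(RuntimeException failOnNext) {
      this.failOnNext = failOnNext;
    }

    void onNext(int value) throws StreamClosedException {
      checkOpen();
      if (failOnNext != null) {
        // Simulated delegate failure: record the forwarded error and close.
        closed = true;
        delegateErrors.add(failOnNext);
      }
    }

    void onError(Throwable t) throws StreamClosedException {
      checkOpen();
      closed = true;
      delegateErrors.add(t);
    }

    void onCompleted() throws StreamClosedException {
      checkOpen();
      closed = true;
    }

    private void checkOpen() throws StreamClosedException {
      if (closed) {
        throw new StreamClosedException("stream is closed");
      }
    }
  }

  @FunctionalInterface
  interface ThrowingCall {
    void run() throws Exception;
  }

  // Shared setup: build an observer and drive it into the closed state.
  static ToyObserver observerClosedByCancellation(CancelledException cause) {
    ToyObserver observer = new ToyObserver(cause);
    try {
      observer.onNext(1);
    } catch (StreamClosedException e) {
      throw new IllegalStateException("fresh observer should be open", e);
    }
    return observer;
  }

  // Returns true only if the call throws StreamClosedException.
  static boolean throwsStreamClosed(ThrowingCall call) {
    try {
      call.run();
      return false;
    } catch (StreamClosedException e) {
      return true;
    } catch (Exception e) {
      return false;
    }
  }

  public static void main(String[] args) {
    CancelledException cause = new CancelledException("Test error");

    // Each check gets its own freshly closed observer, as separate tests would.
    ToyObserver first = observerClosedByCancellation(cause);
    System.out.println("onError throws: " + throwsStreamClosed(() -> first.onError(new Exception())));

    ToyObserver second = observerClosedByCancellation(cause);
    System.out.println("onCompleted throws: " + throwsStreamClosed(second::onCompleted));
  }
}
```

In the real test class the helper would presumably wrap the existing `newStreamObserver()`, `newDelegate()`, and `doThrow(...)` setup; whether that reads better than the single combined test is a judgment call for the PR author.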



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

