gemini-code-assist[bot] commented on code in PR #36170: URL: https://github.com/apache/beam/pull/36170#discussion_r2352028858
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserverTest.java:
##########
@@ -113,6 +161,37 @@ public void testReset_usesNewDelegate()
     verify(secondObserver).onNext(eq(2));
   }
+  @Test
+  public void testOnNext_streamCancelledException_onErrorThrows() throws Exception {
+    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+    StreamObserverCancelledException streamObserverCancelledException =
+        new StreamObserverCancelledException("Test error");
+    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+    observer.reset(spiedDelegate);
+    observer.onNext(1);
+
+    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+    assertThrows(
+        ResettableThrowingStreamObserver.StreamClosedException.class,
+        () -> observer.onError(new Exception()));
+  }
+
+  @Test
+  public void testOnNext_streamCancelledException_onCompletedThrows() throws Exception {
+    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+    StreamObserverCancelledException streamObserverCancelledException =
+        new StreamObserverCancelledException("Test error");
+    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+    observer.reset(spiedDelegate);
+    observer.onNext(1);
+
+    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+    assertThrows(
+        ResettableThrowingStreamObserver.StreamClosedException.class, observer::onCompleted);
+  }

Review Comment:
The two new tests, `testOnNext_streamCancelledException_onErrorThrows` and `testOnNext_streamCancelledException_onCompletedThrows`, share a significant amount of setup code and test a very similar outcome: that the stream is correctly closed after `onNext` fails with a `StreamObserverCancelledException`.
To improve code clarity and reduce duplication, these two tests could be combined into a single, more comprehensive test case that verifies all expected behaviors on the closed stream. This would make the test suite more concise and easier to maintain.

```suggestion
  @Test
  public void testOnNext_streamCancelledException_closesStream() throws Exception {
    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
    StreamObserverCancelledException streamObserverCancelledException =
        new StreamObserverCancelledException("Test error");
    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
    observer.reset(spiedDelegate);
    observer.onNext(1);

    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
    assertThrows(
        ResettableThrowingStreamObserver.StreamClosedException.class,
        () -> observer.onError(new Exception()));
    assertThrows(
        ResettableThrowingStreamObserver.StreamClosedException.class, observer::onCompleted);
  }
```