gemini-code-assist[bot] commented on code in PR #36170:
URL: https://github.com/apache/beam/pull/36170#discussion_r2352028858
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserverTest.java:
##########
@@ -113,6 +161,37 @@ public void testReset_usesNewDelegate()
verify(secondObserver).onNext(eq(2));
}
+ @Test
+ public void testOnNext_streamCancelledException_onErrorThrows() throws Exception {
+ ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+ TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+ StreamObserverCancelledException streamObserverCancelledException =
+ new StreamObserverCancelledException("Test error");
+ doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+ observer.reset(spiedDelegate);
+ observer.onNext(1);
+
+ verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+ assertThrows(
+ ResettableThrowingStreamObserver.StreamClosedException.class,
+ () -> observer.onError(new Exception()));
+ }
+
+ @Test
+ public void testOnNext_streamCancelledException_onCompletedThrows() throws Exception {
+ ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
+ TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
+ StreamObserverCancelledException streamObserverCancelledException =
+ new StreamObserverCancelledException("Test error");
+ doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
+ observer.reset(spiedDelegate);
+ observer.onNext(1);
+
+ verify(spiedDelegate).onError(eq(streamObserverCancelledException));
+ assertThrows(
+ ResettableThrowingStreamObserver.StreamClosedException.class, observer::onCompleted);
+ }
Review Comment:

The two new tests, `testOnNext_streamCancelledException_onErrorThrows` and
`testOnNext_streamCancelledException_onCompletedThrows`, share nearly identical
setup and verify the same outcome: once `onNext` fails with a
`StreamObserverCancelledException`, the stream is closed, so subsequent
`onError` and `onCompleted` calls throw `StreamClosedException`. Consider
combining them into a single test that asserts both calls throw on the closed
stream. This removes the duplicated setup and makes the test suite more concise
and easier to maintain.
```suggestion
  @Test
  public void testOnNext_streamCancelledException_closesStream() throws Exception {
    ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
    TerminatingStreamObserver<Integer> spiedDelegate = newDelegate();
    StreamObserverCancelledException streamObserverCancelledException =
        new StreamObserverCancelledException("Test error");
    doThrow(streamObserverCancelledException).when(spiedDelegate).onNext(any());
    observer.reset(spiedDelegate);
    observer.onNext(1);

    verify(spiedDelegate).onError(eq(streamObserverCancelledException));
    assertThrows(
        ResettableThrowingStreamObserver.StreamClosedException.class,
        () -> observer.onError(new Exception()));
    assertThrows(
        ResettableThrowingStreamObserver.StreamClosedException.class,
        observer::onCompleted);
  }
```
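
For illustration only, the behavior the combined test relies on can be modeled
without JUnit or Mockito. The sketch below is an assumption-laden stand-in, not
the Beam implementation: `ClosingObserverSketch`, `ClosingObserver`,
`throwsStreamClosed`, and `bothTerminalCallsThrow` are hypothetical names, and
the real `ResettableThrowingStreamObserver` may track its closed state
differently. It shows that, in this model, a rejected `onError` leaves the
stream closed, so checking `onCompleted` afterwards in the same test is still
meaningful.

```java
import java.util.function.Consumer;

/** Hypothetical model of the observer under test; not the Beam class. */
final class ClosingObserverSketch {

  /** Thrown when a terminal method is called on an already closed stream. */
  static final class StreamClosedException extends Exception {
    StreamClosedException(String message) {
      super(message);
    }
  }

  /** Minimal observer that closes itself when the delegate's onNext fails. */
  static final class ClosingObserver<T> {
    private final Consumer<T> delegate;
    private boolean closed = false;

    ClosingObserver(Consumer<T> delegate) {
      this.delegate = delegate;
    }

    void onNext(T value) {
      try {
        delegate.accept(value);
      } catch (RuntimeException e) {
        // Assumption: a failed onNext closes the stream.
        closed = true;
      }
    }

    void onError(Throwable t) throws StreamClosedException {
      ensureOpen();
      closed = true;
    }

    void onCompleted() throws StreamClosedException {
      ensureOpen();
      closed = true;
    }

    private void ensureOpen() throws StreamClosedException {
      if (closed) {
        throw new StreamClosedException("stream already closed");
      }
    }
  }

  /** Small functional interface so method references may throw checked exceptions. */
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  /** Returns true only if the call throws StreamClosedException. */
  static boolean throwsStreamClosed(ThrowingRunnable call) {
    try {
      call.run();
      return false;
    } catch (StreamClosedException e) {
      return true;
    } catch (Exception e) {
      return false;
    }
  }

  /** Mirrors the combined test: one setup, two checks on the closed stream. */
  static boolean bothTerminalCallsThrow() {
    ClosingObserver<Integer> observer =
        new ClosingObserver<>(
            value -> {
              throw new IllegalStateException("Test error");
            });
    observer.onNext(1);
    return throwsStreamClosed(() -> observer.onError(new Exception()))
        && throwsStreamClosed(observer::onCompleted);
  }
}
```

One trade-off of the combined form: if the first `assertThrows` fails, the test
stops there and the second check is not reported separately.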