XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308281432
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
Review Comment:
I removed the try-with-resources blocks where I felt that making the explicit
close call part of the test code helps readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]