XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308287200
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
+
+ FlinkAssertions.assertThatFuture(serverSideProcess)
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(SocketException.class);
+ }
+ }
+ }
+
+ @Test
+ void testServerSideNotResponding() throws Exception {
+ try (final TestingSocketServer socketServer = new
TestingSocketServer()) {
+ final String expectedVersion = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (CollectSinkOperatorCoordinator coordinator =
+ new CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+ coordinator.handleEventFromOperator(
+ 0, 0, new
CollectSinkAddressEvent(socketServer.getSocketAddress()));
+
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(expectedVersion));
+ assertThat(responseFuture).isNotDone();
+ }
+ assertEmptyResponseGeneratedFromClient(responseFuture,
expectedVersion);
+ }
+ }
+
+ @Test
+ void testServerSideDisconnectWithReconnect() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator);
+
+ final String expectedVersion = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
coordinator.handleCoordinationRequest(
-
createRequestForClientGeneratedResponse(versionOfFailedRequest));
+
createRequestForClientGeneratedResponse(expectedVersion));
+
+ final CompletableFuture<SocketConnection> connectFuture =
socketServer.connectSocket();
+ FlinkAssertions.assertThatFuture(connectFuture)
+ .as("The connection to the client should be established.")
+ .eventuallySucceeds();
+
+ // simulation of the server side failing while the request is
handled on the server side
Review Comment:
As mentioend
[above](https://github.com/apache/flink/pull/23296#discussion_r1308280794):
> I refactored the method and variable names to use coordinator and
sinkFunction. Additionally, TestingSocketServer is renamed into
TestingSinkFunction
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]