zentol commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1307474170
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
@Override
public void close() throws Exception {
LOG.info("Closing the CollectSinkOperatorCoordinator.");
+ running = false;
this.executorService.shutdownNow();
+ ongoingRequests.forEach(ft -> ft.cancel(true));
Review Comment:
could it be that this alone is actually the solution?
If multiple requests were already queued into the executor we weren't
failing them anywhere.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
+
+ FlinkAssertions.assertThatFuture(serverSideProcess)
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(SocketException.class);
+ }
+ }
+ }
+
+ @Test
+ void testServerSideNotResponding() throws Exception {
Review Comment:
Name seems misleading because right now the server isn't even accepting
requests. `testServerSideClosingTheServerSocket` actually models this better.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
Review Comment:
why do we touch the data-future again? It's a server-side thing and should
have no effect on the test results.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
+
+ FlinkAssertions.assertThatFuture(serverSideProcess)
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(SocketException.class);
+ }
+ }
+ }
+
+ @Test
+ void testServerSideNotResponding() throws Exception {
+ try (final TestingSocketServer socketServer = new
TestingSocketServer()) {
+ final String expectedVersion = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (CollectSinkOperatorCoordinator coordinator =
+ new CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+ coordinator.handleEventFromOperator(
+ 0, 0, new
CollectSinkAddressEvent(socketServer.getSocketAddress()));
+
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(expectedVersion));
+ assertThat(responseFuture).isNotDone();
+ }
+ assertEmptyResponseGeneratedFromClient(responseFuture,
expectedVersion);
+ }
+ }
+
+ @Test
+ void testServerSideDisconnectWithReconnect() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator);
+
+ final String expectedVersion = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
coordinator.handleCoordinationRequest(
-
createRequestForClientGeneratedResponse(versionOfFailedRequest));
+
createRequestForClientGeneratedResponse(expectedVersion));
+
+ final CompletableFuture<SocketConnection> connectFuture =
socketServer.connectSocket();
+ FlinkAssertions.assertThatFuture(connectFuture)
+ .as("The connection to the client should be established.")
+ .eventuallySucceeds();
+
+ // simulation of the server side failing while the request is
handled on the server side
Review Comment:
Maybe we should drop the client/server terminology because afaict we have 3
participants; the actual client (e.g., CLI), the coordinator and the sink
operator (== TestingSocketServer), creating 2 client/server pairs.
It's just really confusing.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -152,9 +176,11 @@ private CoordinationResponse handleRequestImpl(
throw new NullPointerException("No sinkAddress available.");
}
- if (socketConnection == null) {
- socketConnection = SocketConnection.create(socketTimeout,
sinkAddress);
- LOG.info("Sink connection established");
+ synchronized (connectionLock) {
+ if (running && socketConnection == null) {
Review Comment:
otherwise we should exit eagerly instead of running into an NPE later on.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
+
+ FlinkAssertions.assertThatFuture(serverSideProcess)
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(SocketException.class);
Review Comment:
I would completely drop the last 2 assertion blocks
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -152,9 +176,11 @@ private CoordinationResponse handleRequestImpl(
throw new NullPointerException("No sinkAddress available.");
}
- if (socketConnection == null) {
- socketConnection = SocketConnection.create(socketTimeout,
sinkAddress);
- LOG.info("Sink connection established");
+ synchronized (connectionLock) {
Review Comment:
Wondering if this shouldn't be a larger block.
Does the client handle it properly when the connection breaks down while it
is reading data?
What's the desired behavior here?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
Review Comment:
A comment stating that the connection was now closed would go a long way
here and other tests
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
@Override
public void close() throws Exception {
LOG.info("Closing the CollectSinkOperatorCoordinator.");
+ running = false;
this.executorService.shutdownNow();
Review Comment:
I'm wondering whether `awaitTermination` wouldn't be a simpler option.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
}
@Test
- void testServerFailure() throws Exception {
- CollectSinkOperatorCoordinator coordinator =
- new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
- coordinator.start();
-
- final String versionOfFailedRequest = "version3";
- final CompletableFuture<CoordinationResponse> failedResponseFuture;
- try (final TestingSocketServer socketServer =
-
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
- // a normal response
- final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
- final CompletableFuture<CoordinationResponse> responseFuture0 =
- coordinator.handleCoordinationRequest(
- createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData0);
- assertResponseWithDefaultMetadataFromServer(responseFuture0,
expectedData0);
+ void testSuccessfulResponse() throws Exception {
+ try (CollectSinkOperatorCoordinator testInstance = new
CollectSinkOperatorCoordinator();
+ final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+ testInstance)) {
+ testInstance.start();
- // a normal response
- final List<Row> expectedData1 =
- Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"),
Row.of(5, "eee"));
- final CompletableFuture<CoordinationResponse> responseFuture1 =
- coordinator.handleCoordinationRequest(
+ final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"),
Row.of(2, "bbb"));
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ testInstance.handleCoordinationRequest(
createRequestForServerGeneratedResponse());
- socketServer.handleRequest(expectedData1);
- assertResponseWithDefaultMetadataFromServer(responseFuture1,
expectedData1);
+ assertThat(responseFuture).isNotDone();
+
+ socketServer.handleRequest(expectedData);
+
+ assertResponseWithDefaultMetadataFromServer(responseFuture,
expectedData);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheServerSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ final CompletableFuture<CoordinationResponse> responseFuture;
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+ .eventuallySucceeds();
+ }
+ assertEmptyResponseGeneratedFromServer(responseFuture);
+ }
+ }
+
+ @Test
+ void testServerSideClosingTheAcceptingSocket() throws Exception {
+ try (CollectSinkOperatorCoordinator coordinator = new
CollectSinkOperatorCoordinator()) {
+ coordinator.start();
+
+ try (final TestingSocketServer socketServer =
+
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+ final String version = "version";
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(
+
createRequestForClientGeneratedResponse(version));
+ assertThat(responseFuture).isNotDone();
+
+ final CompletableFuture<List<Row>> dataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> serverSideProcess =
+ socketServer.handleRequestAsync(dataFuture);
+ assertThat(serverSideProcess).isNotDone();
+
+ // wait for the connection to be established
+ socketServer.waitForConnectionToBeEstablished();
+ // in order to close the connection from the server's side
+ socketServer.closeAcceptingSocket();
+
+ assertEmptyResponseGeneratedFromClient(responseFuture,
version);
+
+ assertThat(serverSideProcess).isNotDone();
- // server closes here
- failedResponseFuture =
+ dataFuture.complete(Collections.singletonList(Row.of(123,
"abc")));
+
+ FlinkAssertions.assertThatFuture(serverSideProcess)
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(SocketException.class);
Review Comment:
This seems to primarily be testing the `TestingSocketServer`.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -173,72 +174,192 @@ private static void assertResponse(
}
}
- private static class ServerThread extends Thread {
+ /**
+ * {@code TestingSocketServer} simulates the server side of the {@link
+ * CollectSinkOperatorCoordinator} communication.
+ */
+ private static class TestingSocketServer implements AutoCloseable {
- static final String DEFAULT_SERVER_RESPONSE_VERSION =
"server-response-version";
+ static final String DEFAULT_SERVER_RESPONSE_VERSION = "version";
static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
- private final LinkedList<List<Row>> data;
- private final int closeRequestNum;
+ private final ServerSocket serverSocket;
- private final ServerSocket server;
- private boolean running;
+ private CompletableFuture<SocketConnection> connectionFuture = new
CompletableFuture<>();
- private ServerThread(List<List<Row>> data, int closeRequestNum) throws
IOException {
- this.data = new LinkedList<>(data);
- this.closeRequestNum = closeRequestNum;
+ /**
+ * Creates a {@code TestingSocketServer} and connects it with the
passed {@link
+ * CollectSinkOperatorCoordinator}.
+ */
+ public static TestingSocketServer
createSocketServerAndInitializeCoordinator(
+ CollectSinkOperatorCoordinator coordinator) throws Exception {
+ final TestingSocketServer socketServer = new TestingSocketServer();
+ coordinator.handleEventFromOperator(
+ 0, 0, new
CollectSinkAddressEvent(socketServer.getSocketAddress()));
- this.server = new ServerSocket(0);
+ return socketServer;
+ }
+
+ /**
+ * Instantiates a {@code TestingSocketServer}. The instance is not
listening, yet. The
+ * connection will be established with the first request being handled.
+ *
+ * @see #handleRequest(List)
+ * @see #handleRequest(String, int, List)
+ * @see #handleRequestAsync(CompletableFuture)
+ * @see #handleRequestAsync(String, int, CompletableFuture)
+ */
+ public TestingSocketServer() throws IOException {
+ this.serverSocket = new ServerSocket(0);
+ }
+
+ /** Returns the {@link InetSocketAddress} of the {@code
TestingSocketServer}. */
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(
+ InetAddress.getLoopbackAddress(),
serverSocket.getLocalPort());
}
@Override
- public void run() {
- running = true;
-
- int requestNum = 0;
- Socket socket = null;
- DataInputViewStreamWrapper inStream = null;
- DataOutputViewStreamWrapper outStream = null;
-
- try {
- while (running) {
- if (socket == null) {
- socket = NetUtils.acceptWithoutTimeout(server);
- inStream = new
DataInputViewStreamWrapper(socket.getInputStream());
- outStream = new
DataOutputViewStreamWrapper(socket.getOutputStream());
- }
-
- // parsing the request to ensure correct format of input
message
- new CollectCoordinationRequest(inStream);
-
- requestNum++;
- if (requestNum >= closeRequestNum) {
- // server close abruptly
- running = false;
- break;
- }
-
- // serialize generic response (only the data is relevant)
- new CollectCoordinationResponse(
- DEFAULT_SERVER_RESPONSE_VERSION,
- DEFAULT_SERVER_RESPONSE_OFFSET,
-
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
- .serialize(outStream);
- }
-
- socket.close();
- server.close();
- } catch (IOException e) {
- // ignore
+ public void close() throws Exception {
+ closeAcceptingSocket();
+ this.serverSocket.close();
+ }
+
+ private CompletableFuture<SocketConnection> acceptSocketAsync() {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return new SocketConnection(
+
NetUtils.acceptWithoutTimeout(serverSocket));
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ /** Closes the established connection from the server side. */
+ public void closeAcceptingSocket() throws Exception {
+ if (connectionFuture.isDone()) {
+ connectionFuture.get().close();
+ connectionFuture = new CompletableFuture<>();
}
}
- public void close() {
- running = false;
+ /**
+ * Waits for the connection to be established between the client and
the server. This method
+ * will block until a request is sent by the client and a {@code
handle*} call is initiated
+ * by this instance.
+ */
+ public void waitForConnectionToBeEstablished()
+ throws ExecutionException, InterruptedException {
+ connectionFuture.get();
+ }
+
+ /**
+ * Handles a request with the given data in a synchronous fashion. The
{@code
+ * TestingSocketServer}'s default meta information is attached to the
response.
+ *
+ * @see #DEFAULT_SERVER_RESPONSE_VERSION
+ * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+ */
+ public void handleRequest(List<Row> actualData) {
+ handleRequest(
+ DEFAULT_SERVER_RESPONSE_VERSION,
DEFAULT_SERVER_RESPONSE_OFFSET, actualData);
+ }
+
+ /**
+ * Handles the next request synchronously. The passed {@code
actualData} will be forwarded
+ * to the response.
+ */
+ public void handleRequest(String actualVersion, int actualOffset,
List<Row> actualData) {
+ handleRequestAsync(
+ actualVersion,
+ actualOffset,
+ CompletableFuture.completedFuture(actualData))
+ .join();
+ }
+
+ /**
+ * Handles a request with the given data in an asynchronous fashion
with the {@code
+ * TestingSocketServer}'s default meta information being attached to
the response. The
+ * returned future completes when the server side's processing
completes.
+ *
+ * @see #DEFAULT_SERVER_RESPONSE_VERSION
+ * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+ */
+ public CompletableFuture<Void> handleRequestAsync(
+ CompletableFuture<List<Row>> actualDataAsync) {
+ return handleRequestAsync(
+ DEFAULT_SERVER_RESPONSE_VERSION,
+ DEFAULT_SERVER_RESPONSE_OFFSET,
+ actualDataAsync);
+ }
+
+ private CompletableFuture<SocketConnection> connectSocket() {
+ if (!connectionFuture.isDone()) {
+ FutureUtils.forward(acceptSocketAsync(), connectionFuture);
Review Comment:
Feels weird to set up this forwarding here instead of whenever the
`connectionFuture` is reset (in the constructor and closeAcceptingSocket)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]