zentol commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1307474170


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        running = false;
         this.executorService.shutdownNow();
+        ongoingRequests.forEach(ft -> ft.cancel(true));

Review Comment:
   could it be that this alone is actually the solution?
   If multiple requests were already queued into the executor we weren't 
failing them anywhere.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));
+
+                FlinkAssertions.assertThatFuture(serverSideProcess)
+                        .eventuallyFailsWith(ExecutionException.class)
+                        .withCauseInstanceOf(SocketException.class);
+            }
+        }
+    }
+
+    @Test
+    void testServerSideNotResponding() throws Exception {

Review Comment:
   Name seems misleading because right now the server isn't even accepting 
requests. `testServerSideClosingTheServerSocket` actually models this better.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));

Review Comment:
   why do we touch the data-future again? It's a server-side thing and should 
have no effect on the test results.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));
+
+                FlinkAssertions.assertThatFuture(serverSideProcess)
+                        .eventuallyFailsWith(ExecutionException.class)
+                        .withCauseInstanceOf(SocketException.class);
+            }
+        }
+    }
+
+    @Test
+    void testServerSideNotResponding() throws Exception {
+        try (final TestingSocketServer socketServer = new 
TestingSocketServer()) {
+            final String expectedVersion = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (CollectSinkOperatorCoordinator coordinator =
+                    new CollectSinkOperatorCoordinator()) {
+                coordinator.start();
+                coordinator.handleEventFromOperator(
+                        0, 0, new 
CollectSinkAddressEvent(socketServer.getSocketAddress()));
+
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(expectedVersion));
+                assertThat(responseFuture).isNotDone();
+            }
+            assertEmptyResponseGeneratedFromClient(responseFuture, 
expectedVersion);
+        }
+    }
+
+    @Test
+    void testServerSideDisconnectWithReconnect() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator);
+
+            final String expectedVersion = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture =
                     coordinator.handleCoordinationRequest(
-                            
createRequestForClientGeneratedResponse(versionOfFailedRequest));
+                            
createRequestForClientGeneratedResponse(expectedVersion));
+
+            final CompletableFuture<SocketConnection> connectFuture = 
socketServer.connectSocket();
+            FlinkAssertions.assertThatFuture(connectFuture)
+                    .as("The connection to the client should be established.")
+                    .eventuallySucceeds();
+
+            // simulation of the server side failing while the request is 
handled on the server side

Review Comment:
   Maybe we should drop the client/server terminology because afaict we have 3 
participants; the actual client (e.g., CLI), the coordinator and the sink 
operator (== TestingSocketServer), creating 2 client/server pairs.
   It's just really confusing.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -152,9 +176,11 @@ private CoordinationResponse handleRequestImpl(
             throw new NullPointerException("No sinkAddress available.");
         }
 
-        if (socketConnection == null) {
-            socketConnection = SocketConnection.create(socketTimeout, 
sinkAddress);
-            LOG.info("Sink connection established");
+        synchronized (connectionLock) {
+            if (running && socketConnection == null) {

Review Comment:
   otherwise we should exit eagerly instead of running into an NPE later on.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));
+
+                FlinkAssertions.assertThatFuture(serverSideProcess)
+                        .eventuallyFailsWith(ExecutionException.class)
+                        .withCauseInstanceOf(SocketException.class);

Review Comment:
   I would completely drop the last 2 assertion blocks



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -152,9 +176,11 @@ private CoordinationResponse handleRequestImpl(
             throw new NullPointerException("No sinkAddress available.");
         }
 
-        if (socketConnection == null) {
-            socketConnection = SocketConnection.create(socketTimeout, 
sinkAddress);
-            LOG.info("Sink connection established");
+        synchronized (connectionLock) {

Review Comment:
   Wondering if this shouldn't be a larger block.
   Does the client handle it properly when the connection breaks down while it 
is reading data?
   
   What's the desired behavior here? 



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);

Review Comment:
   A comment stating that the connection was now closed would go a long way 
here and other tests



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java:
##########
@@ -83,7 +104,10 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing the CollectSinkOperatorCoordinator.");
+        running = false;
         this.executorService.shutdownNow();

Review Comment:
   I'm wondering whether `awaitTermination` wouldn't be a simpler option.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));
+
+                FlinkAssertions.assertThatFuture(serverSideProcess)
+                        .eventuallyFailsWith(ExecutionException.class)
+                        .withCauseInstanceOf(SocketException.class);

Review Comment:
   This seems to primarily be testing the `TestingSocketServer`.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -173,72 +174,192 @@ private static void assertResponse(
         }
     }
 
-    private static class ServerThread extends Thread {
+    /**
+     * {@code TestingSocketServer} simulates the server side of the {@link
+     * CollectSinkOperatorCoordinator} communication.
+     */
+    private static class TestingSocketServer implements AutoCloseable {
 
-        static final String DEFAULT_SERVER_RESPONSE_VERSION = 
"server-response-version";
+        static final String DEFAULT_SERVER_RESPONSE_VERSION = "version";
         static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
 
-        private final LinkedList<List<Row>> data;
-        private final int closeRequestNum;
+        private final ServerSocket serverSocket;
 
-        private final ServerSocket server;
-        private boolean running;
+        private CompletableFuture<SocketConnection> connectionFuture = new 
CompletableFuture<>();
 
-        private ServerThread(List<List<Row>> data, int closeRequestNum) throws 
IOException {
-            this.data = new LinkedList<>(data);
-            this.closeRequestNum = closeRequestNum;
+        /**
+         * Creates a {@code TestingSocketServer} and connects it with the 
passed {@link
+         * CollectSinkOperatorCoordinator}.
+         */
+        public static TestingSocketServer 
createSocketServerAndInitializeCoordinator(
+                CollectSinkOperatorCoordinator coordinator) throws Exception {
+            final TestingSocketServer socketServer = new TestingSocketServer();
+            coordinator.handleEventFromOperator(
+                    0, 0, new 
CollectSinkAddressEvent(socketServer.getSocketAddress()));
 
-            this.server = new ServerSocket(0);
+            return socketServer;
+        }
+
+        /**
+         * Instantiates a {@code TestingSocketServer}. The instance is not 
listening, yet. The
+         * connection will be established with the first request being handled.
+         *
+         * @see #handleRequest(List)
+         * @see #handleRequest(String, int, List)
+         * @see #handleRequestAsync(CompletableFuture)
+         * @see #handleRequestAsync(String, int, CompletableFuture)
+         */
+        public TestingSocketServer() throws IOException {
+            this.serverSocket = new ServerSocket(0);
+        }
+
+        /** Returns the {@link InetSocketAddress} of the {@code 
TestingSocketServer}. */
+        public InetSocketAddress getSocketAddress() {
+            return new InetSocketAddress(
+                    InetAddress.getLoopbackAddress(), 
serverSocket.getLocalPort());
         }
 
         @Override
-        public void run() {
-            running = true;
-
-            int requestNum = 0;
-            Socket socket = null;
-            DataInputViewStreamWrapper inStream = null;
-            DataOutputViewStreamWrapper outStream = null;
-
-            try {
-                while (running) {
-                    if (socket == null) {
-                        socket = NetUtils.acceptWithoutTimeout(server);
-                        inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
-                        outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
-                    }
-
-                    // parsing the request to ensure correct format of input 
message
-                    new CollectCoordinationRequest(inStream);
-
-                    requestNum++;
-                    if (requestNum >= closeRequestNum) {
-                        // server close abruptly
-                        running = false;
-                        break;
-                    }
-
-                    // serialize generic response (only the data is relevant)
-                    new CollectCoordinationResponse(
-                                    DEFAULT_SERVER_RESPONSE_VERSION,
-                                    DEFAULT_SERVER_RESPONSE_OFFSET,
-                                    
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
-                            .serialize(outStream);
-                }
-
-                socket.close();
-                server.close();
-            } catch (IOException e) {
-                // ignore
+        public void close() throws Exception {
+            closeAcceptingSocket();
+            this.serverSocket.close();
+        }
+
+        private CompletableFuture<SocketConnection> acceptSocketAsync() {
+            return CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            return new SocketConnection(
+                                    
NetUtils.acceptWithoutTimeout(serverSocket));
+                        } catch (IOException e) {
+                            throw new CompletionException(e);
+                        }
+                    });
+        }
+
+        /** Closes the established connection from the server side. */
+        public void closeAcceptingSocket() throws Exception {
+            if (connectionFuture.isDone()) {
+                connectionFuture.get().close();
+                connectionFuture = new CompletableFuture<>();
             }
         }
 
-        public void close() {
-            running = false;
+        /**
+         * Waits for the connection to be established between the client and 
the server. This method
+         * will block until a request is sent by the client and a {@code 
handle*} call is initiated
+         * by this instance.
+         */
+        public void waitForConnectionToBeEstablished()
+                throws ExecutionException, InterruptedException {
+            connectionFuture.get();
+        }
+
+        /**
+         * Handles a request with the given data in a synchronous fashion. The 
{@code
+         * TestingSocketServer}'s default meta information is attached to the 
response.
+         *
+         * @see #DEFAULT_SERVER_RESPONSE_VERSION
+         * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+         */
+        public void handleRequest(List<Row> actualData) {
+            handleRequest(
+                    DEFAULT_SERVER_RESPONSE_VERSION, 
DEFAULT_SERVER_RESPONSE_OFFSET, actualData);
+        }
+
+        /**
+         * Handles the next request synchronously. The passed {@code 
actualData} will be forwarded
+         * to the response.
+         */
+        public void handleRequest(String actualVersion, int actualOffset, 
List<Row> actualData) {
+            handleRequestAsync(
+                            actualVersion,
+                            actualOffset,
+                            CompletableFuture.completedFuture(actualData))
+                    .join();
+        }
+
+        /**
+         * Handles a request with the given data in an asynchronous fashion 
with the {@code
+         * TestingSocketServer}'s default meta information being attached to 
the response. The
+         * returned future completes when the server side's processing 
completes.
+         *
+         * @see #DEFAULT_SERVER_RESPONSE_VERSION
+         * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+         */
+        public CompletableFuture<Void> handleRequestAsync(
+                CompletableFuture<List<Row>> actualDataAsync) {
+            return handleRequestAsync(
+                    DEFAULT_SERVER_RESPONSE_VERSION,
+                    DEFAULT_SERVER_RESPONSE_OFFSET,
+                    actualDataAsync);
+        }
+
+        private CompletableFuture<SocketConnection> connectSocket() {
+            if (!connectionFuture.isDone()) {
+                FutureUtils.forward(acceptSocketAsync(), connectionFuture);

Review Comment:
   Feels weird to set up this forwarding here instead of whenever the 
`connectionFuture` is reset (in the constructor and closeAcceptingSocket)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to