XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308288979


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -173,72 +174,192 @@ private static void assertResponse(
         }
     }
 
-    private static class ServerThread extends Thread {
+    /**
+     * {@code TestingSocketServer} simulates the server side of the {@link
+     * CollectSinkOperatorCoordinator} communication.
+     */
+    private static class TestingSocketServer implements AutoCloseable {
 
-        static final String DEFAULT_SERVER_RESPONSE_VERSION = 
"server-response-version";
+        static final String DEFAULT_SERVER_RESPONSE_VERSION = "version";
         static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
 
-        private final LinkedList<List<Row>> data;
-        private final int closeRequestNum;
+        private final ServerSocket serverSocket;
 
-        private final ServerSocket server;
-        private boolean running;
+        private CompletableFuture<SocketConnection> connectionFuture = new 
CompletableFuture<>();
 
-        private ServerThread(List<List<Row>> data, int closeRequestNum) throws 
IOException {
-            this.data = new LinkedList<>(data);
-            this.closeRequestNum = closeRequestNum;
+        /**
+         * Creates a {@code TestingSocketServer} and connects it with the 
passed {@link
+         * CollectSinkOperatorCoordinator}.
+         */
+        public static TestingSocketServer 
createSocketServerAndInitializeCoordinator(
+                CollectSinkOperatorCoordinator coordinator) throws Exception {
+            final TestingSocketServer socketServer = new TestingSocketServer();
+            coordinator.handleEventFromOperator(
+                    0, 0, new 
CollectSinkAddressEvent(socketServer.getSocketAddress()));
 
-            this.server = new ServerSocket(0);
+            return socketServer;
+        }
+
+        /**
+         * Instantiates a {@code TestingSocketServer}. The instance is not 
listening, yet. The
+         * connection will be established with the first request being handled.
+         *
+         * @see #handleRequest(List)
+         * @see #handleRequest(String, int, List)
+         * @see #handleRequestAsync(CompletableFuture)
+         * @see #handleRequestAsync(String, int, CompletableFuture)
+         */
+        public TestingSocketServer() throws IOException {
+            this.serverSocket = new ServerSocket(0);
+        }
+
+        /** Returns the {@link InetSocketAddress} of the {@code 
TestingSocketServer}. */
+        public InetSocketAddress getSocketAddress() {
+            return new InetSocketAddress(
+                    InetAddress.getLoopbackAddress(), 
serverSocket.getLocalPort());
         }
 
         @Override
-        public void run() {
-            running = true;
-
-            int requestNum = 0;
-            Socket socket = null;
-            DataInputViewStreamWrapper inStream = null;
-            DataOutputViewStreamWrapper outStream = null;
-
-            try {
-                while (running) {
-                    if (socket == null) {
-                        socket = NetUtils.acceptWithoutTimeout(server);
-                        inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
-                        outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
-                    }
-
-                    // parsing the request to ensure correct format of input 
message
-                    new CollectCoordinationRequest(inStream);
-
-                    requestNum++;
-                    if (requestNum >= closeRequestNum) {
-                        // server close abruptly
-                        running = false;
-                        break;
-                    }
-
-                    // serialize generic response (only the data is relevant)
-                    new CollectCoordinationResponse(
-                                    DEFAULT_SERVER_RESPONSE_VERSION,
-                                    DEFAULT_SERVER_RESPONSE_OFFSET,
-                                    
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
-                            .serialize(outStream);
-                }
-
-                socket.close();
-                server.close();
-            } catch (IOException e) {
-                // ignore
+        public void close() throws Exception {
+            closeAcceptingSocket();
+            this.serverSocket.close();
+        }
+
+        private CompletableFuture<SocketConnection> acceptSocketAsync() {
+            return CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            return new SocketConnection(
+                                    
NetUtils.acceptWithoutTimeout(serverSocket));
+                        } catch (IOException e) {
+                            throw new CompletionException(e);
+                        }
+                    });
+        }
+
+        /** Closes the established connection from the server side. */
+        public void closeAcceptingSocket() throws Exception {
+            if (connectionFuture.isDone()) {
+                connectionFuture.get().close();
+                connectionFuture = new CompletableFuture<>();
             }
         }
 
-        public void close() {
-            running = false;
+        /**
+         * Waits for the connection to be established between the client and 
the server. This method
+         * will block until a request is sent by the client and a {@code 
handle*} call is initiated
+         * by this instance.
+         */
+        public void waitForConnectionToBeEstablished()
+                throws ExecutionException, InterruptedException {
+            connectionFuture.get();
+        }
+
+        /**
+         * Handles a request with the given data in a synchronous fashion. The 
{@code
+         * TestingSocketServer}'s default meta information is attached to the 
response.
+         *
+         * @see #DEFAULT_SERVER_RESPONSE_VERSION
+         * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+         */
+        public void handleRequest(List<Row> actualData) {
+            handleRequest(
+                    DEFAULT_SERVER_RESPONSE_VERSION, 
DEFAULT_SERVER_RESPONSE_OFFSET, actualData);
+        }
+
+        /**
+         * Handles the next request synchronously. The passed {@code 
actualData} will be forwarded
+         * to the response.
+         */
+        public void handleRequest(String actualVersion, int actualOffset, 
List<Row> actualData) {
+            handleRequestAsync(
+                            actualVersion,
+                            actualOffset,
+                            CompletableFuture.completedFuture(actualData))
+                    .join();
+        }
+
+        /**
+         * Handles a request with the given data in an asynchronous fashion 
with the {@code
+         * TestingSocketServer}'s default meta information being attached to 
the response. The
+         * returned future completes when the server side's processing 
completes.
+         *
+         * @see #DEFAULT_SERVER_RESPONSE_VERSION
+         * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+         */
+        public CompletableFuture<Void> handleRequestAsync(
+                CompletableFuture<List<Row>> actualDataAsync) {
+            return handleRequestAsync(
+                    DEFAULT_SERVER_RESPONSE_VERSION,
+                    DEFAULT_SERVER_RESPONSE_OFFSET,
+                    actualDataAsync);
+        }
+
+        private CompletableFuture<SocketConnection> connectSocket() {
+            if (!connectionFuture.isDone()) {
+                FutureUtils.forward(acceptSocketAsync(), connectionFuture);

Review Comment:
   I don't quite follow that remark. We want the same future to be accessible 
all the time (that's why there is a `new CompletableFuture` here and in the 
constructor), but we only want to trigger the socket listening later on. How 
would you implement that differently? It feels natural to me, but I might have 
missed your point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to