This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 848183c6dcd16c0c6a9766a2a4ea64c4a923040e
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Aug 25 16:37:57 2023 +0200

    [FLINK-32751][streaming] Refactors CollectSinkOperatorCoordinator to 
improve its testability
    
    Additionally, a few new test scenarios were added to 
CollectSinkOperatorCoordinatorTest and SocketConnection was introduced
---
 .../collect/CollectSinkOperatorCoordinator.java    |  37 +-
 .../api/operators/collect/SocketConnection.java    |  77 ++++
 .../CollectSinkOperatorCoordinatorTest.java        | 474 +++++++++++++++------
 3 files changed, 443 insertions(+), 145 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 806ed180496..13957b297a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -17,8 +17,7 @@
 
 package org.apache.flink.streaming.api.operators.collect;
 
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
@@ -41,7 +40,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
@@ -64,15 +62,19 @@ public class CollectSinkOperatorCoordinator
     private final int socketTimeout;
 
     private InetSocketAddress address;
-    private Socket socket;
-    private DataInputViewStreamWrapper inStream;
-    private DataOutputViewStreamWrapper outStream;
+
+    private SocketConnection socketConnection;
 
     private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests 
=
             ConcurrentHashMap.newKeySet();
 
     private ExecutorService executorService;
 
+    @VisibleForTesting
+    CollectSinkOperatorCoordinator() {
+        this(0);
+    }
+
     public CollectSinkOperatorCoordinator(int socketTimeout) {
         this.socketTimeout = socketTimeout;
     }
@@ -157,25 +159,18 @@ public class CollectSinkOperatorCoordinator
             throw new NullPointerException("No sinkAddress available.");
         }
 
-        if (socket == null) {
-            socket = new Socket();
-            socket.setSoTimeout(socketTimeout);
-            socket.setKeepAlive(true);
-            socket.setTcpNoDelay(true);
-
-            socket.connect(sinkAddress);
-            inStream = new DataInputViewStreamWrapper(socket.getInputStream());
-            outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
+        if (socketConnection == null) {
+            socketConnection = SocketConnection.create(socketTimeout, 
sinkAddress);
             LOG.info("Sink connection established");
         }
 
         // send version and offset to sink server
         LOG.debug("Forwarding request to sink socket server");
-        request.serialize(outStream);
+        request.serialize(socketConnection.getDataOutputView());
 
         // fetch back serialized results
         LOG.debug("Fetching serialized result from sink socket server");
-        return new CollectCoordinationResponse(inStream);
+        return new 
CollectCoordinationResponse(socketConnection.getDataInputView());
     }
 
     private CollectCoordinationResponse 
createEmptyResponse(CollectCoordinationRequest request) {
@@ -189,14 +184,14 @@ public class CollectSinkOperatorCoordinator
     }
 
     private void closeConnection() {
-        if (socket != null) {
+        if (socketConnection != null) {
             try {
-                socket.close();
-            } catch (IOException e) {
+                socketConnection.close();
+            } catch (Exception e) {
                 LOG.warn("Failed to close sink socket server connection", e);
             }
+            socketConnection = null;
         }
-        socket = null;
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java
new file mode 100644
index 00000000000..deaba8483fe
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * {@code SocketConnection} is a helper class to collect Socket-related fields 
that belong to the
+ * same {@link Socket} connection.
+ */
+class SocketConnection implements AutoCloseable {
+
+    private final Socket socket;
+    private final DataInputViewStreamWrapper inStream;
+    private final DataOutputViewStreamWrapper outStream;
+
+    public static SocketConnection create(int socketTimeout, InetSocketAddress 
address)
+            throws IOException {
+        final Socket newSocket = new Socket();
+        newSocket.setSoTimeout(socketTimeout);
+        newSocket.setKeepAlive(true);
+        newSocket.setTcpNoDelay(true);
+
+        newSocket.connect(address);
+
+        return new SocketConnection(newSocket);
+    }
+
+    @VisibleForTesting
+    SocketConnection(Socket connectedSocket) throws IOException {
+        Preconditions.checkArgument(connectedSocket.isConnected());
+
+        this.socket = connectedSocket;
+        this.inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
+        this.outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
+    }
+
+    public DataInputView getDataInputView() {
+        return this.inStream;
+    }
+
+    public DataOutputView getDataOutputView() {
+        return this.outStream;
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.outStream.close();
+        this.inStream.close();
+        this.socket.close();
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index fea9fdea344..ac89d264ed0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -21,131 +21,231 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link CollectSinkOperatorCoordinator}. */
 class CollectSinkOperatorCoordinatorTest {
 
-    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
-
     private static final TypeSerializer<Row> serializer =
             new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO)
                     .createSerializer(new ExecutionConfig());
 
     @Test
     void testNoAddress() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final String requestVersion = "version";
+            final CompletableFuture<CoordinationResponse> response =
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(requestVersion));
+            assertEmptyResponseGeneratedFromCoordinator(response, 
requestVersion);
+        }
+    }
 
-        final String expectedVersion = "version";
-        final CompletableFuture<CoordinationResponse> response =
-                coordinator.handleCoordinationRequest(
-                        
createRequestForClientGeneratedResponse(expectedVersion));
-        assertEmptyResponseGeneratedFromClient(response, expectedVersion);
+    @Test
+    void testClosingTheCoordinatorAfterRequestWasReceivedBySinkFunction() 
throws Exception {
+        try (final TestingSinkFunction sinkFunction = new 
TestingSinkFunction()) {
+            final String expectedVersion = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture;
+
+            final CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator();
+            coordinator.start();
+            sinkFunction.registerSinkFunctionWith(coordinator);
+
+            final CompletableFuture<Void> sinkFunctionProcessing =
+                    sinkFunction.handleRequestWithoutResponse();
+            assertThat(sinkFunctionProcessing)
+                    .as("The SocketServer waits for the request to be sent.")
+                    .isNotDone();
+
+            responseFuture =
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(expectedVersion));
+            assertThat(responseFuture)
+                    .as(
+                            "The response shouldn't complete because the 
SinkFunction doesn't send any response.")
+                    .isNotDone();
+
+            FlinkAssertions.assertThatFuture(sinkFunctionProcessing)
+                    .as(
+                            "The SocketServer should eventually have handled 
the request without sending a response back.")
+                    .eventuallySucceeds();
+
+            // closing the coordinator should have resulted in an empty 
response being returned
+            coordinator.close();
+            assertEmptyResponseGeneratedFromCoordinator(responseFuture, 
expectedVersion);
+        }
+    }
 
-        coordinator.close();
+    @Test
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator();
+                final TestingSinkFunction sinkFunction =
+                        
TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(
+                                coordinator)) {
+            coordinator.start();
+
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    coordinator.handleCoordinationRequest(
+                            createRequestForSinkFunctionGeneratedResponse());
+            assertThat(responseFuture).isNotDone();
+
+            sinkFunction.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromSinkFunction(responseFuture, 
expectedData);
+        }
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        List<List<Row>> expected =
-                Arrays.asList(
-                        Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")),
-                        Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee")));
-        ServerThread server = new ServerThread(expected, 3);
-        server.start();
-        coordinator.handleEventFromOperator(
-                0, 0, new CollectSinkAddressEvent(server.getServerAddress()));
-
-        // a normal response
-        CompletableFuture<CoordinationResponse> response =
-                
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
-        assertResponseWithDefaultMetadataFromServer(response, expected.get(0));
-
-        // a normal response
-        response = 
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
-        assertResponseWithDefaultMetadataFromServer(response, expected.get(1));
-
-        // server closes here
-        final String expectedVersion = "version3";
-        CompletableFuture<CoordinationResponse> responseFuture =
-                coordinator.handleCoordinationRequest(
-                        
createRequestForClientGeneratedResponse(expectedVersion));
-        coordinator.executionAttemptFailed(0, 0, null);
-
-        // new server comes
-        expected = Collections.singletonList(Arrays.asList(Row.of(6, "fff"), 
Row.of(7, "ggg")));
-        server = new ServerThread(expected, 2);
-        server.start();
-        coordinator.handleEventFromOperator(
-                0, 0, new CollectSinkAddressEvent(server.getServerAddress()));
-
-        // check failed request
-        assertEmptyResponseGeneratedFromClient(responseFuture, 
expectedVersion);
-
-        // a normal response
-        response = 
coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse());
-        assertResponseWithDefaultMetadataFromServer(response, expected.get(0));
-
-        server.close();
-        coordinator.close();
+    void testClosingTheServerSocketOfTheSinkFunction() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final TestingSinkFunction sinkFunction =
+                    
TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);
+            final String version = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(version));
+            assertThat(responseFuture).isNotDone();
+
+            
FlinkAssertions.assertThatFuture(sinkFunction.handleRequestWithoutResponse())
+                    .eventuallySucceeds();
+
+            // closing the ServerSocket on the SinkFunction side should result 
in an empty response
+            sinkFunction.close();
+            assertEmptyResponseGeneratedFromSinkFunction(responseFuture);
+        }
     }
 
-    private static CoordinationRequest 
createRequestForServerGeneratedResponse() {
+    @Test
+    void testClosingTheListeningSocketInTheSinkFunction() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSinkFunction sinkFunction =
+                    
TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForCoordinatorGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                sinkFunction.handleRequestWithoutResponse();
+
+                // wait for the connection to be established
+                sinkFunction.waitForConnectionToBeEstablished();
+                // in order to close the connection from the SinkFunction's 
side
+                sinkFunction.closeAcceptingSocket();
+
+                // closing the accepting socket of the SinkFunction should 
result in an empty
+                // response on the coordinator side
+                assertEmptyResponseGeneratedFromCoordinator(responseFuture, 
version);
+            }
+        }
+    }
+
+    @Test
+    void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final TestingSinkFunction sinkFunction =
+                    
TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);
+
+            final String expectedVersion = "version";
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(expectedVersion));
+
+            sinkFunction.waitForConnectionToBeEstablished();
+
+            // simulate a failure while the request is handled by the 
SinkFunction
+            coordinator.executionAttemptFailed(0, 0, null);
+            sinkFunction.closeAcceptingSocket();
+
+            // the coordinator generates an empty response after the accepting 
socket was closed within
+            // the sinkFunction
+            assertEmptyResponseGeneratedFromCoordinator(responseFuture, 
expectedVersion);
+
+            // the coordinator returns empty responses as long as there is no 
new connection
+            // established
+            final String anotherVersion = "another-version";
+            assertEmptyResponseGeneratedFromCoordinator(
+                    coordinator.handleCoordinationRequest(
+                            
createRequestForCoordinatorGeneratedResponse(anotherVersion)),
+                    anotherVersion);
+
+            // the next request is properly handled
+            final TestingSinkFunction anotherSinkFunction =
+                    
TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);
+            final CompletableFuture<CoordinationResponse> 
anotherResponseFuture =
+                    coordinator.handleCoordinationRequest(
+                            createRequestForSinkFunctionGeneratedResponse());
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            anotherSinkFunction.handleRequest(expectedData);
+
+            
assertResponseWithDefaultMetadataFromSinkFunction(anotherResponseFuture, 
expectedData);
+            anotherSinkFunction.close();
+        }
+    }
+
+    private static CoordinationRequest 
createRequestForSinkFunctionGeneratedResponse() {
         final String unusedVersion = "random-version";
-        return createRequestForClientGeneratedResponse(unusedVersion);
+        return createRequestForCoordinatorGeneratedResponse(unusedVersion);
     }
 
-    private static CoordinationRequest 
createRequestForClientGeneratedResponse(String version) {
+    private static CoordinationRequest 
createRequestForCoordinatorGeneratedResponse(
+            String version) {
         final int unusedOffset = 123;
         return new CollectCoordinationRequest(version, unusedOffset);
     }
 
-    private static void assertEmptyResponseGeneratedFromServer(
+    private static void assertEmptyResponseGeneratedFromSinkFunction(
             CompletableFuture<CoordinationResponse> responseFuture) throws 
Exception {
-        assertEmptyResponseGeneratedFromClient(
-                responseFuture, ServerThread.DEFAULT_SERVER_RESPONSE_VERSION);
+        assertEmptyResponseGeneratedFromCoordinator(
+                responseFuture, 
TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION);
     }
 
-    private static void assertEmptyResponseGeneratedFromClient(
+    private static void assertEmptyResponseGeneratedFromCoordinator(
             CompletableFuture<CoordinationResponse> responseFuture, String 
expectedVersion)
             throws Exception {
         assertResponse(responseFuture, expectedVersion, -1, 
Collections.emptyList());
     }
 
-    private static void assertResponseWithDefaultMetadataFromServer(
+    private static void assertResponseWithDefaultMetadataFromSinkFunction(
             CompletableFuture<CoordinationResponse> responseFuture, List<Row> 
expectedData)
             throws Exception {
         assertResponse(
                 responseFuture,
-                ServerThread.DEFAULT_SERVER_RESPONSE_VERSION,
-                ServerThread.DEFAULT_SERVER_RESPONSE_OFFSET,
+                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION,
+                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET,
                 expectedData);
     }
 
@@ -173,72 +273,198 @@ class CollectSinkOperatorCoordinatorTest {
         }
     }
 
-    private static class ServerThread extends Thread {
+    /**
+     * {@code TestingSinkFunction} simulates the {@link CollectSinkFunction} 
side of the {@link
+     * CollectSinkOperatorCoordinator} communication.
+     */
+    private static class TestingSinkFunction implements AutoCloseable {
+
+        static final String DEFAULT_SINK_FUNCTION_RESPONSE_VERSION = "version";
+        static final int DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET = 2;
 
-        static final String DEFAULT_SERVER_RESPONSE_VERSION = 
"server-response-version";
-        static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
+        private final ServerSocket serverSocket;
 
-        private final LinkedList<List<Row>> data;
-        private final int closeRequestNum;
+        // null indicates that the socket was closed
+        @Nullable private CompletableFuture<SocketConnection> connectionFuture;
 
-        private final ServerSocket server;
-        private boolean running;
+        /**
+         * Creates a {@code TestingSinkFunction} and connects it with the 
passed {@link
+         * CollectSinkOperatorCoordinator}.
+         */
+        public static TestingSinkFunction 
createSinkFunctionAndInitializeCoordinator(
+                CollectSinkOperatorCoordinator coordinator) throws Exception {
+            final TestingSinkFunction socketServer = new TestingSinkFunction();
+            socketServer.registerSinkFunctionWith(coordinator);
 
-        private ServerThread(List<List<Row>> data, int closeRequestNum) throws 
IOException {
-            this.data = new LinkedList<>(data);
-            this.closeRequestNum = closeRequestNum;
+            return socketServer;
+        }
+
+        /**
+         * Creates a {@code TestingSinkFunction} that doesn't accept incoming 
connections. Sending
+         * requests to this function will block forever when trying to connect 
to it.
+         */
+        public static TestingSinkFunction 
createTestingSinkFunctionWithoutConnection()
+                throws IOException {
+            return new TestingSinkFunction(ignoredServerSocket -> null);
+        }
 
-            this.server = new ServerSocket(0);
+        /**
+         * Instantiates a {@code TestingSinkFunction} that starts listening 
for incoming connections
+         * right-away.
+         */
+        public TestingSinkFunction() throws IOException {
+            this(TestingSinkFunction::acceptSocketAsync);
+        }
+
+        private TestingSinkFunction(
+                Function<ServerSocket, CompletableFuture<SocketConnection>> 
socketListenerFactory)
+                throws IOException {
+            this.serverSocket = new ServerSocket(0);
+
+            this.connectionFuture = socketListenerFactory.apply(serverSocket);
+        }
+
+        private static CompletableFuture<SocketConnection> acceptSocketAsync(
+                ServerSocket serverSocket) {
+            return CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            return new SocketConnection(
+                                    
NetUtils.acceptWithoutTimeout(serverSocket));
+                        } catch (IOException e) {
+                            throw new CompletionException(e);
+                        }
+                    });
+        }
+
+        private CompletableFuture<SocketConnection> getConnectionFuture() {
+            Preconditions.checkState(
+                    connectionFuture != null,
+                    "The accepting Socket is already closed. The calling 
operation is not possible anymore.");
+
+            return connectionFuture;
+        }
+
+        /** Returns the {@link InetSocketAddress} of the {@code 
TestingSinkFunction}. */
+        private InetSocketAddress getSocketAddress() {
+            return new InetSocketAddress(
+                    InetAddress.getLoopbackAddress(), 
serverSocket.getLocalPort());
+        }
+
+        /** Registers the {@code TestingSinkFunction} with the passed {@code 
coordinator}. */
+        public void registerSinkFunctionWith(CollectSinkOperatorCoordinator 
coordinator)
+                throws Exception {
+            coordinator.handleEventFromOperator(
+                    0, 0, new CollectSinkAddressEvent(getSocketAddress()));
         }
 
         @Override
-        public void run() {
-            running = true;
-
-            int requestNum = 0;
-            Socket socket = null;
-            DataInputViewStreamWrapper inStream = null;
-            DataOutputViewStreamWrapper outStream = null;
-
-            try {
-                while (running) {
-                    if (socket == null) {
-                        socket = NetUtils.acceptWithoutTimeout(server);
-                        inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
-                        outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
-                    }
-
-                    // parsing the request to ensure correct format of input 
message
-                    new CollectCoordinationRequest(inStream);
-
-                    requestNum++;
-                    if (requestNum >= closeRequestNum) {
-                        // server close abruptly
-                        running = false;
-                        break;
-                    }
-
-                    // serialize generic response (only the data is relevant)
-                    new CollectCoordinationResponse(
-                                    DEFAULT_SERVER_RESPONSE_VERSION,
-                                    DEFAULT_SERVER_RESPONSE_OFFSET,
-                                    
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
-                            .serialize(outStream);
-                }
-
-                socket.close();
-                server.close();
-            } catch (IOException e) {
-                // ignore
+        public void close() throws Exception {
+            closeAcceptingSocket();
+            this.serverSocket.close();
+        }
+
+        /** Closes the established connection from the {@code SinkFunction}'s 
side. */
+        public void closeAcceptingSocket() throws Exception {
+            if (connectionFuture != null) {
+                connectionFuture.get().close();
+                connectionFuture = null;
             }
         }
 
-        public void close() {
-            running = false;
+        /**
+         * Waits for the connection to be established between the {@code 
coordinator} and the {@code
+         * SinkFunction}. This method will block until a request is sent by 
the coordinator and a
+         * {@code handle*} call is initiated by this instance.
+         */
+        public void waitForConnectionToBeEstablished()
+                throws ExecutionException, InterruptedException {
+            getConnectionFuture().get();
+        }
+
+        /**
+         * Handles a request with the given data in a synchronous fashion. The 
{@code
+         * TestingSinkFunction}'s default meta information is attached to the 
response.
+         *
+         * @see #DEFAULT_SINK_FUNCTION_RESPONSE_VERSION
+         * @see #DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET
+         */
+        public void handleRequest(List<Row> actualData) {
+            handleRequest(
+                    DEFAULT_SINK_FUNCTION_RESPONSE_VERSION,
+                    DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET,
+                    actualData);
+        }
+
+        /**
+         * Handles the next request synchronously. The passed {@code 
actualData} will be forwarded
+         * to the response.
+         */
+        public void handleRequest(String actualVersion, int actualOffset, 
List<Row> actualData) {
+            handleRequestAsync(
+                            actualVersion,
+                            actualOffset,
+                            CompletableFuture.completedFuture(actualData))
+                    .join();
+        }
+
+        /**
+         * Waits for the socket connection to be established and handles an 
incoming request. This
+         * call will block until a request is sent that can be handled.
+         */
+        public CompletableFuture<Void> handleRequestWithoutResponse() {
+            return 
internalConnectWithRequestHandlingAsync().thenApply(socketConnection -> null);
+        }
+
+        private CompletableFuture<SocketConnection> 
internalConnectWithRequestHandlingAsync() {
+            return getConnectionFuture()
+                    .thenApply(
+                            socketConnection -> {
+                                try {
+                                    // parsing the request to ensure correct 
format of input message
+                                    new CollectCoordinationRequest(
+                                            
socketConnection.getDataInputView());
+                                } catch (IOException e) {
+                                    throw new CompletionException(e);
+                                }
+
+                                return socketConnection;
+                            });
         }
 
-        public InetSocketAddress getServerAddress() {
-            return new InetSocketAddress(InetAddress.getLoopbackAddress(), 
server.getLocalPort());
+        /**
+         * Handles the request by sending a {@link 
CollectCoordinationResponse} via the socket.
+         *
+         * @return {@code CompletableFuture} that indicates whether the 
asynchronous processing on the
+         *     {@code SinkFunction}'s side finished.
+         */
+        public CompletableFuture<Void> handleRequestAsync(
+                String actualVersion,
+                int actualOffset,
+                CompletableFuture<List<Row>> actualDataAsync) {
+            return internalConnectWithRequestHandlingAsync()
+                    .thenCombineAsync(
+                            actualDataAsync,
+                            (socketConnection, data) -> {
+                                if (socketConnection == null) {
+                                    throw new CompletionException(
+                                            new IllegalStateException(
+                                                    "No SocketConnection 
established."));
+                                }
+
+                                try {
+                                    // serialize generic response (only the 
data is relevant)
+                                    new CollectCoordinationResponse(
+                                                    actualVersion,
+                                                    actualOffset,
+                                                    
CollectTestUtils.toBytesList(data, serializer))
+                                            
.serialize(socketConnection.getDataOutputView());
+                                } catch (IOException e) {
+                                    throw new CompletionException(e);
+                                }
+
+                                return null;
+                            });
         }
     }
 }

Reply via email to