This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit abb5d7583232d72375019fb673334e72890f84e0 Author: Matthias Pohl <[email protected]> AuthorDate: Fri Aug 25 16:37:57 2023 +0200 [FLINK-32751][streaming] Refactors CollectSinkOperatorCoordinator to improve its testability Additionally, a few new test scenarios were added to CollectSinkOperatorCoordinatorTest and SocketConnection was introduced --- .../collect/CollectSinkOperatorCoordinator.java | 37 +- .../api/operators/collect/SocketConnection.java | 77 ++++ .../CollectSinkOperatorCoordinatorTest.java | 473 +++++++++++++++------ 3 files changed, 442 insertions(+), 145 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index 806ed180496..13957b297a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -17,8 +17,7 @@ package org.apache.flink.streaming.api.operators.collect; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; @@ -41,7 +40,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; -import java.net.Socket; import java.util.Collections; import java.util.Set; import java.util.concurrent.CancellationException; @@ -64,15 +62,19 @@ public class CollectSinkOperatorCoordinator private final int socketTimeout; private InetSocketAddress address; - private Socket socket; - private DataInputViewStreamWrapper inStream; - private DataOutputViewStreamWrapper outStream; + + private SocketConnection socketConnection; private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests = ConcurrentHashMap.newKeySet(); private ExecutorService executorService; + @VisibleForTesting + CollectSinkOperatorCoordinator() { + this(0); + } + public CollectSinkOperatorCoordinator(int socketTimeout) { this.socketTimeout = socketTimeout; } @@ -157,25 +159,18 @@ public class CollectSinkOperatorCoordinator throw new NullPointerException("No sinkAddress available."); } - if (socket == null) { - socket = new Socket(); - socket.setSoTimeout(socketTimeout); - socket.setKeepAlive(true); - socket.setTcpNoDelay(true); - - socket.connect(sinkAddress); - inStream = new DataInputViewStreamWrapper(socket.getInputStream()); - outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); + if (socketConnection == null) { + socketConnection = SocketConnection.create(socketTimeout, sinkAddress); LOG.info("Sink connection established"); } // send version and offset to sink server LOG.debug("Forwarding request to sink socket server"); - request.serialize(outStream); + request.serialize(socketConnection.getDataOutputView()); // fetch back serialized results LOG.debug("Fetching serialized result from sink socket server"); - return new CollectCoordinationResponse(inStream); + return new CollectCoordinationResponse(socketConnection.getDataInputView()); } private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest request) { @@ -189,14 +184,14 @@ public class CollectSinkOperatorCoordinator } private void closeConnection() { - if (socket != null) { + if (socketConnection != null) { try { - socket.close(); - } catch (IOException e) { + socketConnection.close(); + } catch (Exception e) { LOG.warn("Failed to close sink socket server connection", e); } + socketConnection = null; } - socket = null; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java new file mode 100644 index 00000000000..deaba8483fe --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/SocketConnection.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.collect; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * {@code SocketConnection} is a helper class to collect Socket-related fields that belong to the + * same {@link Socket} connection. + */ +class SocketConnection implements AutoCloseable { + + private final Socket socket; + private final DataInputViewStreamWrapper inStream; + private final DataOutputViewStreamWrapper outStream; + + public static SocketConnection create(int socketTimeout, InetSocketAddress address) + throws IOException { + final Socket newSocket = new Socket(); + newSocket.setSoTimeout(socketTimeout); + newSocket.setKeepAlive(true); + newSocket.setTcpNoDelay(true); + + newSocket.connect(address); + + return new SocketConnection(newSocket); + } + + @VisibleForTesting + SocketConnection(Socket connectedSocket) throws IOException { + Preconditions.checkArgument(connectedSocket.isConnected()); + + this.socket = connectedSocket; + this.inStream = new DataInputViewStreamWrapper(socket.getInputStream()); + this.outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); + } + + public DataInputView getDataInputView() { + return this.inStream; + } + + public DataOutputView getDataOutputView() { + return this.outStream; + } + + @Override + public void close() throws Exception { + this.outStream.close(); + this.inStream.close(); + this.socket.close(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java index fea9fdea344..58fbe2994a4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java @@ -21,131 +21,230 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils; import org.apache.flink.types.Row; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.Preconditions; import org.junit.jupiter.api.Test; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; /** Tests for {@link CollectSinkOperatorCoordinator}. */ class CollectSinkOperatorCoordinatorTest { - private static final int SOCKET_TIMEOUT_MILLIS = 1000; - private static final TypeSerializer<Row> serializer = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) .createSerializer(new ExecutionConfig()); @Test void testNoAddress() throws Exception { - CollectSinkOperatorCoordinator coordinator = - new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); - coordinator.start(); + try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) { + coordinator.start(); + + final String requestVersion = "version"; + final CompletableFuture<CoordinationResponse> response = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(requestVersion)); + assertEmptyResponseGeneratedFromCoordinator(response, requestVersion); + } + } - final String expectedVersion = "version"; - final CompletableFuture<CoordinationResponse> response = - coordinator.handleCoordinationRequest( - createRequestForClientGeneratedResponse(expectedVersion)); - assertEmptyResponseGeneratedFromClient(response, expectedVersion); + @Test + void testClosingTheCoordinatorAfterRequestWasReceivedBySinkFunction() throws Exception { + try (final TestingSinkFunction sinkFunction = new TestingSinkFunction()) { + final String expectedVersion = "version"; + final CompletableFuture<CoordinationResponse> responseFuture; + + final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(); + coordinator.start(); + sinkFunction.registerSinkFunctionWith(coordinator); + + final CompletableFuture<Void> sinkFunctionProcessing = + sinkFunction.handleRequestWithoutResponse(); + assertThat(sinkFunctionProcessing) + .as("The SocketServer waits for the request to be sent.") + .isNotDone(); + + responseFuture = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(expectedVersion)); + assertThat(responseFuture) + .as( + "The response shouldn't complete because the SinkFunction doesn't send any response.") + .isNotDone(); + + assertThatNoException() + .as( + "The SocketServer should eventually have handled the request without sending a response back.") + .isThrownBy(sinkFunctionProcessing::get); + + // closing the coordinator should have resulted in an empty response being returned + coordinator.close(); + assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion); + } + } - coordinator.close(); + @Test + void testSuccessfulResponse() throws Exception { + try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(); + final TestingSinkFunction sinkFunction = + TestingSinkFunction.createSinkFunctionAndInitializeCoordinator( + coordinator)) { + coordinator.start(); + + final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")); + final CompletableFuture<CoordinationResponse> responseFuture = + coordinator.handleCoordinationRequest( + createRequestForSinkFunctionGeneratedResponse()); + assertThat(responseFuture).isNotDone(); + + sinkFunction.handleRequest(expectedData); + + assertResponseWithDefaultMetadataFromSinkFunction(responseFuture, expectedData); + } } @Test - void testServerFailure() throws Exception { - CollectSinkOperatorCoordinator coordinator = - new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); - coordinator.start(); - - List<List<Row>> expected = - Arrays.asList( - Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")), - Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), Row.of(5, "eee"))); - ServerThread server = new ServerThread(expected, 3); - server.start(); - coordinator.handleEventFromOperator( - 0, 0, new CollectSinkAddressEvent(server.getServerAddress())); - - // a normal response - CompletableFuture<CoordinationResponse> response = - coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); - assertResponseWithDefaultMetadataFromServer(response, expected.get(0)); - - // a normal response - response = coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); - assertResponseWithDefaultMetadataFromServer(response, expected.get(1)); - - // server closes here - final String expectedVersion = "version3"; - CompletableFuture<CoordinationResponse> responseFuture = - coordinator.handleCoordinationRequest( - createRequestForClientGeneratedResponse(expectedVersion)); - coordinator.executionAttemptFailed(0, 0, null); - - // new server comes - expected = Collections.singletonList(Arrays.asList(Row.of(6, "fff"), Row.of(7, "ggg"))); - server = new ServerThread(expected, 2); - server.start(); - coordinator.handleEventFromOperator( - 0, 0, new CollectSinkAddressEvent(server.getServerAddress())); - - // check failed request - assertEmptyResponseGeneratedFromClient(responseFuture, expectedVersion); - - // a normal response - response = coordinator.handleCoordinationRequest(createRequestForServerGeneratedResponse()); - assertResponseWithDefaultMetadataFromServer(response, expected.get(0)); - - server.close(); - coordinator.close(); + void testClosingTheServerSocketOfTheSinkFunction() throws Exception { + try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) { + coordinator.start(); + + final TestingSinkFunction sinkFunction = + TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator); + final String version = "version"; + final CompletableFuture<CoordinationResponse> responseFuture = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(version)); + assertThat(responseFuture).isNotDone(); + + sinkFunction.handleRequestWithoutResponse().join(); + + // closing the ServerSocket on the SinkFunction side should result in an empty response + sinkFunction.close(); + assertEmptyResponseGeneratedFromSinkFunction(responseFuture); + } } - private static CoordinationRequest createRequestForServerGeneratedResponse() { + @Test + void testClosingTheListeningSocketInTheSinkFunction() throws Exception { + try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) { + coordinator.start(); + + try (final TestingSinkFunction sinkFunction = + TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) { + final String version = "version"; + final CompletableFuture<CoordinationResponse> responseFuture = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(version)); + assertThat(responseFuture).isNotDone(); + + sinkFunction.handleRequestWithoutResponse(); + + // wait for the connection to be established + sinkFunction.waitForConnectionToBeEstablished(); + // in order to close the connection from the SinkFunction's side + sinkFunction.closeAcceptingSocket(); + + // closing the accepting socket of the SinkFunction should result in an empty + // response on the coordinator side + assertEmptyResponseGeneratedFromCoordinator(responseFuture, version); + } + } + } + + @Test + void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception { + try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) { + coordinator.start(); + + final TestingSinkFunction sinkFunction = + TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator); + + final String expectedVersion = "version"; + final CompletableFuture<CoordinationResponse> responseFuture = + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(expectedVersion)); + + sinkFunction.waitForConnectionToBeEstablished(); + + // simulation a failure while the request is handled by the SinkFunction + coordinator.executionAttemptFailed(0, 0, null); + sinkFunction.closeAcceptingSocket(); + + // the client generates an empty response after the accepting socket was closed within + // the sinkFunction + assertEmptyResponseGeneratedFromCoordinator(responseFuture, expectedVersion); + + // the coordinator returns empty responses as long as there is no new connection + // established + final String anotherVersion = "another-version"; + assertEmptyResponseGeneratedFromCoordinator( + coordinator.handleCoordinationRequest( + createRequestForCoordinatorGeneratedResponse(anotherVersion)), + anotherVersion); + + // the next request is properly handled + final TestingSinkFunction anotherSinkFunction = + TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator); + final CompletableFuture<CoordinationResponse> anotherResponseFuture = + coordinator.handleCoordinationRequest( + createRequestForSinkFunctionGeneratedResponse()); + final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")); + anotherSinkFunction.handleRequest(expectedData); + + assertResponseWithDefaultMetadataFromSinkFunction(anotherResponseFuture, expectedData); + anotherSinkFunction.close(); + } + } + + private static CoordinationRequest createRequestForSinkFunctionGeneratedResponse() { final String unusedVersion = "random-version"; - return createRequestForClientGeneratedResponse(unusedVersion); + return createRequestForCoordinatorGeneratedResponse(unusedVersion); } - private static CoordinationRequest createRequestForClientGeneratedResponse(String version) { + private static CoordinationRequest createRequestForCoordinatorGeneratedResponse( + String version) { final int unusedOffset = 123; return new CollectCoordinationRequest(version, unusedOffset); } - private static void assertEmptyResponseGeneratedFromServer( + private static void assertEmptyResponseGeneratedFromSinkFunction( CompletableFuture<CoordinationResponse> responseFuture) throws Exception { - assertEmptyResponseGeneratedFromClient( - responseFuture, ServerThread.DEFAULT_SERVER_RESPONSE_VERSION); + assertEmptyResponseGeneratedFromCoordinator( + responseFuture, TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION); } - private static void assertEmptyResponseGeneratedFromClient( + private static void assertEmptyResponseGeneratedFromCoordinator( CompletableFuture<CoordinationResponse> responseFuture, String expectedVersion) throws Exception { assertResponse(responseFuture, expectedVersion, -1, Collections.emptyList()); } - private static void assertResponseWithDefaultMetadataFromServer( + private static void assertResponseWithDefaultMetadataFromSinkFunction( CompletableFuture<CoordinationResponse> responseFuture, List<Row> expectedData) throws Exception { assertResponse( responseFuture, - ServerThread.DEFAULT_SERVER_RESPONSE_VERSION, - ServerThread.DEFAULT_SERVER_RESPONSE_OFFSET, + TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION, + TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET, expectedData); } @@ -173,72 +272,198 @@ class CollectSinkOperatorCoordinatorTest { } } - private static class ServerThread extends Thread { + /** + * {@code TestingSinkFunction} simulates the {@link CollectSinkFunction} side of the {@link + * CollectSinkOperatorCoordinator} communication. + */ + private static class TestingSinkFunction implements AutoCloseable { + + static final String DEFAULT_SINK_FUNCTION_RESPONSE_VERSION = "version"; + static final int DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET = 2; - static final String DEFAULT_SERVER_RESPONSE_VERSION = "server-response-version"; - static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2; + private final ServerSocket serverSocket; - private final LinkedList<List<Row>> data; - private final int closeRequestNum; + // null indicates that the socket was closed + @Nullable private CompletableFuture<SocketConnection> connectionFuture; - private final ServerSocket server; - private boolean running; + /** + * Creates a {@code TestingSinkFunction} and connects it with the passed {@link + * CollectSinkOperatorCoordinator}. + */ + public static TestingSinkFunction createSinkFunctionAndInitializeCoordinator( + CollectSinkOperatorCoordinator coordinator) throws Exception { + final TestingSinkFunction socketServer = new TestingSinkFunction(); + socketServer.registerSinkFunctionWith(coordinator); - private ServerThread(List<List<Row>> data, int closeRequestNum) throws IOException { - this.data = new LinkedList<>(data); - this.closeRequestNum = closeRequestNum; + return socketServer; + } + + /** + * Creates a {@code TestingSinKFunction} that doesn't listen on the configured. Sending + * requests to this function will block forever when trying to connect to it. + */ + public static TestingSinkFunction createTestingSinkFunctionWithoutConnection() + throws IOException { + return new TestingSinkFunction(ignoredServerSocket -> null); + } - this.server = new ServerSocket(0); + /** + * Instantiates a {@code TestingSinkFunction} that starts listening for incoming connections + * right-away. + */ + public TestingSinkFunction() throws IOException { + this(TestingSinkFunction::acceptSocketAsync); + } + + private TestingSinkFunction( + Function<ServerSocket, CompletableFuture<SocketConnection>> socketListenerFactory) + throws IOException { + this.serverSocket = new ServerSocket(0); + + this.connectionFuture = socketListenerFactory.apply(serverSocket); + } + + private static CompletableFuture<SocketConnection> acceptSocketAsync( + ServerSocket serverSocket) { + return CompletableFuture.supplyAsync( + () -> { + try { + return new SocketConnection( + NetUtils.acceptWithoutTimeout(serverSocket)); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } + + private CompletableFuture<SocketConnection> getConnectionFuture() { + Preconditions.checkState( + connectionFuture != null, + "The accepting Socket is already closed. The calling operation is not possible anymore."); + + return connectionFuture; + } + + /** Returns the {@link InetSocketAddress} of the {@code TestingSinkFunction}. */ + private InetSocketAddress getSocketAddress() { + return new InetSocketAddress( + InetAddress.getLoopbackAddress(), serverSocket.getLocalPort()); + } + + /** Registers the {@code TestingSinkFunction} with the passed {@code coordinator}. */ + public void registerSinkFunctionWith(CollectSinkOperatorCoordinator coordinator) + throws Exception { + coordinator.handleEventFromOperator( + 0, 0, new CollectSinkAddressEvent(getSocketAddress())); } @Override - public void run() { - running = true; - - int requestNum = 0; - Socket socket = null; - DataInputViewStreamWrapper inStream = null; - DataOutputViewStreamWrapper outStream = null; - - try { - while (running) { - if (socket == null) { - socket = NetUtils.acceptWithoutTimeout(server); - inStream = new DataInputViewStreamWrapper(socket.getInputStream()); - outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); - } - - // parsing the request to ensure correct format of input message - new CollectCoordinationRequest(inStream); - - requestNum++; - if (requestNum >= closeRequestNum) { - // server close abruptly - running = false; - break; - } - - // serialize generic response (only the data is relevant) - new CollectCoordinationResponse( - DEFAULT_SERVER_RESPONSE_VERSION, - DEFAULT_SERVER_RESPONSE_OFFSET, - CollectTestUtils.toBytesList(data.removeFirst(), serializer)) - .serialize(outStream); - } - - socket.close(); - server.close(); - } catch (IOException e) { - // ignore + public void close() throws Exception { + closeAcceptingSocket(); + this.serverSocket.close(); + } + + /** Closes the established connection from the {@code SinkFunction}'s side. */ + public void closeAcceptingSocket() throws Exception { + if (connectionFuture != null) { + connectionFuture.get().close(); + connectionFuture = null; } } - public void close() { - running = false; + /** + * Waits for the connection to be established between the {@code coordinator} and the {@code + * SinkFunction}. This method will block until a request is sent by the coordinator and a + * {@code handle*} call is initiated by this instance. + */ + public void waitForConnectionToBeEstablished() + throws ExecutionException, InterruptedException { + getConnectionFuture().get(); + } + + /** + * Handles a request with the given data in a synchronous fashion. The {@code + * TestingSinkFunction}'s default meta information is attached to the response. + * + * @see #DEFAULT_SINK_FUNCTION_RESPONSE_VERSION + * @see #DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET + */ + public void handleRequest(List<Row> actualData) { + handleRequest( + DEFAULT_SINK_FUNCTION_RESPONSE_VERSION, + DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET, + actualData); + } + + /** + * Handles the next request synchronously. The passed {@code actualData} will be forwarded + * to the response. + */ + public void handleRequest(String actualVersion, int actualOffset, List<Row> actualData) { + handleRequestAsync( + actualVersion, + actualOffset, + CompletableFuture.completedFuture(actualData)) + .join(); + } + + /** + * Waits for the socket connection to be established and handles an incoming request. This + * call will block until a request is sent that can be handled. + */ + public CompletableFuture<Void> handleRequestWithoutResponse() { + return internalConnectWithRequestHandlingAsync().thenApply(socketConnection -> null); + } + + private CompletableFuture<SocketConnection> internalConnectWithRequestHandlingAsync() { + return getConnectionFuture() + .thenApply( + socketConnection -> { + try { + // parsing the request to ensure correct format of input message + new CollectCoordinationRequest( + socketConnection.getDataInputView()); + } catch (IOException e) { + throw new CompletionException(e); + } + + return socketConnection; + }); } - public InetSocketAddress getServerAddress() { - return new InetSocketAddress(InetAddress.getLoopbackAddress(), server.getLocalPort()); + /** + * Handles the request by sending a {@link CollectCoordinationResponse} via the socket. + * + * @return {@code CompleteFuture} that indicates whether the asynchronous processing on the + * {@code SinkFunction}'s side finished. + */ + public CompletableFuture<Void> handleRequestAsync( + String actualVersion, + int actualOffset, + CompletableFuture<List<Row>> actualDataAsync) { + return internalConnectWithRequestHandlingAsync() + .thenCombineAsync( + actualDataAsync, + (socketConnection, data) -> { + if (socketConnection == null) { + throw new CompletionException( + new IllegalStateException( + "No SocketConnection established.")); + } + + try { + // serialize generic response (only the data is relevant) + new CollectCoordinationResponse( + actualVersion, + actualOffset, + CollectTestUtils.toBytesList(data, serializer)) + .serialize(socketConnection.getDataOutputView()); + } catch (IOException e) { + throw new CompletionException(e); + } + + return null; + }); } } }
