XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308288979
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -173,72 +174,192 @@ private static void assertResponse(
}
}
- private static class ServerThread extends Thread {
+ /**
+ * {@code TestingSocketServer} simulates the server side of the {@link
+ * CollectSinkOperatorCoordinator} communication.
+ */
+ private static class TestingSocketServer implements AutoCloseable {
- static final String DEFAULT_SERVER_RESPONSE_VERSION =
"server-response-version";
+ static final String DEFAULT_SERVER_RESPONSE_VERSION = "version";
static final int DEFAULT_SERVER_RESPONSE_OFFSET = 2;
- private final LinkedList<List<Row>> data;
- private final int closeRequestNum;
+ private final ServerSocket serverSocket;
- private final ServerSocket server;
- private boolean running;
+ private CompletableFuture<SocketConnection> connectionFuture = new
CompletableFuture<>();
- private ServerThread(List<List<Row>> data, int closeRequestNum) throws
IOException {
- this.data = new LinkedList<>(data);
- this.closeRequestNum = closeRequestNum;
+ /**
+ * Creates a {@code TestingSocketServer} and connects it with the
passed {@link
+ * CollectSinkOperatorCoordinator}.
+ */
+ public static TestingSocketServer
createSocketServerAndInitializeCoordinator(
+ CollectSinkOperatorCoordinator coordinator) throws Exception {
+ final TestingSocketServer socketServer = new TestingSocketServer();
+ coordinator.handleEventFromOperator(
+ 0, 0, new
CollectSinkAddressEvent(socketServer.getSocketAddress()));
- this.server = new ServerSocket(0);
+ return socketServer;
+ }
+
+ /**
+ * Instantiates a {@code TestingSocketServer}. The instance is not
listening, yet. The
+ * connection will be established with the first request being handled.
+ *
+ * @see #handleRequest(List)
+ * @see #handleRequest(String, int, List)
+ * @see #handleRequestAsync(CompletableFuture)
+ * @see #handleRequestAsync(String, int, CompletableFuture)
+ */
+ public TestingSocketServer() throws IOException {
+ this.serverSocket = new ServerSocket(0);
+ }
+
+ /** Returns the {@link InetSocketAddress} of the {@code
TestingSocketServer}. */
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(
+ InetAddress.getLoopbackAddress(),
serverSocket.getLocalPort());
}
@Override
- public void run() {
- running = true;
-
- int requestNum = 0;
- Socket socket = null;
- DataInputViewStreamWrapper inStream = null;
- DataOutputViewStreamWrapper outStream = null;
-
- try {
- while (running) {
- if (socket == null) {
- socket = NetUtils.acceptWithoutTimeout(server);
- inStream = new
DataInputViewStreamWrapper(socket.getInputStream());
- outStream = new
DataOutputViewStreamWrapper(socket.getOutputStream());
- }
-
- // parsing the request to ensure correct format of input
message
- new CollectCoordinationRequest(inStream);
-
- requestNum++;
- if (requestNum >= closeRequestNum) {
- // server close abruptly
- running = false;
- break;
- }
-
- // serialize generic response (only the data is relevant)
- new CollectCoordinationResponse(
- DEFAULT_SERVER_RESPONSE_VERSION,
- DEFAULT_SERVER_RESPONSE_OFFSET,
-
CollectTestUtils.toBytesList(data.removeFirst(), serializer))
- .serialize(outStream);
- }
-
- socket.close();
- server.close();
- } catch (IOException e) {
- // ignore
+ public void close() throws Exception {
+ closeAcceptingSocket();
+ this.serverSocket.close();
+ }
+
+ private CompletableFuture<SocketConnection> acceptSocketAsync() {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return new SocketConnection(
+
NetUtils.acceptWithoutTimeout(serverSocket));
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ /** Closes the established connection from the server side. */
+ public void closeAcceptingSocket() throws Exception {
+ if (connectionFuture.isDone()) {
+ connectionFuture.get().close();
+ connectionFuture = new CompletableFuture<>();
}
}
- public void close() {
- running = false;
+ /**
+ * Waits for the connection to be established between the client and
the server. This method
+ * will block until a request is sent by the client and a {@code
handle*} call is initiated
+ * by this instance.
+ */
+ public void waitForConnectionToBeEstablished()
+ throws ExecutionException, InterruptedException {
+ connectionFuture.get();
+ }
+
+ /**
+ * Handles a request with the given data in a synchronous fashion. The
{@code
+ * TestingSocketServer}'s default meta information is attached to the
response.
+ *
+ * @see #DEFAULT_SERVER_RESPONSE_VERSION
+ * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+ */
+ public void handleRequest(List<Row> actualData) {
+ handleRequest(
+ DEFAULT_SERVER_RESPONSE_VERSION,
DEFAULT_SERVER_RESPONSE_OFFSET, actualData);
+ }
+
+ /**
+ * Handles the next request synchronously. The passed {@code
actualData} will be forwarded
+ * to the response.
+ */
+ public void handleRequest(String actualVersion, int actualOffset,
List<Row> actualData) {
+ handleRequestAsync(
+ actualVersion,
+ actualOffset,
+ CompletableFuture.completedFuture(actualData))
+ .join();
+ }
+
+ /**
+ * Handles a request with the given data in an asynchronous fashion
with the {@code
+ * TestingSocketServer}'s default meta information being attached to
the response. The
+ * returned future completes when the server side's processing
completes.
+ *
+ * @see #DEFAULT_SERVER_RESPONSE_VERSION
+ * @see #DEFAULT_SERVER_RESPONSE_OFFSET
+ */
+ public CompletableFuture<Void> handleRequestAsync(
+ CompletableFuture<List<Row>> actualDataAsync) {
+ return handleRequestAsync(
+ DEFAULT_SERVER_RESPONSE_VERSION,
+ DEFAULT_SERVER_RESPONSE_OFFSET,
+ actualDataAsync);
+ }
+
+ private CompletableFuture<SocketConnection> connectSocket() {
+ if (!connectionFuture.isDone()) {
+ FutureUtils.forward(acceptSocketAsync(), connectionFuture);
Review Comment:
That remark, I don't get. We want to have the same future that's accessible
all the time (that's why `new CompletableFuture` here and in the constructor)
but we do want to trigger the socket listening later on. How would you
implement that differently. It feels natural to me. But I might have missed
your point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]