This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 8da9d502357 [FLINK-32751][runtime] Refactors close handling to CollectSinkOperatorCoordinator (#23237)
8da9d502357 is described below
commit 8da9d50235776e41901b19c0f226f2826f03650e
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Aug 21 09:19:29 2023 +0200
[FLINK-32751][runtime] Refactors close handling to CollectSinkOperatorCoordinator (#23237)
shutdownNow() is now called on close to immediately stop any already-submitted requests.
This makes the close logic explicit rather than relying on the Runnables to finish
implicitly through the connection loss.
Any ongoing requests are now also cancelled when the coordinator is reset. This allows us
to distinguish deliberate cancellations from unexpected errors and to log the latter properly.
---
.../collect/CollectSinkOperatorCoordinator.java | 140 ++++++++++++---------
1 file changed, 82 insertions(+), 58 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index e926d625e0a..806ed180496 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -25,8 +25,10 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +43,10 @@ import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -63,6 +68,9 @@ public class CollectSinkOperatorCoordinator
private DataInputViewStreamWrapper inStream;
private DataOutputViewStreamWrapper outStream;
+ private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests
=
+ ConcurrentHashMap.newKeySet();
+
private ExecutorService executorService;
public CollectSinkOperatorCoordinator(int socketTimeout) {
@@ -79,8 +87,9 @@ public class CollectSinkOperatorCoordinator
@Override
public void close() throws Exception {
+ LOG.info("Closing the CollectSinkOperatorCoordinator.");
+ this.executorService.shutdownNow();
closeConnection();
- this.executorService.shutdown();
}
@Override
@@ -101,75 +110,82 @@ public class CollectSinkOperatorCoordinator
"Coordination request must be a CollectCoordinationRequest");
CollectCoordinationRequest collectRequest =
(CollectCoordinationRequest) request;
- CompletableFuture<CoordinationResponse> responseFuture = new
CompletableFuture<>();
-
if (address == null) {
- completeWithEmptyResponse(collectRequest, responseFuture);
- return responseFuture;
+ return
CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
}
- executorService.submit(() -> handleRequestImpl(collectRequest,
responseFuture, address));
- return responseFuture;
+ final CompletableFuture<CoordinationResponse> responseFuture =
+ FutureUtils.supplyAsync(
+ () -> handleRequestImpl(collectRequest, address),
executorService);
+
+ ongoingRequests.add(responseFuture);
+ return responseFuture.handle(
+ (response, error) -> {
+ ongoingRequests.remove(responseFuture);
+
+ if (response != null) {
+ return response;
+ }
+
+ // cancelling the future implies that the error handling
happens somewhere else
+ if (!ExceptionUtils.findThrowable(error,
CancellationException.class)
+ .isPresent()) {
+ // Request failed: Close current connection and send
back empty results
+ // we catch every exception here because the Socket
might suddenly become
+ // null. We don't want the coordinator to fail if the
sink fails.
+ if (LOG.isDebugEnabled()) {
+ LOG.warn(
+ "Collect sink coordinator encountered an
unexpected error.",
+ error);
+ } else {
+ LOG.warn(
+ "Collect sink coordinator encounters a {}:
{}",
+ error.getClass().getSimpleName(),
+ error.getMessage());
+ }
+
+ closeConnection();
+ }
+
+ return createEmptyResponse(collectRequest);
+ });
}
- private void handleRequestImpl(
- CollectCoordinationRequest request,
- CompletableFuture<CoordinationResponse> responseFuture,
- InetSocketAddress sinkAddress) {
+ private CoordinationResponse handleRequestImpl(
+ CollectCoordinationRequest request, InetSocketAddress sinkAddress)
throws IOException {
if (sinkAddress == null) {
- closeConnection();
- completeWithEmptyResponse(request, responseFuture);
- return;
+ throw new NullPointerException("No sinkAddress available.");
}
- try {
- if (socket == null) {
- socket = new Socket();
- socket.setSoTimeout(socketTimeout);
- socket.setKeepAlive(true);
- socket.setTcpNoDelay(true);
-
- socket.connect(sinkAddress);
- inStream = new
DataInputViewStreamWrapper(socket.getInputStream());
- outStream = new
DataOutputViewStreamWrapper(socket.getOutputStream());
- LOG.info("Sink connection established");
- }
+ if (socket == null) {
+ socket = new Socket();
+ socket.setSoTimeout(socketTimeout);
+ socket.setKeepAlive(true);
+ socket.setTcpNoDelay(true);
- // send version and offset to sink server
- if (LOG.isDebugEnabled()) {
- LOG.debug("Forwarding request to sink socket server");
- }
- request.serialize(outStream);
-
- // fetch back serialized results
- if (LOG.isDebugEnabled()) {
- LOG.debug("Fetching serialized result from sink socket
server");
- }
- responseFuture.complete(new CollectCoordinationResponse(inStream));
- } catch (Exception e) {
- // request failed, close current connection and send back empty
results
- // we catch every exception here because socket might suddenly
becomes null if the sink
- // fails
- // and we do not want the coordinator to fail
- if (LOG.isDebugEnabled()) {
- // this is normal when sink restarts or job ends, so we print
a debug log
- LOG.debug("Collect sink coordinator encounters an exception",
e);
- }
- closeConnection();
- completeWithEmptyResponse(request, responseFuture);
+ socket.connect(sinkAddress);
+ inStream = new DataInputViewStreamWrapper(socket.getInputStream());
+ outStream = new
DataOutputViewStreamWrapper(socket.getOutputStream());
+ LOG.info("Sink connection established");
}
+
+ // send version and offset to sink server
+ LOG.debug("Forwarding request to sink socket server");
+ request.serialize(outStream);
+
+ // fetch back serialized results
+ LOG.debug("Fetching serialized result from sink socket server");
+ return new CollectCoordinationResponse(inStream);
}
- private void completeWithEmptyResponse(
- CollectCoordinationRequest request,
CompletableFuture<CoordinationResponse> future) {
- future.complete(
- new CollectCoordinationResponse(
- request.getVersion(),
- // this lastCheckpointedOffset is OK
- // because client will only expose results to the
users when the
- // checkpointed offset increases
- -1,
- Collections.emptyList()));
+ private CollectCoordinationResponse
createEmptyResponse(CollectCoordinationRequest request) {
+ return new CollectCoordinationResponse(
+ request.getVersion(),
+ // this lastCheckpointedOffset is OK
+ // because client will only expose results to the users when
the
+ // checkpointed offset increases
+ -1,
+ Collections.emptyList());
}
private void closeConnection() {
@@ -217,6 +233,9 @@ public class CollectSinkOperatorCoordinator
throws Exception {
if (checkpointData == null) {
// restore before any checkpoint completed
+ LOG.info("Any ongoing requests are cancelled due to a coordinator
reset.");
+ cancelOngoingRequests();
+
closeConnection();
} else {
ByteArrayInputStream bais = new
ByteArrayInputStream(checkpointData);
@@ -225,6 +244,11 @@ public class CollectSinkOperatorCoordinator
}
}
+ private void cancelOngoingRequests() {
+ ongoingRequests.forEach(ft -> ft.cancel(true));
+ ongoingRequests.clear();
+ }
+
/** Provider for {@link CollectSinkOperatorCoordinator}. */
public static class Provider implements OperatorCoordinator.Provider {