This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c70460ad878a5c33aa4a16a054e21e94df524d81 Author: Dian Fu <dia...@apache.org> AuthorDate: Wed Jun 3 16:45:18 2020 +0800 [FLINK-17959][python] Fix the 'call already cancelled' exception when executing Python UDF This closes #12459. --- .../org/apache/beam/runners/fnexecution/state/GrpcStateService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java index 9081778..010232a 100644 --- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java +++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java @@ -142,7 +142,10 @@ public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase @Override public void onError(Throwable t) { - outboundObserver.onCompleted(); + if (!t.getMessage().contains("cancelled before receiving half close")) { + // ignore the exception "cancelled before receiving half close" as we don't care about it. + outboundObserver.onError(t); + } } @Override