This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c70460ad878a5c33aa4a16a054e21e94df524d81
Author: Dian Fu <dia...@apache.org>
AuthorDate: Wed Jun 3 16:45:18 2020 +0800

    [FLINK-17959][python] Fix the 'call already cancelled' exception when executing Python UDF
    
    This closes #12459.
---
 .../org/apache/beam/runners/fnexecution/state/GrpcStateService.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 9081778..010232a 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -142,7 +142,10 @@ public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
 
     @Override
     public void onError(Throwable t) {
-      outboundObserver.onCompleted();
+      if (!t.getMessage().contains("cancelled before receiving half close")) {
+        // ignore the exception "cancelled before receiving half close" as we don't care about it.
+        outboundObserver.onError(t);
+      }
     }
 
     @Override

Reply via email to