[ 
https://issues.apache.org/jira/browse/FLINK-17959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125047#comment-17125047
 ] 

Dian Fu edited comment on FLINK-17959 at 6/3/20, 3:18 PM:
----------------------------------------------------------

master: 
 80b002ba7232abcece0c839aaf82cbd2034f9965
 b31b180b3bc928d61521d2a67c2028767428c731
 66d1300f9a51245f98977e654041d4a35cfb8bf1

release-1.11:
 5a2afbc01a438ea2f2598eac9c6dac5f58012f35
 de05d81df34a7e7ccfc9e5fb5d75c4cc3cfa1006
 0195b9819d778bb8b3421beed2b431e6066e6239

release-1.10:
 bd2ca499263a9b5819d3cc2320f59ad5cb7ef5e9
 eafc602022668af68e520e734f9faa15eaa6ac61
 c70460ad878a5c33aa4a16a054e21e94df524d81


was (Author: dian.fu):
master: 
80b002ba7232abcece0c839aaf82cbd2034f9965
b31b180b3bc928d61521d2a67c2028767428c731
66d1300f9a51245f98977e654041d4a35cfb8bf1

release-1.11:
5a2afbc01a438ea2f2598eac9c6dac5f58012f35
de05d81df34a7e7ccfc9e5fb5d75c4cc3cfa1006
0195b9819d778bb8b3421beed2b431e6066e6239

release-1.10:
bd2ca499263a9b5819d3cc2320f59ad5cb7ef5e9
eafc602022668af68e520e734f9faa15eaa6ac61
c70460ad878a5c33aa4a16a054e21e94df524d81

 

 

 

 

 

> Exception: "CANCELLED: call already cancelled" is thrown when run python udf
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-17959
>                 URL: https://issues.apache.org/jira/browse/FLINK-17959
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.10.1, 1.11.0
>            Reporter: Hequn Cheng
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0, 1.10.2
>
>
> The exception is thrown when running Python UDF:
> {code:java}
> May 27, 2020 3:20:49 PM 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor run
> SEVERE: Exception while executing runnable 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed@3960b30e
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: 
> CANCELLED: call already cancelled
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:366)
>       at 
> org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onError(GrpcStateService.java:145)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> The job can output the right results however it seems something goes wrong 
> during the shutdown procedure.
> You can reproduce the exception with the following code(note: the exception 
> happens occasionally):
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.udf import udf
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], 
> DataTypes.BIGINT())
> t_env.register_function("add", add)
> t_env.connect(FileSystem().path('/tmp/input')) \
>     .with_format(OldCsv()
>                  .field('a', DataTypes.BIGINT())
>                  .field('b', DataTypes.BIGINT())) \
>     .with_schema(Schema()
>                  .field('a', DataTypes.BIGINT())
>                  .field('b', DataTypes.BIGINT())) \
>     .create_temporary_table('mySource')
> t_env.connect(FileSystem().path('/tmp/output')) \
>     .with_format(OldCsv()
>                  .field('sum', DataTypes.BIGINT())) \
>     .with_schema(Schema()
>                  .field('sum', DataTypes.BIGINT())) \
>     .create_temporary_table('mySink')
> t_env.from_path('mySource')\
>     .select("add(a, b)") \
>     .insert_into('mySink')
> t_env.execute("tutorial_job")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to