[ 
https://issues.apache.org/jira/browse/FLINK-14479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958790#comment-16958790
 ] 

Wei Zhong commented on FLINK-14479:
-----------------------------------

After investigating, my suggestion is to keep the current behavior.

In short, this exception is caused by calling 
`AbstractPythonFunctionRunner#open` and then calling 
`AbstractPythonFunctionRunner#close` immediately (without transmitting any data 
to the Python worker). Even then, the exception is triggered only some of the time.

In detail, the `open` method of `AbstractPythonFunctionRunner` creates a 
`stageBundleFactory`. During the creation, a `bundleProcessor` is 
registered with the Python worker process, and this registration is asynchronous. If the 
`close` method of `AbstractPythonFunctionRunner` is called before the 
registration completes, the underlying Apache Beam layer sends an error to 
the Python worker. The content of this error is the exception mentioned in this 
JIRA. For details, please see 
[here|https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java].
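The race described above can be illustrated with a toy model. This is only a sketch, not Flink or Beam code: `ToyControlClient`, `register_async`, and `open_then_close` are hypothetical names, and the only detail taken from the issue is the error text "Runner closed connection".

```python
from concurrent.futures import Future


class ToyControlClient:
    """Toy stand-in for the runner-side control client (hypothetical, not Beam's API)."""

    def __init__(self):
        self.pending = []  # responses the worker has not sent yet

    def register_async(self):
        # Registration returns immediately; the worker answers later.
        response = Future()
        self.pending.append(response)
        return response

    def close(self):
        # If any request is still unanswered, the worker is sent an error;
        # otherwise the connection is closed cleanly (modelled as None).
        if any(not response.done() for response in self.pending):
            return "Runner closed connection"
        return None


def open_then_close(worker_replies_first):
    client = ToyControlClient()
    registration = client.register_async()     # what open() triggers
    if worker_replies_first:
        registration.set_result("registered")  # the worker wins the race
    return client.close()                      # what close() triggers
```

In this model, `open_then_close(False)` corresponds to closing before the registration completes, and `open_then_close(True)` to the case where the worker answered first.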

The key to eliminating this exception is to wait for the bundleProcessor 
registration to complete, for example by transmitting some data to the Python worker before 
closing. In that case the exception does not appear, because the data 
transmission must wait for the registration to complete before it starts. If we add 
a piece of code like this to the `open` method:

!screenshot-1.png|width=590,height=327!

to force the runner to wait for the registration to complete, the exception no 
longer occurs.
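The effect of waiting can be sketched with a small simulation. It is not the code from the screenshot and does not use any Flink or Beam API; `start_registration`, `close`, and `open_and_close` are hypothetical names, and the delay stands in for the asynchronous registration.

```python
import threading
from concurrent.futures import Future


def start_registration(delay_seconds):
    """Hypothetical async registration: completes on another thread after a delay."""
    registration = Future()
    timer = threading.Timer(delay_seconds, registration.set_result, args=("registered",))
    timer.start()
    return registration


def close(registration):
    """Return the error a toy runner would send on close, or None for a clean close."""
    return None if registration.done() else "Runner closed connection"


def open_and_close(wait_for_registration, delay_seconds=0.05):
    registration = start_registration(delay_seconds)
    if wait_for_registration:
        # Block until the worker has acknowledged the registration.
        registration.result(timeout=5)
    return close(registration)
```

With `wait_for_registration=True` the close is always clean in this model; without the wait, the outcome depends on whether the timer thread has already fired, which mirrors the randomness reported in the issue.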

Unfortunately, there are no public methods or properties that expose the 
bundleProcessor object, directly or indirectly, so there may be no way to 
do this in Flink through public APIs.

Considering that this error does not affect correctness or performance, and 
only occurs randomly when the amount of data is too small for every subtask 
to receive data, my suggestion is to keep the current behavior. What do you think? 
[~sunjincheng121]

> Strange exceptions found in log file after executing `test_udf.py`
> ------------------------------------------------------------------
>
>                 Key: FLINK-14479
>                 URL: https://issues.apache.org/jira/browse/FLINK-14479
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>            Reporter: sunjincheng
>            Assignee: Wei Zhong
>            Priority: Major
>             Fix For: 1.10.0
>
>         Attachments: screenshot-1.png
>
>
> There are several strange exceptions, as follows, in 
> `${flink_source}/build-target/log/flink-${username}-python-udf-boot-${machine_name}.local.log`
>  after executing 
> `${flink_source}/flink-python/pyflink/table/tests/test_udf.py`:
> {code:java}
> Traceback (most recent call last):
>  File 
> "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py",
>  line 193, in _run_module_as_main
>  "__main__", mod_spec)
>  File 
> "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py",
>  line 85, in _run_code
>  exec(code, run_globals)
>  File 
> "/Users/zhongwei/flink/flink-python/pyflink/fn_execution/sdk_worker_main.py", 
> line 30, in <module>
>  apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
>  File 
> "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
>  line 148, in main
>  sdk_pipeline_options.view_as(pipeline_options.ProfilingOptions))
>  File 
> "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 133, in run
>  for work_request in control_stub.Control(get_responses()):
>  File 
> "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/grpc/_channel.py",
>  line 364, in __next__
>  return self._next()
>  File 
> "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/grpc/_channel.py",
>  line 347, in _next
>  raise self
> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>  status = StatusCode.CANCELLED
>  details = "Runner closed connection"
>  debug_error_string = 
> "{"created":"@1571660342.057172000","description":"Error received from peer 
> ipv6:[::1]:52699","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Runner
>  closed connection","grpc_status":1}"{code}
> It appears randomly when executing test cases of the blink planner. Although it 
> does not affect test results, we need to find out why it appears.
> Any feedback is welcome!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
