[ https://issues.apache.org/jira/browse/FLINK-14479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958790#comment-16958790 ]
Wei Zhong commented on FLINK-14479:
-----------------------------------
After investigating, my suggestion is to keep the current status.
In short, this exception is caused by calling
`AbstractPythonFunctionRunner#open` and then immediately calling
`AbstractPythonFunctionRunner#close` (without transmitting any data
to the Python worker). Even in this case, the exception is only triggered occasionally.
In detail, the `open` method of `AbstractPythonFunctionRunner` creates a
`stageBundleFactory`. During this creation, a `bundleProcessor` is
registered with the Python worker process, and this registration is asynchronous. If the
`close` method of `AbstractPythonFunctionRunner` is called before the
registration completes, the underlying Apache Beam layer sends an error to
the Python worker. The content of this error is the exception mentioned in this
JIRA. For details, please see
[here|https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java].
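To make the timing concrete, here is a minimal Python sketch of the race. It is a toy model under stated assumptions, not Flink or Beam code: `HypotheticalRunner`, `send_data`, `run_scenario` and the gate event are all invented for illustration. A `threading.Event` stands in for the asynchronous registration so that the outcome is deterministic rather than timing-dependent.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional


class HypotheticalRunner:
    """Toy model of the open/close race; NOT Flink or Beam code.

    open() starts a background "registration" and returns immediately,
    mimicking the asynchronous bundleProcessor registration.
    """

    def __init__(self, registration_gate: threading.Event) -> None:
        # The gate lets the caller decide when the registration finishes.
        self._gate = registration_gate
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._registration: Optional[Future] = None
        self.worker_errors: List[str] = []

    def _register_bundle_processor(self) -> str:
        # Stands in for the asynchronous bundleProcessor registration.
        self._gate.wait()
        return "registered"

    def open(self) -> None:
        # Does not wait: the registration continues in the background.
        self._registration = self._executor.submit(self._register_bundle_processor)

    def send_data(self, element: str) -> None:
        # Data transmission cannot start until the registration has finished.
        assert self._registration is not None, "open() must be called first"
        self._registration.result()

    def close(self) -> None:
        assert self._registration is not None, "open() must be called first"
        if not self._registration.done():
            # Closing mid-registration: the worker would see the CANCELLED error.
            self.worker_errors.append("CANCELLED: Runner closed connection")
        # Release the background task so the executor can shut down cleanly.
        self._gate.set()
        self._executor.shutdown(wait=True)


def run_scenario(send_data_before_close: bool) -> List[str]:
    gate = threading.Event()
    runner = HypotheticalRunner(gate)
    runner.open()
    if send_data_before_close:
        gate.set()               # let the simulated registration finish
        runner.send_data("row")  # blocks until the registration is done
    runner.close()
    return runner.worker_errors


print(run_scenario(send_data_before_close=False))  # ['CANCELLED: Runner closed connection']
print(run_scenario(send_data_before_close=True))   # []
```

In the first scenario `close()` observes an unfinished registration and records the error; in the second, sending data forces a wait on the registration, so `close()` is safe.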
The key to eliminating this exception is to wait for the bundleProcessor
registration to complete, for example by transmitting some data to the Python worker before
closing. In that situation the exception doesn't appear, because the data
transmission must wait for the registration to complete before it can start. If we add
such a piece of code to the open method:
!screenshot-1.png|width=590,height=327!
to force the runner to wait for the registration to complete, the exception never
occurs.
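The code in the screenshot is not reproduced here. As a rough analogue only, the sketch below models the same idea in Python: `open()` still starts the registration asynchronously but then blocks on it (with an optional timeout) before returning. `HypotheticalWaitingRunner`, `registration_delay_s` and `timeout_s` are invented for illustration and are not Flink or Beam APIs; as noted below, Flink does not expose the bundleProcessor, so this only illustrates the principle.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional


class HypotheticalWaitingRunner:
    """Toy model of the proposed fix; NOT Flink or Beam code.

    open() starts the registration asynchronously, then blocks until it
    has completed, so calling close() right after open() is safe.
    """

    def __init__(self, registration_delay_s: float = 0.05) -> None:
        self._delay = registration_delay_s
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._registration: Optional[Future] = None
        self.worker_errors: List[str] = []

    def _register_bundle_processor(self) -> str:
        # Simulates a registration handshake that takes some time.
        time.sleep(self._delay)
        return "registered"

    def open(self, timeout_s: Optional[float] = None) -> None:
        self._registration = self._executor.submit(self._register_bundle_processor)
        # The fix: wait for the registration to finish. Raises
        # concurrent.futures.TimeoutError if it takes longer than timeout_s.
        self._registration.result(timeout=timeout_s)

    def close(self) -> None:
        assert self._registration is not None, "open() must be called first"
        if not self._registration.done():
            self.worker_errors.append("CANCELLED: Runner closed connection")
        self._executor.shutdown(wait=True)


runner = HypotheticalWaitingRunner()
runner.open(timeout_s=5.0)
runner.close()  # immediately after open(), without sending any data
print(runner.worker_errors)  # []
```

Bounding the wait with a timeout is a design choice of this sketch, so that a stuck worker would surface as an error in `open()` rather than hang forever.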
Unfortunately, there are no public methods or properties that expose the
bundleProcessor object, directly or indirectly, so there may be no proper way
to do this in Flink.
Considering that this error affects neither correctness nor performance, and
only occurs randomly when the amount of data is too small for every subtask
to receive data, my suggestion is to keep the current status. What do you think?
[~sunjincheng121]
> Strange exceptions found in log file after executing `test_udf.py`
> ------------------------------------------------------------------
>
> Key: FLINK-14479
> URL: https://issues.apache.org/jira/browse/FLINK-14479
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Reporter: sunjincheng
> Assignee: Wei Zhong
> Priority: Major
> Fix For: 1.10.0
>
> Attachments: screenshot-1.png
>
>
> There are several strange exceptions as follows in
> `${flink_source}/build-target/log/flink-${username}-python-udf-boot-${machine_name}.local.log`
> after executing
> `${flink_source}/flink-python/pyflink/table/tests/test_udf.py`:
> {code:java}
> Traceback (most recent call last):
>   File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/Users/zhongwei/flink/flink-python/pyflink/fn_execution/sdk_worker_main.py", line 30, in <module>
>     apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
>   File "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 148, in main
>     sdk_pipeline_options.view_as(pipeline_options.ProfilingOptions))
>   File "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 133, in run
>     for work_request in control_stub.Control(get_responses()):
>   File "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/grpc/_channel.py", line 364, in __next__
>     return self._next()
>   File "/Users/zhongwei/pyflink_env/py37/lib/python3.7/site-packages/grpc/_channel.py", line 347, in _next
>     raise self
> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>     status = StatusCode.CANCELLED
>     details = "Runner closed connection"
>     debug_error_string = "{"created":"@1571660342.057172000","description":"Error received from peer ipv6:[::1]:52699","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Runner closed connection","grpc_status":1}"
> {code}
> It appears randomly when executing the blink planner test cases. Although it
> does not affect the test results, we need to find out why it appears.
> Any feedback is welcome!
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)