[ 
https://issues.apache.org/jira/browse/BEAM-12448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17544855#comment-17544855
 ] 

Janek Bevendorff commented on BEAM-12448:
-----------------------------------------

I don't see how this issue is fixed.

This happens with any sort of GroupReduce. I can reproduce it reliably with a 
WriteToText or WriteToParquet transform, or with any other global or grouped 
reduce transform. The Beam worker errors out with


{noformat}
E0601 09:15:12.931656930     362 chttp2_transport.cc:1079]   Received a GOAWAY 
with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
E0601 09:16:01.707294373     369 chttp2_transport.cc:1079]   Received a GOAWAY 
with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
Logging client failed: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = 
"{"created":"@1654074912.932334894","description":"Error received from peer 
ipv6:[::1]:43067","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Socket
 closed","grpc_status":14}"
>... resetting
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File 
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
 line 289, in <module>
    main(sys.argv)
  File 
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
 line 180, in main
    sdk_harness.run()
  File 
"/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 234, in run
    for work_request in self._control_stub.Control(get_responses()):
  File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 426, in 
__next__
    return self._next()
  File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 826, in 
_next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that 
terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = 
"{"created":"@1654074961.707804272","description":"Error received from peer 
ipv6:[::1]:37959","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Socket
 closed","grpc_status":14}"
{noformat}


The reduce itself seems to be at least partially successful, but it chokes 
during cleanup, so the job always finishes with an error. When writing to text 
or Parquet files, the job often runs fine up to the point where (most of) the 
temporary files have been written, but then crashes before it can combine and 
move the output files to their final destination.
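
For illustration, here is a minimal sketch of the kind of pipeline shape that 
hits this for me; the job endpoint, environment settings and paths are 
placeholders, not my actual job:

{code:python}
# Minimal repro sketch: any pipeline that ends in a grouping/combine step
# followed by a file sink fails this way for me on the PortableRunner + Flink.
# Endpoint, environment settings and paths below are placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',         # placeholder Flink job server
    '--environment_type=EXTERNAL',           # placeholder worker environment
    '--environment_config=localhost:50000',
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.io.ReadFromText('/data/input-*.txt')           # placeholder input
     | beam.FlatMap(str.split)
     | beam.combiners.Count.PerElement()                   # the group/combine step
     | beam.MapTuple(lambda word, n: '%s\t%d' % (word, n))
     | beam.io.WriteToText('/data/out/counts'))            # dies while finalizing
{code}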

This prevents me entirely from running any sort of aggregation. Linear, 
map-only jobs are not affected. IMHO, this is a big, bad release blocker. With 
more than a dozen workers or more than a few MB of data on the PortableRunner 
with Flink, I cannot get a SINGLE combine transform to finish without throwing 
an error. I am trying to process terabytes of data with 100-200 workers and it 
is simply impossible.

I tried setting grpc.keepalive_timeout_ms in my pipeline options, but it had no 
effect. I am also not sure that it is actually handed down to the gRPC channel 
factory at all.
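
For what it's worth, the GOAWAY with ENHANCE_YOUR_CALM and "too_many_pings" is 
gRPC's keepalive policing: the client sends HTTP/2 keepalive pings more often 
than the server's policy permits, and the server drops the connection. Outside 
of Beam, these are the standard gRPC channel/server arguments that control this 
behaviour (a plain-gRPC sketch only; whether and how Beam forwards such options 
to its channel factory is exactly what I cannot tell):

{code:python}
# Plain-gRPC sketch of the keepalive knobs behind "too_many_pings".
# This is NOT Beam's channel setup; it only shows the standard gRPC
# arguments that govern client ping frequency and server ping policing.
from concurrent import futures

import grpc

# Client side: how often keepalive pings are sent and how long to wait for acks.
channel = grpc.insecure_channel(
    'localhost:43067',  # placeholder address
    options=[
        ('grpc.keepalive_time_ms', 60000),           # ping every 60 s
        ('grpc.keepalive_timeout_ms', 20000),        # wait 20 s for the ack
        ('grpc.keepalive_permit_without_calls', 1),  # allow pings on idle channels
        ('grpc.http2.max_pings_without_data', 0),    # no limit on pings without data
    ])

# Server side: how many "bad" pings it tolerates before sending
# GOAWAY with ENHANCE_YOUR_CALM / "too_many_pings".
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=4),
    options=[
        ('grpc.http2.min_ping_interval_without_data_ms', 30000),
        ('grpc.http2.max_ping_strikes', 2),
        ('grpc.keepalive_permit_without_calls', 1),
    ])
{code}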

> Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to 
> "too_many_pings"
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12448
>                 URL: https://issues.apache.org/jira/browse/BEAM-12448
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>    Affects Versions: 2.29.0
>            Reporter: Ravikiran Borse
>            Priority: P2
>             Fix For: 2.34.0
>
>         Attachments: Screenshot 2021-06-09 at 8.31.23 AM.png, 
> flink-root-standalonesession-0-gg34-bn.log, 
> flink-root-taskexecutor-0-gg34-bn.log, flink-root-taskexecutor-0-gg34-bn.out, 
> wordcount_with_metrics.py
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Beam: 2.31.0
> Flink 1.12.0
> Python 3.8.5
> # tr -dc "A-Za-z 0-9" < /dev/urandom | fold -w100|head -n 9000000 > 
> bigfile.txt
> # python3 wordcount_with_metrics.py --input beam/bigfile.txt --out 
> bigfile-output.txt
>  
>  
> E0603 04:50:42.552403830    8442 chttp2_transport.cc:1117]   Received a 
> GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to 
> "too_many_pings"
> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data 
> plane.
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/data_plane.py",
>  line 581, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 426, 
> in __next__
>     return self._next()
>   File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 826, 
> in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that 
> terminated with:
>  status = StatusCode.UNAVAILABLE
>  details = "Socket closed"
>  debug_error_string = 
> "\{"created":"@1622695842.552861174","description":"Error received from peer 
> ipv6:[::1]:45717","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Socket
>  closed","grpc_status":14}"
> >
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
>     self.run()
>   File "/usr/lib/python3.8/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File 
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/data_plane.py",
>  line 598, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File 
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/data_plane.py",
>  line 581, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 426, 
> in __next__
>     return self._next()
>   File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 826, 
> in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that 
> terminated with:
>  status = StatusCode.UNAVAILABLE
>  details = "Socket closed"
>  debug_error_string = 
> "\{"created":"@1622695842.552861174","description":"Error received from peer 
> ipv6:[::1]:45717","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Socket
>  closed","grpc_status":14}"


