[ 
https://issues.apache.org/jira/browse/BEAM-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530968#comment-17530968
 ] 

Brian Hulette commented on BEAM-12603:
--------------------------------------

Unfortunately I can't glean anything particularly useful from that debug log. 
What's interesting is that the gRPC channels were _just_ created; the 
"Broken Pipe" seems to have occurred on the very first bundle. 

For reference, I captured a portion of this sequence from a passing test:
{code}
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:521 starting control server on port 46845
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:522 starting data server on port 45973
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:523 starting state server on port 42855
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:524 starting logging server on port 33253
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:889 Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedGrpcWorkerHandler object at 0x7f43757685b0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python_grpc:v1, b'{"state_cache_size": 0, "data_buffer_time_limit_ms": 0}')
INFO     apache_beam.runners.worker.statecache:statecache.py:172 Creating state cache with size 100
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:164 Creating insecure control channel for localhost:46845.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:172 Control channel established.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:215 Initializing SDKHarness with unbounded number of workers.
DEBUG    apache_beam.runners.portability.fn_api_runner.execution:execution.py:847 Scheduling bundle in stage for execution: ((((ref_AppliedPTransform_Create-Impulse_3)+(ref_AppliedPTransform_Create-FlatMap-lambda-at-core-py-3320-_4))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-AddRandomKeys_7))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-ReshufflePerKey-Map-reify_timestamps-_9))+(Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write)
DEBUG    apache_beam.runners.portability.fn_api_runner.execution:execution.py:847 Scheduling bundle in stage for execution: (((((ref_AppliedPTransform_assert_that-Create-Impulse_17)+(ref_AppliedPTransform_assert_that-Create-FlatMap-lambda-at-core-py-3320-_18))+(ref_AppliedPTransform_assert_that-Create-Map-decode-_20))+(ref_AppliedPTransform_assert_that-Group-CoGroupByKeyImpl-Tag-0-_25))+(assert_that/Group/CoGroupByKeyImpl/Flatten/Transcode/0))+(assert_that/Group/CoGroupByKeyImpl/Flatten/Write/0)
DEBUG    apache_beam.runners.portability.fn_api_runner.fn_runner:fn_runner.py:388 Remaining ready bundles: 2
        Watermark pending bundles: 0
        Time pending bundles: 0
DEBUG    apache_beam.runners.portability.fn_api_runner.fn_runner.run_bundle:fn_runner.py:400 Running bundle for stage ((((ref_AppliedPTransform_Create-Impulse_3)+(ref_AppliedPTransform_Create-FlatMap-lambda-at-core-py-3320-_4))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-AddRandomKeys_7))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-ReshufflePerKey-Map-reify_timestamps-_9))+(Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write)
        Expected outputs: {'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write': b'group:ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-ReshufflePerKey-GroupByKey_10'} timers: {}
DEBUG    apache_beam.runners.worker.sdk_worker:sdk_worker.py:235 Got work bundle_1
DEBUG    apache_beam.runners.worker.sdk_worker:sdk_worker.py:343 Currently using 2 threads.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:840 Creating insecure state channel for localhost:42855.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:847 State channel established.
INFO     apache_beam.runners.worker.data_plane:data_plane.py:750 Creating client data channel for localhost:45973
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:951 start <DataOutputOperation Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write >
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:951 start <DoOperation Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps) output_tags=['None'], receivers=[SingletonConsumerSet[Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps).out0, coder=WindowedValueCoder[TupleCoder[LengthPrefixCoder[DeterministicFastPrimitivesCoder], LengthPrefixCoder[FastPrimitivesCoder]]], len(consumers)=1]]>
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:951 start <DoOperation Create/MaybeReshuffle/Reshuffle/AddRandomKeys output_tags=['None'], receivers=[SingletonConsumerSet[Create/MaybeReshuffle/Reshuffle/AddRandomKeys.out0, coder=WindowedValueCoder[TupleCoder[VarIntCoder, BytesCoder]], len(consumers)=1]]>
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:951 start <DoOperation Create/FlatMap(<lambda at core.py:3320>) output_tags=['None'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at core.py:3320>).out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:951 start <DataInputOperation Create/Impulse receivers=[SingletonConsumerSet[Create/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:1000 finish <DataInputOperation Create/Impulse receivers=[SingletonConsumerSet[Create/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG    apache_beam.runners.worker.bundle_processor:bundle_processor.py:1000 finish <DoOperation Create/FlatMap(<lambda at core.py:3320>) output_tags=['None'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at core.py:3320>).out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
{code}
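
For anyone who wants to capture the same kind of sequence locally, here is a minimal sketch using only the standard `logging` module. The helper name `enable_beam_debug_logging` is hypothetical and not part of Beam. The logger name `apache_beam.runners` is taken from the prefixes in the log above, and the sketch assumes Beam's modules log through loggers named after their module paths, as that output suggests. A handler (or the test runner's own log capture) still has to emit the records.

```python
import logging


def enable_beam_debug_logging(logger_name="apache_beam.runners"):
    """Hypothetical helper: set the named logger's threshold to DEBUG.

    Child loggers such as apache_beam.runners.worker.sdk_worker inherit
    this level unless a level has been set on them explicitly.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    return logger


if __name__ == "__main__":
    # Attach a basic stderr handler so DEBUG records are actually emitted.
    logging.basicConfig()
    enable_beam_debug_logging()
    logging.getLogger("apache_beam.runners.worker.sdk_worker").debug(
        "debug records from this logger are now emitted")
```

When running under pytest, the test runner's own log-level options are probably the more convenient route; this sketch only covers the plain `logging` side.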

> Flaky: 
> apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12603
>                 URL: https://issues.apache.org/jira/browse/BEAM-12603
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core, test-failures
>            Reporter: Tyson Hamilton
>            Assignee: Yichi Zhang
>            Priority: P1
>              Labels: flake
>             Fix For: Not applicable
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The 
> `apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers`
>  tests are flaky and causing precommit failures that seem similar.
> `test_pardo_windowed_side_inputs` : 
> [https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/4417/console]
> {code:java}
> 23:32:35 Exception in thread read_grpc_client_inputs:
> 23:32:35 Traceback (most recent call last):
> 23:32:35   File "/usr/lib/python3.8/threading.py", line 932, in 
> _bootstrap_inner
> 23:32:35     self.run()
> 23:32:35   File "/usr/lib/python3.8/threading.py", line 870, in run
> 23:32:35     self._target(*self._args, **self._kwargs)
> 23:32:35   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
>  line 587, in <lambda>
> 23:32:35     target=lambda: self._read_inputs(elements_iterator),
> 23:32:35   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
>  line 570, in _read_inputs
> 23:32:35     for elements in elements_iterator:
> 23:32:35   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/target/.tox-py38-cython/py38-cython/lib/python3.8/site-packages/grpc/_channel.py",
>  line 426, in __next__
> 23:32:35     return self._next()
> 23:32:35   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/target/.tox-py38-cython/py38-cython/lib/python3.8/site-packages/grpc/_channel.py",
>  line 826, in _next
> 23:32:35     raise self
> 23:32:35 grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of 
> RPC that terminated with:
> 23:32:35      status = StatusCode.UNAVAILABLE
> 23:32:35      details = "Broken pipe"
> 23:32:35      debug_error_string = 
> "{"created":"@1626071403.252458842","description":"Error received from peer 
> ipv4:127.0.0.1:37459","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Broken
>  pipe","grpc_status":14}"
> 23:32:35 >
> 23:32:35 Traceback (most recent call last):
> 23:32:35   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
>  line 459, in input_elements
> 23:32:35     element = received.get(timeout=1)
> 23:32:35   File "/usr/lib/python3.8/queue.py", line 178, in get
> 23:32:35     raise Empty
> 23:32:35 _queue.Empty
> {code}
>  
> `test_pack_combiners` :  
> [https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/4415/consoleFull]
> {code:java}
> 11:41:14 Exception ignored in: <object repr() failed>
> 11:41:14 Traceback (most recent call last):
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/target/.tox-py36-cython/py36-cython/lib/python3.6/site-packages/grpc/_channel.py",
>  line 444, in __del__
> 11:41:14     with self._state.condition:
> 11:41:14 AttributeError: '_MultiThreadedRendezvous' object has no attribute 
> '_state'
> 11:41:14 Traceback (most recent call last):
> 11:41:14   File "apache_beam/runners/common.py", line 1223, in 
> apache_beam.runners.common.DoFnRunner.process
> 11:41:14     return self.do_fn_invoker.invoke_process(windowed_value)
> 11:41:14   File "apache_beam/runners/common.py", line 752, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process
> 11:41:14     self._invoke_process_per_window(
> 11:41:14   File "apache_beam/runners/common.py", line 816, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> 11:41:14     [si[global_window] for si in self.side_inputs]))
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 427, in __getitem__
> 11:41:14     self._cache[target_window] = 
> self._side_input_data.view_fn(raw_view)
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 353, in __iter__
> 11:41:14     self._state_handler.blocking_get(self._state_key, 
> self._coder_impl))
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 1184, in blocking_get
> 11:41:14     self._partially_cached_iterable(state_key, coder))
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 1290, in _partially_cached_iterable
> 11:41:14     data, continuation_token = self._underlying.get_raw(state_key, 
> None)
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 1057, in get_raw
> 11:41:14     continuation_token=continuation_token)))
> 11:41:14   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 1095, in _blocking_request
> 11:41:14     raise t(v).with_traceback(tb)
> 11:41:14 TypeError: __init__() missing 3 required positional arguments: 
> 'call', 'response_deserializer', and 'deadline'
> {code}
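
A note on the `test_pack_combiners` traceback quoted above: it ends in a `TypeError` raised from `raise t(v).with_traceback(tb)`, which appears to mask the underlying error. The missing arguments (`call`, `response_deserializer`, `deadline`) suggest that `t` was a gRPC rendezvous type whose constructor needs more than one argument, so rebuilding the exception from its type fails before the original can be re-raised. The sketch below reproduces that pattern with a hypothetical stand-in class, `RendezvousLikeError`. It is an illustration of the pitfall under that assumption, not Beam's or gRPC's actual code.

```python
import sys


class RendezvousLikeError(Exception):
    """Hypothetical stand-in for an exception with a multi-argument constructor."""

    def __init__(self, state, call, response_deserializer, deadline):
        super().__init__(state)
        self.call = call
        self.response_deserializer = response_deserializer
        self.deadline = deadline


def capture_exc_info():
    """Raise the stand-in error and return its (type, value, traceback) triple."""
    try:
        raise RendezvousLikeError(
            "Broken pipe", call=None, response_deserializer=None, deadline=None)
    except RendezvousLikeError:
        return sys.exc_info()


def reraise_by_rebuilding(exc_info):
    """Rebuild the exception from its type, as in the quoted traceback.

    t(v) passes only one argument to the constructor, so this raises
    TypeError instead of the original error.
    """
    t, v, tb = exc_info
    raise t(v).with_traceback(tb)


def reraise_original(exc_info):
    """Re-raise the captured instance itself, preserving the original error."""
    _, v, tb = exc_info
    raise v.with_traceback(tb)
```

Re-raising the captured instance avoids calling the constructor again, so the original `UNAVAILABLE` / "Broken pipe" style error would surface instead of the `TypeError`.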



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
