[
https://issues.apache.org/jira/browse/BEAM-12603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530968#comment-17530968
]
Brian Hulette commented on BEAM-12603:
--------------------------------------
Unfortunately I can't glean anything particularly useful from that debug log.
What's interesting is that the gRPC channels were _just_ created; it seems to
be on the very first bundle that the "Broken Pipe" occurred.
For reference, I captured a portion of this sequence from a passing test:
{code}
INFO
apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:521
starting control server on port 46845
INFO
apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:522
starting data server on port 45973
INFO
apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:523
starting state server on port 42855
INFO
apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:524
starting logging server on port 33253
INFO
apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:889
Created Worker handler
<apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedGrpcWorkerHandler
object at 0x7f43757685b0>
for environment ref_Environment_default_environment_1
(beam:env:embedded_python_grpc:v1, b'{"state_cache_size": 0,
"data_buffer_time_limit_ms": 0}')
INFO apache_beam.runners.worker.statecache:statecache.py:172 Creating state
cache with size 100
INFO apache_beam.runners.worker.sdk_worker:sdk_worker.py:164 Creating
insecure control channel for localhost:46845.
INFO apache_beam.runners.worker.sdk_worker:sdk_worker.py:172 Control
channel established.
INFO apache_beam.runners.worker.sdk_worker:sdk_worker.py:215 Initializing
SDKHarness with unbounded number of workers.
DEBUG
apache_beam.runners.portability.fn_api_runner.execution:execution.py:847
Scheduling bundle in stage for execution:
((((ref_AppliedPTransform_Create-Impulse_3)+(ref_AppliedPTransform_Create-FlatMap-lambda-at-core-py-3320-_4)
)+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-AddRandomKeys_7))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-ReshufflePerKey-Map-reify_timestamps-_9))+(Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write)
DEBUG
apache_beam.runners.portability.fn_api_runner.execution:execution.py:847
Scheduling bundle in stage for execution:
(((((ref_AppliedPTransform_assert_that-Create-Impulse_17)+(ref_AppliedPTransform_assert_that-Create-FlatMap-lambda-at-core-py-3320-_18))+(ref_AppliedPTransform_assert_that-Create-Map-decode-_20))+(ref_AppliedPTransform_assert_that-Group-CoGroupByKeyImpl-Tag-0-_25))+(assert_that/Group/CoGroupByKeyImpl/Flatten/Transcode/0))+(assert_that/Gro
up/CoGroupByKeyImpl/Flatten/Write/0)
DEBUG
apache_beam.runners.portability.fn_api_runner.fn_runner:fn_runner.py:388
Remaining ready bundles: 2
Watermark pending bundles: 0
Time pending bundles: 0
DEBUG
apache_beam.runners.portability.fn_api_runner.fn_runner.run_bundle:fn_runner.py:400
Running bundle for stage
((((ref_AppliedPTransform_Create-Impulse_3)+(ref_AppliedPTransform_Create-FlatMap-lambda-at-core-py-3320-_4))+(ref
_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-AddRandomKeys_7))+(ref_AppliedPTransform_Create-MaybeReshuffle-Reshuffle-ReshufflePerKey-Map-reify_timestamps-_9))+(Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write)
Expected outputs:
{'Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write':
b'group:ref_AppliedPTransform_Create-MaybeReshuff
le-Reshuffle-ReshufflePerKey-GroupByKey_10'} timers: {}
DEBUG apache_beam.runners.worker.sdk_worker:sdk_worker.py:235 Got work
bundle_1
DEBUG apache_beam.runners.worker.sdk_worker:sdk_worker.py:343 Currently
using 2 threads.
INFO apache_beam.runners.worker.sdk_worker:sdk_worker.py:840 Creating
insecure state channel for localhost:42855.
INFO apache_beam.runners.worker.sdk_worker:sdk_worker.py:847 State channel
established.
INFO apache_beam.runners.worker.data_plane:data_plane.py:750 Creating
client data channel for localhost:45973
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:951
start <DataOutputOperation
Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write >
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:951
start <DoOperation
Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)
output_tags=['None'], receivers=[SingletonConsumerSet[Create/Maybe
Reshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps).out0,
coder=WindowedValueCoder[TupleCoder[LengthPrefixCoder[DeterministicFastPrimitivesCoder],
LengthPrefixCoder[FastPrimitivesCoder]]], len(consumers)=1]]>
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:951
start <DoOperation Create/MaybeReshuffle/Reshuffle/AddRandomKeys
output_tags=['None'],
receivers=[SingletonConsumerSet[Create/MaybeReshuffle/Reshuffle/AddR
andomKeys.out0, coder=WindowedValueCoder[TupleCoder[VarIntCoder, BytesCoder]],
len(consumers)=1]]>
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:951
start <DoOperation Create/FlatMap(<lambda at core.py:3320>)
output_tags=['None'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at
core.py:3320>).
out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:951
start <DataInputOperation Create/Impulse
receivers=[SingletonConsumerSet[Create/Impulse.out0,
coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:1000
finish <DataInputOperation Create/Impulse
receivers=[SingletonConsumerSet[Create/Impulse.out0,
coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG apache_beam.runners.worker.bundle_processor:bundle_processor.py:1000
finish <DoOperation Create/FlatMap(<lambda at core.py:3320>)
output_tags=['None'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at
core.py:3320>
).out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
{code}
> Flaky:
> apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
> ----------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12603
> URL: https://issues.apache.org/jira/browse/BEAM-12603
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core, test-failures
> Reporter: Tyson Hamilton
> Assignee: Yichi Zhang
> Priority: P1
> Labels: flake
> Fix For: Not applicable
>
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> The
> `apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers`
> tests are flaky and causing precommit failures that seem similar.
> `test_pardo_windowed_side_inputs` :
> [https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/4417/console]
> {code:java}
> 23:32:35 Exception in thread read_grpc_client_inputs:
> 23:32:35 Traceback (most recent call last):
> 23:32:35 File "/usr/lib/python3.8/threading.py", line 932, in
> _bootstrap_inner
> 23:32:35 self.run()
> 23:32:35 File "/usr/lib/python3.8/threading.py", line 870, in run
> 23:32:35 self._target(*self._args, **self._kwargs)
> 23:32:35 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
> line 587, in <lambda>
> 23:32:35 target=lambda: self._read_inputs(elements_iterator),
> 23:32:35 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
> line 570, in _read_inputs
> 23:32:35 for elements in elements_iterator:
> 23:32:35 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/target/.tox-py38-cython/py38-cython/lib/python3.8/site-packages/grpc/_channel.py",
> line 426, in __next__
> 23:32:35 return self._next()
> 23:32:35 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/target/.tox-py38-cython/py38-cython/lib/python3.8/site-packages/grpc/_channel.py",
> line 826, in _next
> 23:32:35 raise self
> 23:32:35 grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of
> RPC that terminated with:
> 23:32:35 status = StatusCode.UNAVAILABLE
> 23:32:35 details = "Broken pipe"
> 23:32:35 debug_error_string =
> "{"created":"@1626071403.252458842","description":"Error received from peer
> ipv4:127.0.0.1:37459","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Broken
> pipe","grpc_status":14}"
> 23:32:35 >
> 23:32:35 Traceback (most recent call last):
> 23:32:35 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/runners/worker/data_plane.py",
> line 459, in input_elements
> 23:32:35 element = received.get(timeout=1)
> 23:32:35 File "/usr/lib/python3.8/queue.py", line 178, in get
> 23:32:35 raise Empty
> 23:32:35 _queue.Empty
> {code}
>
> `test_pack_combiners` :
> [https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/4415/consoleFull]
> {code:java}
> 11:41:14 Exception ignored in: <object repr() failed>
> 11:41:14 Traceback (most recent call last):
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/target/.tox-py36-cython/py36-cython/lib/python3.6/site-packages/grpc/_channel.py",
> line 444, in __del__
> 11:41:14 with self._state.condition:
> 11:41:14 AttributeError: '_MultiThreadedRendezvous' object has no attribute
> '_state'
> 11:41:14 Traceback (most recent call last):
> 11:41:14 File "apache_beam/runners/common.py", line 1223, in
> apache_beam.runners.common.DoFnRunner.process
> 11:41:14 return self.do_fn_invoker.invoke_process(windowed_value)
> 11:41:14 File "apache_beam/runners/common.py", line 752, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
> 11:41:14 self._invoke_process_per_window(
> 11:41:14 File "apache_beam/runners/common.py", line 816, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> 11:41:14 [si[global_window] for si in self.side_inputs]))
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
> line 427, in __getitem__
> 11:41:14 self._cache[target_window] =
> self._side_input_data.view_fn(raw_view)
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
> line 353, in __iter__
> 11:41:14 self._state_handler.blocking_get(self._state_key,
> self._coder_impl))
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
> line 1184, in blocking_get
> 11:41:14 self._partially_cached_iterable(state_key, coder))
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
> line 1290, in _partially_cached_iterable
> 11:41:14 data, continuation_token = self._underlying.get_raw(state_key,
> None)
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
> line 1057, in get_raw
> 11:41:14 continuation_token=continuation_token)))
> 11:41:14 File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
> line 1095, in _blocking_request
> 11:41:14 raise t(v).with_traceback(tb)
> 11:41:14 TypeError: __init__() missing 3 required positional arguments:
> 'call', 'response_deserializer', and 'deadline'
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)