[
https://issues.apache.org/jira/browse/BEAM-12490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365135#comment-17365135
]
Brian Hulette commented on BEAM-12490:
--------------------------------------
I found a way to reproduce it reliably: run the new gcsio_test followed by
sdk_worker_test in a single pytest invocation:
{code}
python -m pytest sdks/python/apache_beam/io/gcp/gcsio_test.py \
  sdks/python/apache_beam/runners/worker/sdk_worker_test.py
{code}
I guess gcsio_test is setting some global state that causes sdk_worker_test to
fail. Note that it doesn't happen if you run sdk_worker_test first, which must
be why this is flaky: the result depends on the order the tests run in.
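
To illustrate the suspected mechanism, here is a minimal, self-contained sketch
of an order-dependent failure caused by process-wide state. None of the names
below are Beam APIs: the registry, the two fake tests and run_in_order are all
hypothetical stand-ins. It also assumes that the entry written by the first
test is still present when the second test serializes the registry, which is
not confirmed for the real tests.

```python
# Hypothetical stand-ins for illustration only; none of these are Beam APIs.
_PROCESS_WIDE_COUNTERS = {}  # module-level state shared by all tests in a process


def inc_counter(urn, labels, value):
    """Adds `value` to the process-wide counter identified by urn + labels."""
    key = (urn, tuple(sorted(labels.items())))
    _PROCESS_WIDE_COUNTERS[key] = _PROCESS_WIDE_COUNTERS.get(key, 0) + value


def to_monitoring_infos():
    """Serializes every counter, rejecting non-string label values."""
    infos = []
    for (urn, labels), value in _PROCESS_WIDE_COUNTERS.items():
        for _, label_value in labels:
            if not isinstance(label_value, str):
                raise TypeError(
                    '%r has type %s, but expected one of: bytes, unicode' %
                    (label_value, type(label_value).__name__))
        infos.append({'urn': urn, 'labels': dict(labels), 'value': value})
    return infos


def fake_gcsio_test():
    # Leaves a counter behind whose GCS_PROJECT_ID label is an int.
    inc_counter(
        'gcs.request', {'GCS_BUCKET': 'gcsio-test', 'GCS_PROJECT_ID': 1}, 1)


def fake_sdk_worker_test():
    # Only creates a well-formed counter, but serializes *all* counters.
    inc_counter('my.custom.urn', {'key': 'value'}, 10)
    return to_monitoring_infos()


def run_in_order(*tests):
    """Runs the tests in one simulated process and reports the outcome."""
    _PROCESS_WIDE_COUNTERS.clear()  # simulate a fresh interpreter
    try:
        for test in tests:
            test()
    except TypeError as e:
        return 'FAILED: %s' % e
    return 'PASSED'
```

With these stand-ins, run_in_order(fake_sdk_worker_test, fake_gcsio_test)
returns 'PASSED', while run_in_order(fake_gcsio_test, fake_sdk_worker_test)
returns 'FAILED: 1 has type int, but expected one of: bytes, unicode'. The
second test only fails when something earlier in the same process has already
written a badly typed label.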
> Python PreCommit flaking in
> SdkWorkerTest.test_harness_monitoring_infos_and_metadata
> ------------------------------------------------------------------------------------
>
> Key: BEAM-12490
> URL: https://issues.apache.org/jira/browse/BEAM-12490
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Brian Hulette
> Assignee: Rogelio Miguel Hernandez Sandoval
> Priority: P1
> Labels: flake
>
> SdkWorkerTest.test_harness_monitoring_infos_and_metadata has been very flaky
> over the last few days.
> Based on [test
> history|https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/testReport/junit/apache_beam.runners.worker.sdk_worker_test/SdkWorkerTest/history/]
> it looks like the first flake was in [cron build
> #4303|https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/4303/].
> {code}
> self = <apache_beam.runners.worker.sdk_worker_test.SdkWorkerTest testMethod=test_harness_monitoring_infos_and_metadata>
>     def test_harness_monitoring_infos_and_metadata(self):
>       # Clear the process wide metric container.
>       MetricsEnvironment.process_wide_container().reset()
>       # Create a process_wide metric.
>       urn = 'my.custom.urn'
>       labels = {'key': 'value'}
>       InternalMetrics.counter(urn=urn, labels=labels, process_wide=True).inc(10)
>
>       harness_monitoring_infos_request = beam_fn_api_pb2.InstructionRequest(
>           instruction_id="monitoring_infos",
>           harness_monitoring_infos=beam_fn_api_pb2.HarnessMonitoringInfosRequest(
>           ))
>
> >     responses = self.get_responses([harness_monitoring_infos_request])
> apache_beam/runners/worker/sdk_worker_test.py:236:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> apache_beam/runners/worker/sdk_worker_test.py:220: in get_responses
> harness.run()
> apache_beam/runners/worker/sdk_worker.py:257: in run
> work_request)
> apache_beam/runners/worker/sdk_worker.py:330: in _request_harness_monitoring_infos
> ).to_runner_api_monitoring_infos(None).values()
> apache_beam/metrics/execution.py:312: in to_runner_api_monitoring_infos
> cell in items
> apache_beam/metrics/execution.py:312: in <listcomp>
> cell in items
> apache_beam/metrics/cells.py:76: in to_runner_api_monitoring_info
> mi = self.to_runner_api_monitoring_info_impl(name, transform_id)
> apache_beam/metrics/cells.py:158: in to_runner_api_monitoring_info_impl
> name.urn, self.get_cumulative(), labels=name.labels)
> apache_beam/metrics/monitoring_infos.py:187: in int64_counter
> return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
> apache_beam/metrics/monitoring_infos.py:286: in create_monitoring_info
> urn=urn, type=type_urn, labels=labels or dict(), payload=payload)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> args = [{'GCS_BUCKET': 'gcsio-test', 'GCS_PROJECT_ID': 1, 'METHOD':
> 'Objects.insert', 'RESOURCE': '//storage.googleapis.com/buckets/gcsio-test',
> ...}]
> kwds = {}
> self = {'GCS_BUCKET': 'gcsio-test', 'METHOD': 'Objects.insert', 'RESOURCE':
> '//storage.googleapis.com/buckets/gcsio-test', 'GCS_PROJECT_ID': '',
> 'SERVICE': 'Storage'}
> other = {'GCS_BUCKET': 'gcsio-test', 'GCS_PROJECT_ID': 1, 'METHOD':
> 'Objects.insert', 'RESOURCE': '//storage.googleapis.com/buckets/gcsio-test',
> ...}
> key = 'GCS_PROJECT_ID'
>     def update(*args, **kwds):
>         ''' D.update([E, ]**F) -> None. Update D from mapping/iterable E and F.
>             If E present and has a .keys() method, does: for k in E: D[k] = E[k]
>             If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v
>             In either case, this is followed by: for k, v in F.items(): D[k] = v
>         '''
>         if not args:
>             raise TypeError("descriptor 'update' of 'MutableMapping' object "
>                             "needs an argument")
>         self, *args = args
>         if len(args) > 1:
>             raise TypeError('update expected at most 1 arguments, got %d' %
>                             len(args))
>         if args:
>             other = args[0]
>             if isinstance(other, Mapping):
>                 for key in other:
> >                   self[key] = other[key]
> E                   TypeError: 1 has type int, but expected one of: bytes, unicode
> /usr/lib/python3.7/_collections_abc.py:841: TypeError
> {code}