lostluck commented on PR #23093:
URL: https://github.com/apache/beam/pull/23093#issuecomment-1240994046
Python Precommit fails:
> Task :sdks:python:test-suites:tox:py37:testPy37Cloud
.ss...ss........................... [ 52%]
.................s.....s........s......Exception in thread Thread-1769:
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 296, in _run_job
self.result = self._invoke_runner()
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 320, in _invoke_runner
self._pipeline_proto, self.pipeline_options())
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 443, in run_stages
runner_execution_context, bundle_context_manager, bundle_input)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 776, in _execute_bundle
bundle_manager))
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1000, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1348, in process_bundle
raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py",
line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py",
line 838, in invoke_process
windowed_value, additional_args, additional_kwargs)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py",
line 983, in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/core.py",
line 1877, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/testing/util.py",
line 191, in _equal
raise BeamAssertException(msg)
apache_beam.testing.util.BeamAssertException: Failed assert: ['a'] == ['a',
'b'], unexpected elements ['b']
|
---
> Task :sdks:python:test-suites:tox:py37:testPy37Cloud FAILED
============================= test session starts
==============================
platform linux -- Python 3.7.12, pytest-7.1.3, pluggy-1.0.0
cachedir: target/.tox-py37-cloud/py37-cloud/.pytest_cache
rootdir:
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python,
configfile: pytest.ini
plugins: xdist-2.5.0, timeout-2.1.0, forked-1.4.0, requests-mock-1.10.0
timeout: 600.0s
timeout method: signal
timeout func_only: False
collected 6657 items / 6576 deselected / 2 skipped / 81 selected
apache_beam/coders/fast_coders_test.py ................................. [
40%]
.. [
43%]
apache_beam/coders/slow_coders_test.py ................................. [
83%]
.. [
86%]
apache_beam/examples/complete/autocomplete_it_test.py s [
87%]
apache_beam/examples/complete/estimate_pi_it_test.py s [
88%]
apache_beam/examples/complete/top_wikipedia_sessions_it_test.py s [
90%]
apache_beam/examples/complete/game/hourly_team_score_it_test.py s [
91%]
apache_beam/examples/complete/game/user_score_it_test.py s [
92%]
apache_beam/examples/cookbook/bigquery_side_input_it_test.py s [
93%]
apache_beam/examples/cookbook/coders_it_test.py s [
95%]
apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline_test.py
s [ 96%]
[
96%]
apache_beam/runners/portability/stager_test.py .. [
98%]
apache_beam/transforms/userstate_test.py F
[100%]
=================================== FAILURES
===================================
____ StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
____
self = <apache_beam.transforms.userstate_test.StatefulDoFnOnDirectRunnerTest
testMethod=test_dynamic_timer_clear_then_set_timer>
@pytest.mark.no_xdist
@pytest.mark.timeout(3)
def test_dynamic_timer_clear_then_set_timer(self):
class EmitTwoEvents(DoFn):
EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
def process(self, element,
emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
yield ('1', 'set')
emit.set(1)
@on_timer(EMIT_CLEAR_SET_TIMER)
def emit_clear(self):
yield ('1', 'clear')
class DynamicTimerDoFn(DoFn):
EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
if element[1] == 'set':
emit.set(10, dynamic_timer_tag='emit1')
emit.set(20, dynamic_timer_tag='emit2')
if element[1] == 'clear':
emit.set(30, dynamic_timer_tag='emit3')
emit.clear(dynamic_timer_tag='emit3')
emit.set(40, dynamic_timer_tag='emit3')
return []
@on_timer(EMIT_TIMER_FAMILY)
def emit_callback(
self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
yield (tag, ts)
with TestPipeline() as p:
res = (
p
| beam.Create([('1', 'impulse')])
| beam.ParDo(EmitTwoEvents())
| beam.ParDo(DynamicTimerDoFn()))
> assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3',
40)]))
apache_beam/transforms/userstate_test.py:1033:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
apache_beam/pipeline.py:597: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:114: in run
False if self.not_use_test_runner_api else test_runner_api))
apache_beam/pipeline.py:550: in run
self._options).run(False)
apache_beam/pipeline.py:574: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:131: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:201: in
run_pipeline
options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:212: in
run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:443: in run_stages
runner_execution_context, bundle_context_manager, bundle_input)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:776: in
_execute_bundle
bundle_manager))
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1000: in
_run_bundle
data_input, data_output, input_timers, expected_timer_output)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1309: in
process_bundle
result_future =
self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:597: in do_instruction
getattr(request, request_type), request.instruction_id)
apache_beam/runners/worker/sdk_worker.py:628: in process_bundle
instruction_id, request.process_bundle_descriptor_id)
apache_beam/runners/worker/sdk_worker.py:463: in get
self.data_channel_factory)
apache_beam/runners/worker/bundle_processor.py:873: in __init__
op.setup()
apache_beam/runners/worker/operations.py:881: in setup
operation_name=self.name_context.metrics_name())
apache_beam/runners/common.py:1366: in __init__
do_fn_signature = DoFnSignature(fn)
apache_beam/runners/common.py:293: in __init__
self._is_stateful_dofn = userstate.is_stateful_dofn(do_fn)
apache_beam/transforms/userstate.py:274: in is_stateful_dofn
all_state_specs, all_timer_specs = get_dofn_specs(dofn)
apache_beam/transforms/userstate.py:251: in get_dofn_specs
method = MethodWrapper(dofn, method_name)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.runners.common.MethodWrapper object at 0x7fafc7c10390>
obj_to_invoke = <apache_beam.runners.worker.bundle_processor.WindowIntoDoFn
object at 0x7fafc7c107d0>
method_name = 'with_input_types'
def __init__(self, obj_to_invoke, method_name):
"""
Initiates a ``MethodWrapper``.
Args:
obj_to_invoke: the object that contains the method. Has to either be
a
`DoFn` object or a `RestrictionProvider` object.
method_name: name of the method as a string.
"""
if not isinstance(obj_to_invoke,
(DoFn, RestrictionProvider,
WatermarkEstimatorProvider)):
raise ValueError(
'\'obj_to_invoke\' has to be either a \'DoFn\' or '
'a \'RestrictionProvider\'. Received %r instead.' %
obj_to_invoke)
self.args, self.defaults = core.get_function_arguments(obj_to_invoke,
method_name)
# TODO(BEAM-5878) support kwonlyargs on Python 3.
self.method_value = getattr(obj_to_invoke, method_name)
self.method_name = method_name
self.has_userstate_arguments = False
self.state_args_to_replace = {} # type: Dict[str, core.StateSpec]
self.timer_args_to_replace = {} # type: Dict[str, core.TimerSpec]
self.timestamp_arg_name = None # type: Optional[str]
self.window_arg_name = None # type: Optional[str]
self.key_arg_name = None # type: Optional[str]
self.restriction_provider = None
self.restriction_provider_arg_name = None
self.watermark_estimator_provider = None
self.watermark_estimator_provider_arg_name = None
self.dynamic_timer_tag_arg_name = None
if hasattr(self.method_value, 'unbounded_per_element'):
self.unbounded_per_element = True
else:
> self.unbounded_per_element = False
E Failed: Timeout >3.0s
apache_beam/runners/common.py:168: Failed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]