tvalentyn commented on issue #22321:
URL: https://github.com/apache/beam/issues/22321#issuecomment-1284427151
```
def test_pardo_large_input(self):
try:
utils.check_compiled('apache_beam.coders.coder_impl')
except RuntimeError:
self.skipTest(
'https://github.com/apache/beam/issues/21643: FnRunnerTest with '
'non-trivial inputs flakes in non-cython environments')
with self.create_pipeline() as p:
res = (
p
| beam.Create(np.array(range(5000),
dtype=np.int64)).with_output_types(np.int64)
| beam.Map(lambda e: e * 2)
| beam.Map(lambda e: e + 3))
> assert_that(res, equal_to([(i * 2) + 3 for i in range(5000)]))
apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:385:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
apache_beam/pipeline.py:598: in __exit__
self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult
object at 0x7fa1996045b0>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the
result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
def read_messages():
# type: () -> None
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
logging.log(
MESSAGE_LOG_LEVELS[message.message_response.importance],
"%s",
message.message_response.message_text)
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e
failed in state FAILED: Error running pipeline.
E Traceback (most recent call last):
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 296, in _run_job
E self.result = self._invoke_runner()
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 318, in _invoke_runner
E return fn_runner.FnApiRunner(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 212, in run_via_runner_api
E return self.run_stages(stage_context, stages)
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 442, in run_stages
E bundle_results = self._execute_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 770, in _execute_bundle
E self._run_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 999, in _run_bundle
E result, splits = bundle_manager.process_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1348, in process_bundle
E raise RuntimeError(result.error)
E RuntimeError: Traceback (most recent call last):
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 287, in _execute
E response = task()
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 360, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 596, in do_instruction
E return getattr(self, request_type)(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 634, in process_bundle
E bundle_processor.process_bundle(instruction_id))
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 1003, in process_bundle
E input_op_by_transform_id[element.transform_id].process_encoded(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 225, in process_encoded
E decoded_value = self.windowed_coder_impl.decode_from_stream(
E File "apache_beam/coders/coder_impl.py", line 1446, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
E def decode_from_stream(self, in_stream, nested):
E File "apache_beam/coders/coder_impl.py", line 1465, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
E value = self._value_coder.decode_from_stream(in_stream, nested)
E File "apache_beam/coders/coder_impl.py", line 1009, in
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
E c.decode_from_stream(
E File "apache_beam/coders/coder_impl.py", line 1557, in
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
E return self._value_coder.decode(in_stream.read(value_length))
E File "apache_beam/coders/coder_impl.py", line 240, in
apache_beam.coders.coder_impl.StreamCoderImpl.decode
E return self.decode_from_stream(create_InputStream(encoded),
False)
E File "apache_beam/coders/coder_impl.py", line 585, in
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
E raise ValueError('Unknown type tag %x' % t)
E ValueError: Unknown type tag df
apache_beam/runners/portability/portable_runner.py:607:
RuntimeError
Regression
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithExternalEnv.test_pardo_large_input
(from py39-cython)
Failing for the past 1 build (Since
[#3168](https://ci-beam.apache.org/job/beam_PreCommit_Python_Phrase/3168/) )
[Took 7.9
sec.](https://ci-beam.apache.org/job/beam_PreCommit_Python_Phrase/3168/testReport/junit/apache_beam.runners.portability.portable_runner_test/PortableRunnerTestWithExternalEnv/test_pardo_large_input/history)
Error Message
RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e failed in
state FAILED: Error running pipeline.
Traceback (most recent call last):
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 296, in _run_job
self.result = self._invoke_runner()
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 318, in _invoke_runner
return fn_runner.FnApiRunner(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 442, in run_stages
bundle_results = self._execute_bundle(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 770, in _execute_bundle
self._run_bundle(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 999, in _run_bundle
result, splits = bundle_manager.process_bundle(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1348, in process_bundle
raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 287, in _execute
response = task()
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 360, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 596, in do_instruction
return getattr(self, request_type)(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 634, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 225, in process_encoded
decoded_value = self.windowed_coder_impl.decode_from_stream(
File "apache_beam/coders/coder_impl.py", line 1446, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
def decode_from_stream(self, in_stream, nested):
File "apache_beam/coders/coder_impl.py", line 1465, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
value = self._value_coder.decode_from_stream(in_stream, nested)
File "apache_beam/coders/coder_impl.py", line 1009, in
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
c.decode_from_stream(
File "apache_beam/coders/coder_impl.py", line 1557, in
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
return self._value_coder.decode(in_stream.read(value_length))
File "apache_beam/coders/coder_impl.py", line 240, in
apache_beam.coders.coder_impl.StreamCoderImpl.decode
return self.decode_from_stream(create_InputStream(encoded), False)
File "apache_beam/coders/coder_impl.py", line 585, in
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
raise ValueError('Unknown type tag %x' % t)
ValueError: Unknown type tag df
Stacktrace
self =
<apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithExternalEnv
testMethod=test_pardo_large_input>
def test_pardo_large_input(self):
try:
utils.check_compiled('apache_beam.coders.coder_impl')
except RuntimeError:
self.skipTest(
'https://github.com/apache/beam/issues/21643: FnRunnerTest with '
'non-trivial inputs flakes in non-cython environments')
with self.create_pipeline() as p:
res = (
p
| beam.Create(np.array(range(5000),
dtype=np.int64)).with_output_types(np.int64)
| beam.Map(lambda e: e * 2)
| beam.Map(lambda e: e + 3))
> assert_that(res, equal_to([(i * 2) + 3 for i in range(5000)]))
apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:385:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
apache_beam/pipeline.py:598: in __exit__
self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult
object at 0x7fa1996045b0>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the
result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
def read_messages():
# type: () -> None
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
logging.log(
MESSAGE_LOG_LEVELS[message.message_response.importance],
"%s",
message.message_response.message_text)
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e
failed in state FAILED: Error running pipeline.
E Traceback (most recent call last):
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 296, in _run_job
E self.result = self._invoke_runner()
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
line 318, in _invoke_runner
E return fn_runner.FnApiRunner(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 212, in run_via_runner_api
E return self.run_stages(stage_context, stages)
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 442, in run_stages
E bundle_results = self._execute_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 770, in _execute_bundle
E self._run_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 999, in _run_bundle
E result, splits = bundle_manager.process_bundle(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1348, in process_bundle
E raise RuntimeError(result.error)
E RuntimeError: Traceback (most recent call last):
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 287, in _execute
E response = task()
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 360, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 596, in do_instruction
E return getattr(self, request_type)(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
line 634, in process_bundle
E bundle_processor.process_bundle(instruction_id))
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 1003, in process_bundle
E input_op_by_transform_id[element.transform_id].process_encoded(
E File
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
line 225, in process_encoded
E decoded_value = self.windowed_coder_impl.decode_from_stream(
E File "apache_beam/coders/coder_impl.py", line 1446, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
E def decode_from_stream(self, in_stream, nested):
E File "apache_beam/coders/coder_impl.py", line 1465, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
E value = self._value_coder.decode_from_stream(in_stream, nested)
E File "apache_beam/coders/coder_impl.py", line 1009, in
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
E c.decode_from_stream(
E File "apache_beam/coders/coder_impl.py", line 1557, in
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
E return self._value_coder.decode(in_stream.read(value_length))
E File "apache_beam/coders/coder_impl.py", line 240, in
apache_beam.coders.coder_impl.StreamCoderImpl.decode
E return self.decode_from_stream(create_InputStream(encoded),
False)
E File "apache_beam/coders/coder_impl.py", line 585, in
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
E raise ValueError('Unknown type tag %x' % t)
E ValueError: Unknown type tag df
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]