tvalentyn commented on issue #22321:
URL: https://github.com/apache/beam/issues/22321#issuecomment-1284427151

   ```
       def test_pardo_large_input(self):
         try:
           utils.check_compiled('apache_beam.coders.coder_impl')
         except RuntimeError:
           self.skipTest(
               '<a href="https://github.com/apache/beam/issues/21643:"; 
style="box-sizing: inherit; text-decoration: var(--link-text-decoration); 
font-weight: var(--link-font-weight); overflow-wrap: break-word; color: 
var(--link-color);">https://github.com/apache/beam/issues/21643:</a> 
FnRunnerTest with '
               'non-trivial inputs flakes in non-cython environments')
         with self.create_pipeline() as p:
           res = (
               p
               | beam.Create(np.array(range(5000),
                                      
dtype=np.int64)).with_output_types(np.int64)
               | beam.Map(lambda e: e * 2)
               | beam.Map(lambda e: e + 3))
   &gt;       assert_that(res, equal_to([(i * 2) + 3 for i in range(5000)]))
   
   apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:385: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/pipeline.py:598: in __exit__
       self.result.wait_until_finish()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = &lt;apache_beam.runners.portability.portable_runner.PipelineResult 
object at 0x7fa1996045b0&gt;
   duration = None
   
       def wait_until_finish(self, duration=None):
         """
         :param duration: The maximum time in milliseconds to wait for the 
result of
         the execution. If None or zero, will wait until the pipeline finishes.
         :return: The result of the pipeline, i.e. PipelineResult.
         """
         def read_messages():
           # type: () -&gt; None
           previous_state = -1
           for message in self._message_stream:
             if message.HasField('message_response'):
               logging.log(
                   MESSAGE_LOG_LEVELS[message.message_response.importance],
                   "%s",
                   message.message_response.message_text)
             else:
               current_state = message.state_response.state
               if current_state != previous_state:
                 _LOGGER.info(
                     "Job state changed to %s",
                     self.runner_api_state_to_pipeline_state(current_state))
                 previous_state = current_state
             self._messages.append(message)
       
         message_thread = threading.Thread(
             target=read_messages, name='wait_until_finish_read')
         message_thread.daemon = True
         message_thread.start()
       
         if duration:
           state_thread = threading.Thread(
               target=functools.partial(self._observe_state, message_thread),
               name='wait_until_finish_state_observer')
           state_thread.daemon = True
           state_thread.start()
           start_time = time.time()
           duration_secs = duration / 1000
           while (time.time() - start_time &lt; duration_secs and
                  state_thread.is_alive()):
             time.sleep(1)
         else:
           self._observe_state(message_thread)
       
         if self._runtime_exception:
   &gt;       raise self._runtime_exception
   E       RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e 
failed in state FAILED: Error running pipeline.
   E       Traceback (most recent call last):
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 296, in _run_job
   E           self.result = self._invoke_runner()
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 318, in _invoke_runner
   E           return fn_runner.FnApiRunner(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 212, in run_via_runner_api
   E           return self.run_stages(stage_context, stages)
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 442, in run_stages
   E           bundle_results = self._execute_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 770, in _execute_bundle
   E           self._run_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 999, in _run_bundle
   E           result, splits = bundle_manager.process_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1348, in process_bundle
   E           raise RuntimeError(result.error)
   E       RuntimeError: Traceback (most recent call last):
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 287, in _execute
   E           response = task()
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 360, in &lt;lambda&gt;
   E           lambda: self.create_worker().do_instruction(request), request)
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 596, in do_instruction
   E           return getattr(self, request_type)(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 634, in process_bundle
   E           bundle_processor.process_bundle(instruction_id))
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 1003, in process_bundle
   E           input_op_by_transform_id[element.transform_id].process_encoded(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 225, in process_encoded
   E           decoded_value = self.windowed_coder_impl.decode_from_stream(
   E         File "apache_beam/coders/coder_impl.py", line 1446, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
   E           def decode_from_stream(self, in_stream, nested):
   E         File "apache_beam/coders/coder_impl.py", line 1465, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
   E           value = self._value_coder.decode_from_stream(in_stream, nested)
   E         File "apache_beam/coders/coder_impl.py", line 1009, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
   E           c.decode_from_stream(
   E         File "apache_beam/coders/coder_impl.py", line 1557, in 
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
   E           return self._value_coder.decode(in_stream.read(value_length))
   E         File "apache_beam/coders/coder_impl.py", line 240, in 
apache_beam.coders.coder_impl.StreamCoderImpl.decode
   E           return self.decode_from_stream(create_InputStream(encoded), 
False)
   E         File "apache_beam/coders/coder_impl.py", line 585, in 
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
   E           raise ValueError('Unknown type tag %x' % t)
   E       ValueError: Unknown type tag df
   
   apache_beam/runners/portability/portable_runner.py:607: 
RuntimeError</pre>Regression
   
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithExternalEnv.test_pardo_large_input
 (from py39-cython)
   
   Failing for the past 1 build (Since 
[#3168](https://ci-beam.apache.org/job/beam_PreCommit_Python_Phrase/3168/) )
   [Took 7.9 
sec.](https://ci-beam.apache.org/job/beam_PreCommit_Python_Phrase/3168/testReport/junit/apache_beam.runners.portability.portable_runner_test/PortableRunnerTestWithExternalEnv/test_pardo_large_input/history)
   Error Message
   RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e failed in 
state FAILED: Error running pipeline.
   Traceback (most recent call last):
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 296, in _run_job
       self.result = self._invoke_runner()
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 318, in _invoke_runner
       return fn_runner.FnApiRunner(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 212, in run_via_runner_api
       return self.run_stages(stage_context, stages)
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 442, in run_stages
       bundle_results = self._execute_bundle(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 770, in _execute_bundle
       self._run_bundle(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 999, in _run_bundle
       result, splits = bundle_manager.process_bundle(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1348, in process_bundle
       raise RuntimeError(result.error)
   RuntimeError: Traceback (most recent call last):
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 287, in _execute
       response = task()
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 360, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 596, in do_instruction
       return getattr(self, request_type)(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 634, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 1003, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 225, in process_encoded
       decoded_value = self.windowed_coder_impl.decode_from_stream(
     File "apache_beam/coders/coder_impl.py", line 1446, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
       def decode_from_stream(self, in_stream, nested):
     File "apache_beam/coders/coder_impl.py", line 1465, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
       value = self._value_coder.decode_from_stream(in_stream, nested)
     File "apache_beam/coders/coder_impl.py", line 1009, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
       c.decode_from_stream(
     File "apache_beam/coders/coder_impl.py", line 1557, in 
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
       return self._value_coder.decode(in_stream.read(value_length))
     File "apache_beam/coders/coder_impl.py", line 240, in 
apache_beam.coders.coder_impl.StreamCoderImpl.decode
       return self.decode_from_stream(create_InputStream(encoded), False)
     File "apache_beam/coders/coder_impl.py", line 585, in 
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
       raise ValueError('Unknown type tag %x' % t)
   ValueError: Unknown type tag df
   Stacktrace
   self = 
<apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithExternalEnv
 testMethod=test_pardo_large_input>
   
       def test_pardo_large_input(self):
         try:
           utils.check_compiled('apache_beam.coders.coder_impl')
         except RuntimeError:
           self.skipTest(
               'https://github.com/apache/beam/issues/21643: FnRunnerTest with '
               'non-trivial inputs flakes in non-cython environments')
         with self.create_pipeline() as p:
           res = (
               p
               | beam.Create(np.array(range(5000),
                                      
dtype=np.int64)).with_output_types(np.int64)
               | beam.Map(lambda e: e * 2)
               | beam.Map(lambda e: e + 3))
   >       assert_that(res, equal_to([(i * 2) + 3 for i in range(5000)]))
   
   apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:385: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/pipeline.py:598: in __exit__
       self.result.wait_until_finish()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.runners.portability.portable_runner.PipelineResult 
object at 0x7fa1996045b0>
   duration = None
   
       def wait_until_finish(self, duration=None):
         """
         :param duration: The maximum time in milliseconds to wait for the 
result of
         the execution. If None or zero, will wait until the pipeline finishes.
         :return: The result of the pipeline, i.e. PipelineResult.
         """
         def read_messages():
           # type: () -> None
           previous_state = -1
           for message in self._message_stream:
             if message.HasField('message_response'):
               logging.log(
                   MESSAGE_LOG_LEVELS[message.message_response.importance],
                   "%s",
                   message.message_response.message_text)
             else:
               current_state = message.state_response.state
               if current_state != previous_state:
                 _LOGGER.info(
                     "Job state changed to %s",
                     self.runner_api_state_to_pipeline_state(current_state))
                 previous_state = current_state
             self._messages.append(message)
       
         message_thread = threading.Thread(
             target=read_messages, name='wait_until_finish_read')
         message_thread.daemon = True
         message_thread.start()
       
         if duration:
           state_thread = threading.Thread(
               target=functools.partial(self._observe_state, message_thread),
               name='wait_until_finish_state_observer')
           state_thread.daemon = True
           state_thread.start()
           start_time = time.time()
           duration_secs = duration / 1000
           while (time.time() - start_time < duration_secs and
                  state_thread.is_alive()):
             time.sleep(1)
         else:
           self._observe_state(message_thread)
       
         if self._runtime_exception:
   >       raise self._runtime_exception
   E       RuntimeError: Pipeline job-d1c57041-746a-4fee-9a45-f5cd9f6dcf4e 
failed in state FAILED: Error running pipeline.
   E       Traceback (most recent call last):
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 296, in _run_job
   E           self.result = self._invoke_runner()
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py",
 line 318, in _invoke_runner
   E           return fn_runner.FnApiRunner(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 212, in run_via_runner_api
   E           return self.run_stages(stage_context, stages)
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 442, in run_stages
   E           bundle_results = self._execute_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 770, in _execute_bundle
   E           self._run_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 999, in _run_bundle
   E           result, splits = bundle_manager.process_bundle(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1348, in process_bundle
   E           raise RuntimeError(result.error)
   E       RuntimeError: Traceback (most recent call last):
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 287, in _execute
   E           response = task()
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 360, in <lambda>
   E           lambda: self.create_worker().do_instruction(request), request)
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 596, in do_instruction
   E           return getattr(self, request_type)(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py",
 line 634, in process_bundle
   E           bundle_processor.process_bundle(instruction_id))
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 1003, in process_bundle
   E           input_op_by_transform_id[element.transform_id].process_encoded(
   E         File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py",
 line 225, in process_encoded
   E           decoded_value = self.windowed_coder_impl.decode_from_stream(
   E         File "apache_beam/coders/coder_impl.py", line 1446, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
   E           def decode_from_stream(self, in_stream, nested):
   E         File "apache_beam/coders/coder_impl.py", line 1465, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
   E           value = self._value_coder.decode_from_stream(in_stream, nested)
   E         File "apache_beam/coders/coder_impl.py", line 1009, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
   E           c.decode_from_stream(
   E         File "apache_beam/coders/coder_impl.py", line 1557, in 
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
   E           return self._value_coder.decode(in_stream.read(value_length))
   E         File "apache_beam/coders/coder_impl.py", line 240, in 
apache_beam.coders.coder_impl.StreamCoderImpl.decode
   E           return self.decode_from_stream(create_InputStream(encoded), 
False)
   E         File "apache_beam/coders/coder_impl.py", line 585, in 
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
   E           raise ValueError('Unknown type tag %x' % t)
   E       ValueError: Unknown type tag df
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to