[
https://issues.apache.org/jira/browse/BEAM-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094678#comment-17094678
]
Ning Kang commented on BEAM-9832:
---------------------------------
That test expects some work to be completed within a timeout. If Jenkins is
busy during that timeout, the work will not be completed, or even started at
all, causing various error scenarios, such as mismatched results because the
work is incomplete, or a missing PCollection because no work was done.
> KeyError: 'No such coder: ' in fn_runner_test
> ---------------------------------------------
>
> Key: BEAM-9832
> URL: https://issues.apache.org/jira/browse/BEAM-9832
> Project: Beam
> Issue Type: Test
> Components: sdk-py-core, test-failures
> Reporter: Ning Kang
> Assignee: Pablo Estrada
> Priority: Critical
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Failed test results can be found
> [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/].
>
> A stack trace:
> {code:java}
> self =
> <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithMultiWorkers
> testMethod=test_read>
> def test_read(self):
> # Can't use NamedTemporaryFile as a context
> # due to https://bugs.python.org/issue14243
> temp_file = tempfile.NamedTemporaryFile(delete=False)
> try:
> temp_file.write(b'a\nb\nc')
> temp_file.close()
> with self.create_pipeline() as p:
> assert_that(
> > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b',
> > 'c']))
> apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> apache_beam/pipeline.py:529: in __exit__
> self.run().wait_until_finish()
> apache_beam/pipeline.py:502: in run
> self._options).run(False)
> apache_beam/pipeline.py:515: in run
> return self.runner.run_pipeline(self, self._options)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in
> run_pipeline
> pipeline.to_runner_api(default_environment=self._default_environment))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in
> run_via_runner_api
> return self.run_stages(stage_context, stages)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
> bundle_context_manager,
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
> bundle_manager)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
> data_input, data_output, input_timers, expected_timer_output)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in
> process_bundle
> timer_inputs)):
> /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
> yield future.result()
> /usr/lib/python3.5/concurrent/futures/_base.py:405: in result
> return self.__get_result()
> /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
> raise self._exception
> apache_beam/utils/thread_pool_executor.py:44: in run
> self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
> dry_run)
> apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in
> process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
> response = self.worker.do_instruction(request)
> apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
> getattr(request, request_type), request.instruction_id)
> apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> apache_beam/runners/worker/sdk_worker.py:374: in get
> self.data_channel_factory)
> apache_beam/runners/worker/bundle_processor.py:782: in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
> apache_beam/runners/worker/bundle_processor.py:836: in <listcomp>
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:820: in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:820: in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> apache_beam/runners/worker/bundle_processor.py:818: in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> apache_beam/runners/worker/bundle_processor.py:726: in wrapper
> result = cache[args] = func(*args)
> apache_beam/runners/worker/bundle_processor.py:823: in get_operation
> transform_id, transform_consumers)
> apache_beam/runners/worker/bundle_processor.py:1108: in create_operation
> return creator(self, transform_id, transform_proto, payload, consumers)
> apache_beam/runners/worker/bundle_processor.py:1341: in
> create_pair_with_restriction
> return _create_sdf_operation(PairWithRestriction, *args)
> apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation
> parameter)
> apache_beam/runners/worker/bundle_processor.py:1501: in
> _create_pardo_operation
> output_coders = factory.get_output_coders(transform_proto)
> apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders
> pcoll_id in transform_proto.outputs.items()
> apache_beam/runners/worker/bundle_processor.py:1154: in <dictcomp>
> pcoll_id in transform_proto.outputs.items()
> apache_beam/runners/worker/bundle_processor.py:1139: in get_windowed_coder
> coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> self = <apache_beam.runners.worker.bundle_processor.BeamTransformFactory
> object at 0x7f4248bf5160>
> coder_id = ''
> def get_coder(self, coder_id):
> # type: (str) -> coders.Coder
> if coder_id not in self.descriptor.coders:
> > raise KeyError("No such coder: %s" % coder_id)
> E KeyError: 'No such coder: '
> apache_beam/runners/worker/bundle_processor.py:1128: KeyError
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)