[ https://issues.apache.org/jira/browse/BEAM-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750702#comment-16750702 ]
Maximilian Michels commented on BEAM-6473:
------------------------------------------
This looks like an issue with the Python SDK or the fuser. The failure is
raised in {{QueryablePipeline}}:
https://github.com/apache/beam/blob/fc4c6baff97a1c6efbe5d09a5207184ea1818b3b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L227
It seems that {{side3}} is never produced when {{side}} is also used as a side
input to another transform. Flattening a copy of the {{side}} collection with
{{side3}}, instead of {{side}} itself, lets the test pass:
{code:python}
def test_flattened_side_input(self):
  with self.create_pipeline() as p:
    main = p | 'main' >> beam.Create([None])
    side1 = p | 'side1' >> beam.Create([('a', 1)])
    side2 = p | 'side2' >> beam.Create([('b', 2)])
    side3 = p | 'side3' >> beam.Create(['another type'])
    side = (side1, side2) | beam.Flatten()
    # Making a copy here works, but using 'side' below in
    # CheckFlattenOfSideInput does not
    side_copy = (side1, side2) | "side_copy" >> beam.Flatten()
    assert_that(
        main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
        equal_to([(None, {'a': 1, 'b': 2})]),
        label='CheckFlattenAsSideInput')
    assert_that(
        # This was (side, side3) before
        (side_copy, side3) | 'FlattenAfter' >> beam.Flatten(),
        equal_to([('a', 1), ('b', 2), ('another type')]),
        label='CheckFlattenOfSideInput')
{code}
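For context, the "consumed but never produced" condition can be pictured roughly as below. This is only a simplified Python sketch of the idea, not the actual Java check in {{QueryablePipeline}}; the function name {{find_unproduced}}, the dict layout, and the {{pc_*}} ids are made up for illustration.

```python
def find_unproduced(transforms):
    """Return ids of PCollections that some transform consumes but none produces.

    `transforms` is a hypothetical mapping: name -> {"inputs": [...], "outputs": [...]}.
    """
    produced = set()
    consumed = set()
    for transform in transforms.values():
        produced.update(transform["outputs"])
        consumed.update(transform["inputs"])
    # Anything consumed without a producer makes the graph invalid.
    return sorted(consumed - produced)


# Hypothetical graph in which the producer of side3 has gone missing.
broken_graph = {
    "side1": {"inputs": [], "outputs": ["pc_side1"]},
    "side2": {"inputs": [], "outputs": ["pc_side2"]},
    "Flatten": {"inputs": ["pc_side1", "pc_side2"], "outputs": ["pc_side"]},
    "FlattenAfter": {"inputs": ["pc_side", "pc_side3"], "outputs": ["pc_out"]},
}

# The same graph with the side3 producer present.
fixed_graph = dict(broken_graph, side3={"inputs": [], "outputs": ["pc_side3"]})

print(find_unproduced(broken_graph))
print(find_unproduced(fixed_graph))
```

If the transform producing {{side3}} is dropped or rewired somewhere between the SDK and the runner, a check of this shape would flag its output as consumed by {{FlattenAfter}} but never produced.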
Do you have a pointer [~robertwb] where to look first?
> Python Flink ValidatesRunner test_flattened_side_input fails
> ------------------------------------------------------------
>
> Key: BEAM-6473
> URL: https://issues.apache.org/jira/browse/BEAM-6473
> Project: Beam
> Issue Type: Test
> Components: runner-flink, sdk-py-core
> Reporter: Maximilian Michels
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The {{test_flattened_side_input}} test fails after merging
> [https://github.com/apache/beam/pull/7456]
> {noformat}
> ERROR: test_flattened_side_input (__main__.FlinkRunnerTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "apache_beam/runners/portability/fn_api_runner_test.py", line 205, in test_flattened_side_input
>     label='CheckFlattenOfSideInput')
>   File "apache_beam/pipeline.py", line 425, in __exit__
>     self.run().wait_until_finish()
>   File "apache_beam/runners/portability/portable_runner.py", line 349, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> test_flattened_side_input_1547859357.36_07dcde9b-acfc-4e8d-b930-582f7637a07e
> failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes
> [PCollectionNode
> {id=ref_PCollection_PCollection_12, PCollection=unique_name:
> "17side3/Map(decode).None" coder_id: "ref_Coder_BytesCoder_1" is_bounded:
> BOUNDED windowing_strategy_id: "ref_Windowing_Windowing_1" }
> ] were consumed but never produced
> {noformat}
> [https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/134/console]