redfungus opened a new issue, #25860:
URL: https://github.com/apache/beam/issues/25860

   ### What happened?
   
   Using a side input in the form in WriteToText with Dataflow as the runner 
causes an error.
   
   ` File 
".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 
877, in run_ParDo
       step = self._add_step(
     File 
".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 
652, in _add_step
       [
     File 
".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 
653, in <listcomp>
       item.get_dict()
     File ".venv\lib\site-packages\apache_beam\transforms\display.py", line 
370, in get_dict
       self.is_valid()
     File ".venv\lib\site-packages\apache_beam\transforms\display.py", line 
336, in is_valid
       raise ValueError(
   ValueError: Invalid DisplayDataItem. Value <apache_beam.pvalue.AsDict object 
at 0x000001AA22BED610> is of an unsupported type.`
   
   The pipeline works fine when running locally but fails when using a Dataflow 
runner. Tested with all the different `beam.pvalue.As...` too and it still 
happens.
   
   Reverting to an older version of the SDK fixed the problem. (Specifically, 
2.40.0, I didn't test other versions)
   
   SDK version with the error: 2.46.0
   Python version used: 3.9.13
   
   Code of the whole pipeline:
   
   `#!/usr/bin/env python
   #
   # Copyright 2020 Google LLC
   #
   # Licensed under the Apache License, Version 2.0 (the "License");
   # you may not use this file except in compliance with the License.
   # You may obtain a copy of the License at
   #
   #     http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing, software
   # distributed under the License is distributed on an "AS IS" BASIS,
   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   # See the License for the specific language governing permissions and
   # limitations under the License.
   
   
   import argparse
   import json
   import logging
   from typing import Any, Dict, List
   
   import apache_beam as beam
   import apache_beam.io.fileio
   from apache_beam.options.pipeline_options import PipelineOptions
   
   def is_valid(json_file):
     """
     This function is used as a filter.
     
       Args:
           json_file: json file corresponding to one episode.
   
       Returns:
           True if the episode is valid, False otherwise.
     """
     with json_file.open() as f:
       first_line = f.readline()
       first_line_json = json.loads(first_line)
       return first_line_json['username'] == "username"
   
   
   def run(
       input_raw_data: str,
       output_path: str,
       beam_args: List[str] = None,
   ) -> None:
       """Build and run the pipeline."""
       options = PipelineOptions(beam_args, save_main_session=True, 
streaming=False)
   
       with beam.Pipeline(options=options) as pipeline:
           filtered_files = (
               pipeline
               | 'Match file paths' >> beam.io.fileio.MatchFiles(input_raw_data)
               | 'Read file paths' >> beam.io.fileio.ReadMatches()
               | 'Filter correct episodes' >> beam.Filter(is_valid)
           )
           file_count = (
               filtered_files
               | 'Count files' >> beam.combiners.Count.Globally()
           )
           #removing the file_count_integer as a side input fixes the error.
           file_count_integer = beam.pvalue.AsSingleton(file_count)
           (   
               filtered_files
               | 'Read files' >> beam.Map(lambda x: x.read_utf8())
               | 'Write to files' >> beam.io.WriteToText(
                   output_path,
                   file_name_suffix='.json',
                   num_shards=file_count_integer)
           )
   
   
   if __name__ == "__main__":
       logging.getLogger().setLevel(logging.INFO)
   
       parser = argparse.ArgumentParser()
       parser.add_argument(
           "--output-path",
           required=True,
           help="Name of the folder to output the cleaned data files. ",
       )
       parser.add_argument(
           "--input-raw-data",
           required=True,
           help="Name of the folder containing the raw data files." ,
       )
       args, beam_args = parser.parse_known_args()
   
       run(
           input_raw_data=args.input_raw_data,
           output_path=args.output_path,
           beam_args=beam_args,
       )
   `
   
   Command used to run:
   
   `
   python -m script.py --region europe-west1 --input gs://path-to-files/*.json 
--output gs://path-to-utputs/results/outputs --runner DataflowRunner --project 
google_project --temp_location gs://temp_bucket/tmp/
   `
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to