[ 
https://issues.apache.org/jira/browse/AIRFLOW-5689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jannik Franz updated AIRFLOW-5689:
----------------------------------
    Description: 
When running Apache Beam with Python 3 on Google Cloud Dataflow, side inputs don't 
work.

When testing it in the local/direct runner there seems to be no issue.

 

 
{code:java}
class FlattenCustomActions(beam.PTransform):
    """ Transforms Facebook Day Actions        Only retains actions with 
custom_conversions
        Flattens the actions
        Adds custom conversions names using a side input
    """    
    def __init__(self, conversions):
        super(FlattenCustomActions, self).__init__()
        self.conversions = conversions    def expand(self, input_or_inputs):
        return (
            input_or_inputs
            | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
            | "AddConversionName" >> beam.Map(add_conversion_name, 
self.conversions)
        )

# ...
# in run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)    conversions_output = (
        p
        | "ReadConversions" >> ReadFromText(known_args.input_conversions, 
coder=JsonCoder())
        | TransformConversionMetadata()
    )    (
        conversions_output
        | "WriteConversions"
        >> WriteCoerced(
            known_args.output_conversions,
            known_args.output_type,
            schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
        )
    )    (
        p
        | ReadFacebookJson(known_args.input, retain_root_fields=True)
        | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
        | "WriteActions"
        >> WriteCoerced(
            known_args.output, known_args.output_type, 
schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
        )
    ){code}
 

I receive the following Traceback in Dataflow:
{code:java}
Traceback (most recent call last): File 
"/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
773, in run self._load_main_session(self.local_staging_directory) File 
"/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
489, in _load_main_session pickler.load_session(session_file) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 
287, in load_session return dill.load_session(file_path) File 
"/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in 
load_session module = unpickler.load() File 
"/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class 
return StockUnpickler.find_class(self, module, name) AttributeError: Can't get 
attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from 
'/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
{code}
 

 

 

  was:
When running Apache Beam with Python 3 on Google Cloud Dataflow, side inputs don't 
work.

When testing it in the local/direct runner there seems to be no issue.

 

 
{code:java}
class FlattenCustomActions(beam.PTransform):
    """ Transforms Facebook Day Actions        Only retains actions with 
custom_conversions
        Flattens the actions
        Adds custom conversions names using a side input
    """    def __init__(self, conversions):
        super(FlattenCustomActions, self).__init__()
        self.conversions = conversions    def expand(self, input_or_inputs):
        return (
            input_or_inputs
            | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
            | "AddConversionName" >> beam.Map(add_conversion_name, 
self.conversions)
        )

# ...
# in run():
pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)    conversions_output = (
        p
        | "ReadConversions" >> ReadFromText(known_args.input_conversions, 
coder=JsonCoder())
        | TransformConversionMetadata()
    )    (
        conversions_output
        | "WriteConversions"
        >> WriteCoerced(
            known_args.output_conversions,
            known_args.output_type,
            schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
        )
    )    (
        p
        | ReadFacebookJson(known_args.input, retain_root_fields=True)
        | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
        | "WriteActions"
        >> WriteCoerced(
            known_args.output, known_args.output_type, 
schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
        )
    ){code}
 

I receive the following Traceback in Dataflow:
{code:java}
Traceback (most recent call last): File 
"/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
773, in run self._load_main_session(self.local_staging_directory) File 
"/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
489, in _load_main_session pickler.load_session(session_file) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 
287, in load_session return dill.load_session(file_path) File 
"/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in 
load_session module = unpickler.load() File 
"/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class 
return StockUnpickler.find_class(self, module, name) AttributeError: Can't get 
attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from 
'/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
{code}
 

 

 


> Side-Input in Python3 fails to pickle class
> -------------------------------------------
>
>                 Key: AIRFLOW-5689
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5689
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: gcp
>    Affects Versions: 1.10.5
>         Environment: python3,beam 2.16.0
>            Reporter: Jannik Franz
>            Priority: Major
>
> When running Apache Beam with Python 3 on Google Cloud Dataflow, side inputs 
> don't work.
> When testing it in the local/direct runner there seems to be no issue.
>  
>  
> {code:java}
> class FlattenCustomActions(beam.PTransform):
>     """ Transforms Facebook Day Actions        Only retains actions with 
> custom_conversions
>         Flattens the actions
>         Adds custom conversions names using a side input
>     """    
>     def __init__(self, conversions):
>         super(FlattenCustomActions, self).__init__()
>         self.conversions = conversions    def expand(self, input_or_inputs):
>         return (
>             input_or_inputs
>             | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
>             | "AddConversionName" >> beam.Map(add_conversion_name, 
> self.conversions)
>         )
> # ...
> # in run():
>     pipeline_options = PipelineOptions(pipeline_args)
>     pipeline_options.view_as(SetupOptions).save_main_session = True
>     p = beam.Pipeline(options=pipeline_options)    conversions_output = (
>         p
>         | "ReadConversions" >> ReadFromText(known_args.input_conversions, 
> coder=JsonCoder())
>         | TransformConversionMetadata()
>     )    (
>         conversions_output
>         | "WriteConversions"
>         >> WriteCoerced(
>             known_args.output_conversions,
>             known_args.output_type,
>             schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
>         )
>     )    (
>         p
>         | ReadFacebookJson(known_args.input, retain_root_fields=True)
>         | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
>         | "WriteActions"
>         >> WriteCoerced(
>             known_args.output, known_args.output_type, 
> schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
>         )
>     ){code}
>  
> I receive the following Traceback in Dataflow:
> {code:java}
> Traceback (most recent call last): File 
> "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
> 773, in run self._load_main_session(self.local_staging_directory) File 
> "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
> 489, in _load_main_session pickler.load_session(session_file) File 
> "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", 
> line 287, in load_session return dill.load_session(file_path) File 
> "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in 
> load_session module = unpickler.load() File 
> "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in 
> find_class return StockUnpickler.find_class(self, module, name) 
> AttributeError: Can't get attribute 'FlattenCustomActions' on <module 
> 'dataflow_worker.start' from 
> '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to