[
https://issues.apache.org/jira/browse/BEAM-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Valentyn Tymofieiev updated BEAM-8441:
--------------------------------------
Description:
When running Apache Beam with Python3 on Google Cloud Dataflow the pipeline
consistently fails during pickler.load_session(session_file):
StockUnpickler.find_class(self, module, name) AttributeError: Can't get
attribute 'SomeAttribute' on <module 'dataflow_worker.start' from
'/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>.
When testing it in the local/direct runner there seems to be no issue.
{code:java}
class FlattenCustomActions(beam.PTransform):
""" Transforms Facebook Day Actions Only retains actions with
custom_conversions
Flattens the actions
Adds custom conversions names using a side input
"""
def __init__(self, conversions):
super(FlattenCustomActions, self).__init__()
self.conversions = conversions
    def expand(self, input_or_inputs):
return (
input_or_inputs
| "FlattenActions" >> beam.ParDo(flatten_filter_actions)
| "AddConversionName" >> beam.Map(add_conversion_name,
self.conversions)
)
# ...
# in run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
conversions_output = (
p
| "ReadConversions" >> ReadFromText(known_args.input_conversions,
coder=JsonCoder())
| TransformConversionMetadata()
) (
conversions_output
| "WriteConversions"
>> WriteCoerced(
known_args.output_conversions,
known_args.output_type,
schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
)
) (
p
| ReadFacebookJson(known_args.input, retain_root_fields=True)
| FlattenCustomActions(beam.pvalue.AsList(conversions_output))
| "WriteActions"
>> WriteCoerced(
known_args.output, known_args.output_type,
schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
)
){code}
I receive the following Traceback in Dataflow:
{code:java}
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py",
line 773, in run self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py",
line 489, in _load_main_session pickler.load_session(session_file)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line
287, in load_session return dill.load_session(file_path)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in
load_session module = unpickler.load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in
find_class return StockUnpickler.find_class(self, module, name) AttributeError:
Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start'
from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
{code}
was:
When running Apache Beam with Python3 on Google Cloud Dataflow Sideinputs don't
work.
When testing it in the local/direct runner there seems to be no issue.
{code:java}
class FlattenCustomActions(beam.PTransform):
""" Transforms Facebook Day Actions Only retains actions with
custom_conversions
Flattens the actions
Adds custom conversions names using a side input
"""
def __init__(self, conversions):
super(FlattenCustomActions, self).__init__()
self.conversions = conversions
    def expand(self, input_or_inputs):
return (
input_or_inputs
| "FlattenActions" >> beam.ParDo(flatten_filter_actions)
| "AddConversionName" >> beam.Map(add_conversion_name,
self.conversions)
)
# ...
# in run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
conversions_output = (
p
| "ReadConversions" >> ReadFromText(known_args.input_conversions,
coder=JsonCoder())
| TransformConversionMetadata()
) (
conversions_output
| "WriteConversions"
>> WriteCoerced(
known_args.output_conversions,
known_args.output_type,
schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
)
) (
p
| ReadFacebookJson(known_args.input, retain_root_fields=True)
| FlattenCustomActions(beam.pvalue.AsList(conversions_output))
| "WriteActions"
>> WriteCoerced(
known_args.output, known_args.output_type,
schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
)
){code}
I receive the following Traceback in Dataflow:
{code:java}
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py",
line 773, in run self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py",
line 489, in _load_main_session pickler.load_session(session_file)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line
287, in load_session return dill.load_session(file_path)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in
load_session module = unpickler.load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in
find_class return StockUnpickler.find_class(self, module, name) AttributeError:
Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start'
from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
{code}
> Side-Input in Python3 fails to pickle class
> -------------------------------------------
>
> Key: BEAM-8441
> URL: https://issues.apache.org/jira/browse/BEAM-8441
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Jannik Franz
> Assignee: Valentyn Tymofieiev
> Priority: Major
>
> When running Apache Beam with Python3 on Google Cloud Dataflow the pipeline
> consistently fails during pickler.load_session(session_file):
> StockUnpickler.find_class(self, module, name) AttributeError: Can't get
> attribute 'SomeAttribute' on <module 'dataflow_worker.start' from
> '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>.
> When testing it in the local/direct runner there seems to be no issue.
>
> {code:java}
> class FlattenCustomActions(beam.PTransform):
> """ Transforms Facebook Day Actions Only retains actions with
> custom_conversions
> Flattens the actions
> Adds custom conversions names using a side input
> """
> def __init__(self, conversions):
> super(FlattenCustomActions, self).__init__()
> self.conversions = conversions
>     def expand(self, input_or_inputs):
> return (
> input_or_inputs
> | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
> | "AddConversionName" >> beam.Map(add_conversion_name,
> self.conversions)
> )
> # ...
> # in run():
> pipeline_options = PipelineOptions(pipeline_args)
> pipeline_options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=pipeline_options)
> conversions_output = (
> p
> | "ReadConversions" >> ReadFromText(known_args.input_conversions,
> coder=JsonCoder())
> | TransformConversionMetadata()
> ) (
> conversions_output
> | "WriteConversions"
> >> WriteCoerced(
> known_args.output_conversions,
> known_args.output_type,
> schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
> )
> ) (
> p
> | ReadFacebookJson(known_args.input, retain_root_fields=True)
> | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
> | "WriteActions"
> >> WriteCoerced(
> known_args.output, known_args.output_type,
> schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
> )
> ){code}
>
> I receive the following Traceback in Dataflow:
> {code:java}
> Traceback (most recent call last):
> File
> "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line
> 773, in run self._load_main_session(self.local_staging_directory)
> File
> "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line
> 489, in _load_main_session pickler.load_session(session_file)
> File
> "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
> line 287, in load_session return dill.load_session(file_path)
> File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in
> load_session module = unpickler.load()
> File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in
> find_class return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute 'FlattenCustomActions' on <module
> 'dataflow_worker.start' from
> '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
> {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)