[ https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maurice Poirrier updated BEAM-10384:
------------------------------------
Description:
We are trying to deploy a streaming pipeline to Dataflow that splits the data
into a few different "routes", each of which manipulates the data differently.
We did all of the development with the DirectRunner, where it works smoothly in
our tests, but now that we have deployed it to Dataflow, it does not work.
The code fails when yielding from the following DoFn:
import logging

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class SplitByRoute(beam.DoFn):
    OUTPUT_TAG_ROUTE_ONE = "route_one"
    OUTPUT_TAG_ROUTE_TWO = "route_two"
    OUTPUT_NOT_SUPPORTED = "not_supported"

    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, elem):
        try:
            route = self.define_route(elem["param"])  # Just tag it depending on param
        except Exception:
            route = None
        logging.info(f"Routed to {route}")
        if route == self.OUTPUT_TAG_ROUTE_ONE:
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
        elif route == self.OUTPUT_TAG_ROUTE_TWO:
            logging.info(f"Element: {elem}")
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
        else:
            yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
It logs the element and yields the output, then fails with the following error:
`AttributeError: Can't pickle local object
'WeakValueDictionary.__init__.<locals>.remove' [while running
'generatedPtransform-3196']`
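The object named in this message can be examined without Beam. In this minimal sketch, assuming CPython 3, `WeakValueDictionary.__init__` stores a locally defined `remove` callback on the instance. The private `_remove` attribute read below is a CPython implementation detail, not a public API. The standard `pickle` module refuses to serialize functions whose qualified name contains `<locals>`. Depending on the Python version, the exception may be `AttributeError` or `pickle.PicklingError`.

```python
import pickle
import weakref

cache = weakref.WeakValueDictionary()

# CPython implementation detail: __init__ stores a callback defined inside
# __init__ itself, which is the "local object" named in the error message.
print(cache._remove.__qualname__)  # WeakValueDictionary.__init__.<locals>.remove

try:
    pickle.dumps(cache)
    error = None
except Exception as exc:  # AttributeError or pickle.PicklingError, depending on the Python version
    error = exc

# Prints the pickling error raised for the dictionary.
print(type(error).__name__, error)
```

Any element, DoFn attribute, or other object that holds such a dictionary, directly or indirectly, should fail the same way when it is serialized with the standard `pickle` module.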
We also use tagged outputs earlier in the pipeline, before this DoFn, and they
work on Dataflow; only this DoFn fails with the error above.
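The element is logged and yielded before the failure, so one way to narrow the problem down is to check which fields of the element can be pickled. This is a sketch, not a Beam API. `find_unpicklable_fields` is a hypothetical helper. It assumes `elem` is a dict, as `elem["param"]` suggests. It also assumes the failing step serializes the element with the standard `pickle` module; the error message suggests this but does not confirm it.

```python
import pickle
import weakref


def find_unpicklable_fields(elem):
    """Hypothetical helper: map each key of a dict-like element whose value
    the standard pickle module cannot serialize to the error it raised."""
    problems = {}
    for key, value in elem.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle can raise AttributeError, TypeError or PicklingError
            problems[key] = f"{type(exc).__name__}: {exc}"
    return problems


# Made-up element: "param" is a plain string, "cache" holds a weak-value cache.
sample = {"param": "route_two", "cache": weakref.WeakValueDictionary()}
print(sorted(find_unpicklable_fields(sample)))  # ['cache']
```

You could call it inside `process` just before the `yield` and log any non-empty result. That should point to the field that carries the weak reference. The sample element above is made up for illustration.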
Any suggestions on how we could handle this? It has been a very frustrating error.
Thank you!!! :)
was:
We are trying to deploy a streaming pipeline to Dataflow that splits the data
into a few different "routes", each of which manipulates the data differently.
We did all of the development with the `DirectRunner`, where it works smoothly
in our tests, but now that we have deployed it to `Dataflow`, it does not work.
The code fails when yielding from the following `DoFn`:
``` python
import logging

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class SplitByRoute(beam.DoFn):
    OUTPUT_TAG_ROUTE_ONE = "route_one"
    OUTPUT_TAG_ROUTE_TWO = "route_two"
    OUTPUT_NOT_SUPPORTED = "not_supported"

    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, elem):
        try:
            route = self.define_route(elem["param"])  # Just tag it depending on param
        except Exception:
            route = None
        logging.info(f"Routed to {route}")
        if route == self.OUTPUT_TAG_ROUTE_ONE:
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
        elif route == self.OUTPUT_TAG_ROUTE_TWO:
            logging.info(f"Element: {elem}")
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
        else:
            yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
```
It logs the element and yields the output, then fails with the following error:
`AttributeError: Can't pickle local object
'WeakValueDictionary.__init__.<locals>.remove' [while running
'generatedPtransform-3196']`
We also use tagged outputs earlier in the pipeline, before this `DoFn`, and
they work on Dataflow; only this one fails with the error above. Could it be
the memory cache, or something related to it where weak references (`weakref`)
are used?
As far as I know, this error happens when you have a class inside another one,
but maybe not(?)
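On the nested-class question: `<locals>` in a qualified name marks something defined inside a function body, not a class nested in another class. `pickle` stores classes and functions by module and qualified name and looks them up again when loading. With Python 3's default protocol, a nested name such as `Outer.Inner` can be looked up, but a name containing `<locals>` cannot. Here is a minimal sketch with hypothetical class names, assuming Python 3.8 or later:

```python
import pickle


class Outer:
    class Inner:  # nested class: qualified name "Outer.Inner"
        pass


def make_local_instance():
    class Local:  # qualified name "make_local_instance.<locals>.Local"
        pass

    return Local()


def pickles(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


print(pickles(Outer.Inner()))          # True
print(pickles(make_local_instance()))  # False
```

So in the message above, the local object is the `remove` callback created inside `WeakValueDictionary.__init__`, not a nested class in the pipeline code.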
Any suggestions on how we could handle this? It has been a very frustrating error.
Thank you!!! :)
> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
> Key: BEAM-10384
> URL: https://issues.apache.org/jira/browse/BEAM-10384
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.22.0
> Environment: Ubuntu LTS (direct runner)
> Reporter: Maurice Poirrier
> Priority: P0
> Labels: GCP
> Fix For: Not applicable
>
> Original Estimate: 120h
> Remaining Estimate: 120h
--
This message was sent by Atlassian Jira
(v8.3.4#803005)