[ 
https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maurice Poirrier updated BEAM-10384:
------------------------------------
    Description: 
We are trying to deploy a streaming pipeline to Dataflow that splits the data 
into a few different "routes", each of which is processed differently.
 We did all of the development with the DirectRunner, where it worked smoothly 
in our tests, but now that we have deployed it to Dataflow, it does not work.

 
{code:python}
import logging

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class SplitByRoute(beam.DoFn):
    OUTPUT_TAG_ROUTE_ONE = "route_one"
    OUTPUT_TAG_ROUTE_TWO = "route_two"
    OUTPUT_NOT_SUPPORTED = "not_supported"

    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, elem):
        try:
            # define_route (not shown) just picks a tag depending on param.
            route = self.define_route(elem["param"])
        except Exception:
            route = None
        logging.info(f"Routed to {route}")
        if route == self.OUTPUT_TAG_ROUTE_ONE:
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
        elif route == self.OUTPUT_TAG_ROUTE_TWO:
            logging.info(f"Element: {elem}")
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
        else:
            yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
{code}
 

 

The code fails when yielding from the DoFn above.

It does log the element and yield the output, but then fails with the 
following error:
 `AttributeError: Can't pickle local object 'WeakValueDictionary.__init__.<locals>.remove' [while running 'generatedPtransform-3196']`

Another consideration: we also use tagged outputs earlier in the pipeline, 
before this DoFn, and those work on Dataflow. Only this DoFn fails with the 
error mentioned.
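
To narrow down which part of an element triggers this kind of error, a local 
check along the following lines might help. This is only a sketch: 
`find_unpicklable` is a hypothetical helper, not part of Beam, and it assumes 
that elements are plain dicts and that the standard `pickle` module rejects 
the same values the runner does. The `sample` element is made up for 
illustration.

```python
import pickle
import weakref


def find_unpicklable(elem):
    """Return {key: error message} for every dict value that pickle rejects."""
    failures = {}
    for key, value in elem.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle raises several exception types
            failures[key] = f"{type(exc).__name__}: {exc}"
    return failures


# Hypothetical element: one value holding weak references, one plain string.
sample = {"param": weakref.WeakValueDictionary(), "param2": "stuff"}
bad = find_unpicklable(sample)
print(sorted(bad))
```

Running such a check on a few representative elements locally, before they 
are yielded, would show whether the payload itself is the part that cannot be 
serialized.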

Any suggestions on how we could handle this? It has been a very frustrating 
error.

Thank you!!! :)

 

 

Edit:

The problem is that, with the DataflowRunner, you cannot modify an object that 
is being passed as a message (the element in the example) and then emit it. 

For example:
{code:python}
class OurClass:

    def __init__(self):
        self.something = None

    def dosomething(self):
        # Mutates the instance: changes one attribute and adds a new one.
        self.something = 1
        self.other = 2


elem = {
    'param': OurClass(),
    'param2': 'stuff'
}
{code}
 

So, in the code at the top, we used define_route to decide which route the 
data takes. But inside define_route we called the dosomething method to 
determine the route. As a result, the instance of the class is modified, and 
when the runner then tries to pickle this message, the code fails.
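
The same kind of failure can be reproduced outside Beam with the standard 
`pickle` module. The sketch below is an assumption about the mechanism, not 
our actual code: here `dosomething` attaches a `weakref.WeakValueDictionary` 
as a stand-in for whatever unpicklable attribute the real method added, and 
`route_without_mutation` is a hypothetical workaround that runs the routing 
logic on a shallow copy so the element that gets emitted stays unchanged.

```python
import copy
import pickle
import weakref


class OurClass:
    def __init__(self):
        self.something = None

    def dosomething(self):
        self.something = 1
        # Stand-in (assumption): the mutation adds an attribute pickle rejects.
        self.other = weakref.WeakValueDictionary()
        return "route_one"


def state_is_picklable(obj):
    # For a plain instance, pickle serializes its __dict__ as the state,
    # so checking the attribute dict is enough for this illustration.
    try:
        pickle.dumps(vars(obj))
        return True
    except Exception:
        return False


def route_without_mutation(param):
    # Work on a shallow copy so the original instance is left untouched.
    return copy.copy(param).dosomething()


fresh = OurClass()
before = state_is_picklable(fresh)          # instance as created
route = route_without_mutation(fresh)       # routing done on a copy
after_copy = state_is_picklable(fresh)      # original was not modified
fresh.dosomething()                         # mutate the original in place
after_mutation = state_is_picklable(fresh)  # now holds the weak-ref dict
print(before, route, after_copy, after_mutation)
```

A shallow copy only protects top-level attributes. If the routing method 
changed nested objects in place, a deep copy or a routing function with no 
side effects would be needed instead.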


> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
>                 Key: BEAM-10384
>                 URL: https://issues.apache.org/jira/browse/BEAM-10384
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.22.0
>         Environment: Ubuntu LTS (direct runner)
>            Reporter: Maurice Poirrier
>            Priority: P0
>              Labels: GCP
>             Fix For: Not applicable
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h


