prakhar-dhakar commented on issue #30445:
URL: https://github.com/apache/beam/issues/30445#issuecomment-1968371489

A quick fix for this can be the following change in the Python SDK, in `apache_beam.dataframe.transforms.DataframeTransform.expand`:
   
```python
  def expand(self, input_pcolls):
    # Avoid circular import.
    from apache_beam.dataframe import convert

    # Convert inputs to a flat dict.
    input_dict = _flatten(input_pcolls)  # type: Dict[Any, PCollection]

    proxies = _flatten(self._proxy) if self._proxy is not None else {
        tag: None
        for tag in input_dict
    }

    input_frames = {
        k: convert.to_dataframe(pc, proxies[k])
        for k, pc in input_dict.items()
    }  # type: Dict[Any, DeferredFrame] # noqa: F821
```
The issue occurs because no explicit label is passed to `convert.to_dataframe()` (it defaults to `None`). The label is then derived from the calling variable name, which is `pc` for every entry in the comprehension, so the pipeline fails with duplicate transform names.
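
As an illustration, a pipeline along these lines should trigger the failure. This is a rough sketch only: it assumes multiple inputs are passed as a tuple (which is what `_flatten` above is handling), and the `Row` fields, transform names, and lambda are purely illustrative.

```python
import apache_beam as beam
from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  users = p | 'CreateUsers' >> beam.Create([beam.Row(id=1, name='alice')])
  orders = p | 'CreateOrders' >> beam.Create([beam.Row(id=1, total=10.0)])

  # Both inputs reach convert.to_dataframe() with label=None, so both
  # generated transforms are named after the local variable `pc` and collide.
  joined = (users, orders) | DataframeTransform(
      lambda users_df, orders_df: users_df.merge(orders_df, on='id'))
```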
If we instead pass the label `str(k)` when calling the function, the issue is resolved, as shown below.
   
`k: convert.to_dataframe(pc, proxies[k], str(k))`
   
This is a very quick fix that lets the pipeline be used as intended.
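
For reference, a minimal sketch of what the patched comprehension could look like, assuming `to_dataframe` accepts the label as its third positional argument (which is what the one-liner above relies on):

```python
    # Pass an explicit, per-key label so the generated to_dataframe
    # transforms no longer collide when there are multiple inputs.
    input_frames = {
        k: convert.to_dataframe(pc, proxies[k], str(k))
        for k, pc in input_dict.items()
    }  # type: Dict[Any, DeferredFrame] # noqa: F821
```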
   

