prakhar-dhakar commented on issue #30445:
URL: https://github.com/apache/beam/issues/30445#issuecomment-1968371489
A quick fix for this can be the following change in the Python SDK, in
`apache_beam.dataframe.transforms.DataframeTransform.expand`:
```
def expand(self, input_pcolls):
  # Avoid circular import.
  from apache_beam.dataframe import convert

  # Convert inputs to a flat dict.
  input_dict = _flatten(input_pcolls)  # type: Dict[Any, PCollection]
  proxies = _flatten(self._proxy) if self._proxy is not None else {
      tag: None
      for tag in input_dict
  }
  input_frames = {
      k: convert.to_dataframe(pc, proxies[k])
      for k, pc in input_dict.items()
  }  # type: Dict[Any, DeferredFrame]  # noqa: F821
```
The issue occurs because no label is passed to `convert.to_dataframe()` (it
defaults to `None`), so the label is derived from the name of the variable
holding the PCollection in the calling frame. Inside the comprehension that
variable is `pc` for every entry, so the pipeline fails because of transforms
with duplicate names.
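For reference, here is a minimal reproduction sketch (the schema, element
values, and the `merge` call are hypothetical; any dict of two or more
schema'd PCollections passed to `DataframeTransform` should hit the
duplicate-name failure at pipeline construction time):
```
import apache_beam as beam
from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  orders = p | "Orders" >> beam.Create([beam.Row(id=1, total=10.0)])
  users = p | "Users" >> beam.Create([beam.Row(id=1, name="alice")])
  # A dict input routes each PCollection through the comprehension above,
  # so convert.to_dataframe() is called once per entry with no explicit label.
  joined = (
      {"orders": orders, "users": users}
      | DataframeTransform(lambda orders, users: orders.merge(users, on="id")))
```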
If we pass the label `str(k)` when calling the function, the issue is
resolved, as shown below:
`k: convert.to_dataframe(pc, proxies[k], str(k))`
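For clarity, this is the comprehension with the proposed change applied; the
rest of `expand` is unchanged:
```
input_frames = {
    k: convert.to_dataframe(pc, proxies[k], str(k))
    for k, pc in input_dict.items()
}  # type: Dict[Any, DeferredFrame]  # noqa: F821
```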
This is a very small change that lets the pipeline be used as intended.