mattalbr commented on issue #33802:
URL: https://github.com/apache/beam/issues/33802#issuecomment-2654923822
From my experimentation, it seems like the workaround in #33189 only works
when the DoFn or PTransform operates on a concrete instance of a generic type.
It doesn't work when the DoFn or PTransform is itself fully generic (e.g. if a
DoFn needs to accept a fully generic type).
It would be cool to support fully generic PTransforms. FWIW, here's my use
case:
I'm building out a pipeline that processes data from various historical
tables and basically groups historical values by key and computes intervals of
values across time for each key.
The intervalization logic is complex enough that I don't want to repeat it,
so the PTransform that computes the intervals accepts Callables that map each
history row to a key and a value. The output type is then:
`tuple[K, list[Interval[V]]]`
Without support for generics, I have to throw a ton of `Any`s in here, which
slowly but surely chips away at the value of the typechecking. With generics I
can say:
```
import dataclasses
import datetime
from typing import Callable, TypeVar

import apache_beam as beam

K = TypeVar("K")
V = TypeVar("V")
R = TypeVar("R")

@dataclasses.dataclass
class Interval[V]:
    start: datetime.datetime
    end: datetime.datetime
    value: V | None

class Intervalizer[K, V, R](beam.PTransform):
    def __init__(self, key_fn: Callable[[R], K], value_fn: Callable[[R], V]):
        self._key_fn = key_fn
        self._value_fn = value_fn

    # PTransforms override expand(); process() is the DoFn hook.
    def expand(self, pcoll) -> list[tuple[K, Interval[V]]]:
        # Fancy implementation.
        return pcoll

class FooIntervalizer(beam.PTransform):
    def expand(self, history_rows_pcoll) -> list[tuple[int, Interval[str]]]:
        return history_rows_pcoll | Intervalizer[int, str](
            key_fn=lambda x: x.id, value_fn=lambda x: x.val)
```
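As a rough illustration of the shape of the problem (not the actual
implementation), the group-by-key and intervalize step can be sketched in
plain Python without Beam. Everything beyond `key_fn` and `value_fn` is an
assumption made for the example: the `intervalize` function itself, the
`time_fn` and `end_of_time` parameters, and the rule that each value holds
until the next row for the same key.

```python
import dataclasses
import datetime
from collections import defaultdict
from typing import Callable, Dict, Generic, Iterable, List, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")
R = TypeVar("R")


@dataclasses.dataclass
class Interval(Generic[V]):
    start: datetime.datetime
    end: datetime.datetime
    value: Optional[V]


def intervalize(
    rows: Iterable[R],
    key_fn: Callable[[R], K],
    value_fn: Callable[[R], V],
    time_fn: Callable[[R], datetime.datetime],  # hypothetical timestamp extractor
    end_of_time: datetime.datetime,  # hypothetical end for the last interval
) -> Dict[K, List[Interval[V]]]:
    """Group rows by key, then turn each key's time-ordered values into intervals."""
    grouped: Dict[K, List[R]] = defaultdict(list)
    for row in rows:
        grouped[key_fn(row)].append(row)

    result: Dict[K, List[Interval[V]]] = {}
    for key, key_rows in grouped.items():
        key_rows.sort(key=time_fn)
        intervals: List[Interval[V]] = []
        for i, row in enumerate(key_rows):
            # Assumption: a value stays in effect until the next row for this key.
            if i + 1 < len(key_rows):
                end = time_fn(key_rows[i + 1])
            else:
                end = end_of_time
            intervals.append(Interval(time_fn(row), end, value_fn(row)))
        result[key] = intervals
    return result
```

Because `K`, `V`, and `R` are tied together through the two Callables, a type
checker can infer the concrete output type at each call site without any
`Any`s, which is the property I'd like the PTransform version to keep.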