jrmccluskey commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1276354978
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -83,21 +180,20 @@ def key(item):
def value(item):
k, v = item
- return k, [elm[1] for elm in v]
+ return k, [defenestrate(elm[1]) for elm in v]
return input_bag.groupby(key).map(value)
class Flatten(DaskBagOp):
- def apply(self, input_bag: OpInput) -> db.Bag:
+ def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
assert type(input_bag) is list, 'Must take a sequence of bags!'
return db.concat(input_bag)
TRANSLATIONS = {
_Create: Create,
apache_beam.ParDo: ParDo,
- apache_beam.Map: Map,
Review Comment:
Could you explain why Map was removed in this PR? It doesn't seem to be hurting
anything based on testing; I'm just curious.
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -65,15 +123,54 @@ def apply(self, input_bag: OpInput) -> db.Bag:
class ParDo(DaskBagOp):
Review Comment:
Docstrings for the classes and methods would be helpful.
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -83,21 +180,20 @@ def key(item):
def value(item):
k, v = item
- return k, [elm[1] for elm in v]
+ return k, [defenestrate(elm[1]) for elm in v]
return input_bag.groupby(key).map(value)
class Flatten(DaskBagOp):
- def apply(self, input_bag: OpInput) -> db.Bag:
+ def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
assert type(input_bag) is list, 'Must take a sequence of bags!'
Review Comment:
I'd expect our type-hinting infrastructure to handle this, but I'm comfortable
leaving it in while you develop.
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -25,15 +25,73 @@
import abc
import dataclasses
import typing as t
+from dataclasses import field
import apache_beam
import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
from apache_beam.runners.dask.overrides import _Create
from apache_beam.runners.dask.overrides import _Flatten
from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+ if isinstance(item, TaggedOutput):
+ item = item.value
+
+ if isinstance(item, WindowedValue):
+ windowed_value = item
+ elif isinstance(item, TimestampedValue):
+ assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+ windowed_value = WindowedValue(
+ item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+ else:
+ windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+ return windowed_value
+
+
+def defenestrate(x):
Review Comment:
This might be the best function name I've ever seen
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]