alxmrs commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1279742359
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -83,21 +180,20 @@ def key(item):
def value(item):
k, v = item
- return k, [elm[1] for elm in v]
+ return k, [defenestrate(elm[1]) for elm in v]
return input_bag.groupby(key).map(value)
class Flatten(DaskBagOp):
- def apply(self, input_bag: OpInput) -> db.Bag:
+ def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
assert type(input_bag) is list, 'Must take a sequence of bags!'
Review Comment:
Good to know!
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -25,15 +25,73 @@
import abc
import dataclasses
import typing as t
+from dataclasses import field
import apache_beam
import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
from apache_beam.runners.dask.overrides import _Create
from apache_beam.runners.dask.overrides import _Flatten
from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+ if isinstance(item, TaggedOutput):
+ item = item.value
+
+ if isinstance(item, WindowedValue):
+ windowed_value = item
+ elif isinstance(item, TimestampedValue):
+ assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+ windowed_value = WindowedValue(
+ item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+ else:
+ windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+ return windowed_value
+
+
+def defenestrate(x):
Review Comment:
:)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]