jrmccluskey commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1276354978


##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -83,21 +180,20 @@ def key(item):
 
     def value(item):
       k, v = item
-      return k, [elm[1] for elm in v]
+      return k, [defenestrate(elm[1]) for elm in v]
 
     return input_bag.groupby(key).map(value)
 
 
 class Flatten(DaskBagOp):
-  def apply(self, input_bag: OpInput) -> db.Bag:
+  def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
     assert type(input_bag) is list, 'Must take a sequence of bags!'
     return db.concat(input_bag)
 
 
 TRANSLATIONS = {
     _Create: Create,
     apache_beam.ParDo: ParDo,
-    apache_beam.Map: Map,

Review Comment:
   Could you explain why Map was removed in this PR? Based on testing, it 
doesn't seem to be hurting anything; I'm just curious.



##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -65,15 +123,54 @@ def apply(self, input_bag: OpInput) -> db.Bag:
 class ParDo(DaskBagOp):

Review Comment:
   Docstrings for the classes and methods would be helpful.



##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -83,21 +180,20 @@ def key(item):
 
     def value(item):
       k, v = item
-      return k, [elm[1] for elm in v]
+      return k, [defenestrate(elm[1]) for elm in v]
 
     return input_bag.groupby(key).map(value)
 
 
 class Flatten(DaskBagOp):
-  def apply(self, input_bag: OpInput) -> db.Bag:
+  def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
     assert type(input_bag) is list, 'Must take a sequence of bags!'

Review Comment:
   I'd expect our type hinting infra to handle this check, but I'm comfortable 
leaving the assert in place while you develop.
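
   For context on the assert: Python itself does not enforce annotations at 
call time, so without some other checking layer the `t.List[db.Bag]` hint 
alone would not stop a single bag from being passed in. Below is a minimal 
sketch of an equivalent explicit check. `FakeBag` and `concat_bags` are 
hypothetical stand-ins so the snippet runs without Dask; they are not Beam or 
Dask APIs, and this is not a proposed change to the PR.

```python
import typing as t


class FakeBag:
  """Hypothetical stand-in for dask.bag.Bag, so the sketch runs without Dask."""
  def __init__(self, items: t.Iterable[t.Any]):
    self.items = list(items)


def concat_bags(bags: t.List[FakeBag]) -> FakeBag:
  """Hypothetical analogue of Flatten.apply with an explicit runtime check."""
  # The annotation above is not enforced when the function is called,
  # so a single bag passed by mistake is only caught by this check.
  if not isinstance(bags, list):
    raise TypeError('Must take a sequence of bags!')
  # Concatenate the contents of every bag, preserving order.
  return FakeBag(item for bag in bags for item in bag.items)


merged = concat_bags([FakeBag([1, 2]), FakeBag([3])])
print(merged.items)
```

   Raising `TypeError` instead of using `assert` keeps the check active under 
`python -O`, which strips assert statements. Using `isinstance` also accepts 
list subclasses, which `type(x) is list` rejects.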



##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -25,15 +25,73 @@
 import abc
 import dataclasses
 import typing as t
+from dataclasses import field
 
 import apache_beam
 import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
 from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
 from apache_beam.runners.dask.overrides import _Create
 from apache_beam.runners.dask.overrides import _Flatten
 from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
 
 OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+  if isinstance(item, TaggedOutput):
+    item = item.value
+
+  if isinstance(item, WindowedValue):
+    windowed_value = item
+  elif isinstance(item, TimestampedValue):
+    assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+    windowed_value = WindowedValue(
+        item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+  else:
+    windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+  return windowed_value
+
+
+def defenestrate(x):

Review Comment:
   This might be the best function name I've ever seen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.