cisaacstern commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1279723400


##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -25,15 +25,73 @@
 import abc
 import dataclasses
 import typing as t
+from dataclasses import field
 
 import apache_beam
 import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
 from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
 from apache_beam.runners.dask.overrides import _Create
 from apache_beam.runners.dask.overrides import _Flatten
 from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
 
 OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+  if isinstance(item, TaggedOutput):
+    item = item.value
+
+  if isinstance(item, WindowedValue):
+    windowed_value = item
+  elif isinstance(item, TimestampedValue):
+    assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+    windowed_value = WindowedValue(
+        item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+  else:
+    windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+  return windowed_value
+
+
+def defenestrate(x):

Review Comment:
   incredible. also what if `x` is passed as `'prague'` ... do we get some kind 
of easter egg behavior? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to