cisaacstern commented on code in PR #27618: URL: https://github.com/apache/beam/pull/27618#discussion_r1279723400
##########
sdks/python/apache_beam/runners/dask/transform_evaluator.py:
##########
@@ -25,15 +25,73 @@
 import abc
 import dataclasses
 import typing as t
+from dataclasses import field

 import apache_beam
 import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
 from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
 from apache_beam.runners.dask.overrides import _Create
 from apache_beam.runners.dask.overrides import _Flatten
 from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue

 OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+  if isinstance(item, TaggedOutput):
+    item = item.value
+
+  if isinstance(item, WindowedValue):
+    windowed_value = item
+  elif isinstance(item, TimestampedValue):
+    assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+    windowed_value = WindowedValue(
+        item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+  else:
+    windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+  return windowed_value
+
+
+def defenestrate(x):

Review Comment:
incredible. also what if `x` is passed as `'prague'` ... do we get some kind of easter egg behavior?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
