Repository: beam Updated Branches: refs/heads/master 9241fc69d -> 844762d10
[BEAM-2184] Rename OutputTimeFn to TimestampCombiner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc186fd8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc186fd8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc186fd8 Branch: refs/heads/master Commit: dc186fd876c5253d410802c9161d669124f0706f Parents: 9241fc6 Author: Robert Bradshaw <[email protected]> Authored: Mon May 8 15:54:23 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Mon May 8 23:04:11 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/core.py | 23 ++++++++------- sdks/python/apache_beam/transforms/timeutil.py | 22 +++++++------- sdks/python/apache_beam/transforms/trigger.py | 26 ++++++++--------- .../apache_beam/transforms/trigger_test.py | 9 +++--- .../transforms/trigger_transcripts.yaml | 30 ++++++++++---------- sdks/python/apache_beam/transforms/window.py | 22 +++++++------- .../apache_beam/transforms/window_test.py | 4 +-- 7 files changed, 69 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9367e6f..7ca1632 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -33,7 +33,7 @@ from apache_beam.transforms.display import HasDisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import PTransformWithSideInputs from apache_beam.transforms.window import MIN_TIMESTAMP -from apache_beam.transforms.window import OutputTimeFn +from apache_beam.transforms.window import TimestampCombiner from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import GlobalWindows @@ -1172,7 +1172,7 @@ class Partition(PTransformWithSideInputs): class Windowing(object): def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, - output_time_fn=None): + timestamp_combiner=None): global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger @@ -1192,17 +1192,18 @@ class Windowing(object): self.windowfn = windowfn self.triggerfn = triggerfn self.accumulation_mode = accumulation_mode - self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW + self.timestamp_combiner = ( + timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW) self._is_default = ( self.windowfn == GlobalWindows() and self.triggerfn == DefaultTrigger() and self.accumulation_mode == AccumulationMode.DISCARDING and - self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW) + self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW) def __repr__(self): return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn, self.accumulation_mode, - self.output_time_fn) + self.timestamp_combiner) def __eq__(self, other): if type(self) == type(other): @@ -1212,7 +1213,7 @@ class Windowing(object): self.windowfn == other.windowfn and self.triggerfn == other.triggerfn and self.accumulation_mode == other.accumulation_mode - and self.output_time_fn == other.output_time_fn) + and self.timestamp_combiner == other.timestamp_combiner) return False def is_default(self): @@ -1229,7 +1230,7 @@ class Windowing(object): self.windowfn.get_window_coder()), trigger=self.triggerfn.to_runner_api(context), accumulation_mode=self.accumulation_mode, - output_time=self.output_time_fn, + output_time=self.timestamp_combiner, # TODO(robertwb): Support EMIT_IF_NONEMPTY closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, allowed_lateness=0) @@ -1242,7 +1243,7 @@ class Windowing(object): windowfn=WindowFn.from_runner_api(proto.window_fn, context), triggerfn=TriggerFn.from_runner_api(proto.trigger, context), accumulation_mode=proto.accumulation_mode, - output_time_fn=proto.output_time) + timestamp_combiner=proto.output_time) @typehints.with_input_types(T) @@ -1275,9 +1276,9 @@ class WindowInto(ParDo): """ triggerfn = kwargs.pop('trigger', None) accumulation_mode = kwargs.pop('accumulation_mode', None) - output_time_fn = kwargs.pop('output_time_fn', None) + timestamp_combiner = kwargs.pop('timestamp_combiner', None) self.windowing = Windowing(windowfn, triggerfn, accumulation_mode, - output_time_fn) + timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) def get_windowing(self, unused_inputs): @@ -1307,7 +1308,7 @@ class WindowInto(ParDo): windowing.windowfn, trigger=windowing.triggerfn, accumulation_mode=windowing.accumulation_mode, - output_time_fn=windowing.output_time_fn) + timestamp_combiner=windowing.timestamp_combiner) PTransform.register_urn( http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/timeutil.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 5453b20..435d41b 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -49,8 +49,8 @@ class TimeDomain(object): raise ValueError('Unknown time domain: %s' % domain) -class OutputTimeFnImpl(object): - """Implementation of OutputTimeFn.""" +class TimestampCombinerImpl(object): + """Implementation of TimestampCombiner.""" __metaclass__ = ABCMeta @@ -78,8 +78,8 @@ class OutputTimeFnImpl(object): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(OutputTimeFnImpl): - """OutputTimeFnImpl that only depends on the window.""" +class DependsOnlyOnWindow(TimestampCombinerImpl): + """TimestampCombinerImpl that only depends on the window.""" __metaclass__ = ABCMeta @@ -92,8 +92,8 @@ class DependsOnlyOnWindow(OutputTimeFnImpl): return self.assign_output_time(result_window, None) -class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at earliest input timestamp.""" +class OutputAtEarliestInputTimestampImpl(TimestampCombinerImpl): + """TimestampCombinerImpl outputting at earliest input timestamp.""" def assign_output_time(self, window, input_timestamp): return input_timestamp @@ -103,8 +103,8 @@ class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl): return min(output_timestamp, other_output_timestamp) -class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at earliest input timestamp.""" +class OutputAtEarliestTransformedInputTimestampImpl(TimestampCombinerImpl): + """TimestampCombinerImpl outputting at earliest input timestamp.""" def __init__(self, window_fn): self.window_fn = window_fn @@ -116,8 +116,8 @@ class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl): return min(output_timestamp, other_output_timestamp) -class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl): - """OutputTimeFnImpl outputting at latest input timestamp.""" +class OutputAtLatestInputTimestampImpl(TimestampCombinerImpl): + """TimestampCombinerImpl outputting at latest input timestamp.""" def assign_output_time(self, window, input_timestamp): return input_timestamp @@ -127,7 +127,7 @@ class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl): class OutputAtEndOfWindowImpl(DependsOnlyOnWindow): - """OutputTimeFnImpl outputting at end of window.""" + """TimestampCombinerImpl outputting at end of window.""" def assign_output_time(self, window, unused_input_timestamp): return window.end http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index b9786f4..bcb9dd3 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -32,7 +32,7 @@ from apache_beam.transforms.timeutil import MAX_TIMESTAMP from apache_beam.transforms.timeutil import MIN_TIMESTAMP from apache_beam.transforms.timeutil import TimeDomain from apache_beam.transforms.window import GlobalWindow -from apache_beam.transforms.window import OutputTimeFn +from apache_beam.transforms.window import TimestampCombiner from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import WindowFn from apache_beam.runners.api import beam_runner_api_pb2 @@ -100,17 +100,17 @@ class ListStateTag(StateTag): class WatermarkHoldStateTag(StateTag): - def __init__(self, tag, output_time_fn_impl): + def __init__(self, tag, timestamp_combiner_impl): super(WatermarkHoldStateTag, self).__init__(tag) - self.output_time_fn_impl = output_time_fn_impl + self.timestamp_combiner_impl = timestamp_combiner_impl def __repr__(self): return 'WatermarkHoldStateTag(%s, %s)' % (self.tag, - self.output_time_fn_impl) + self.timestamp_combiner_impl) def with_prefix(self, prefix): return WatermarkHoldStateTag(prefix + self.tag, - self.output_time_fn_impl) + self.timestamp_combiner_impl) # pylint: disable=unused-argument @@ -750,7 +750,7 @@ class MergeableStateAdapter(SimpleState): elif isinstance(tag, ListStateTag): return [v for vs in values for v in vs] elif isinstance(tag, WatermarkHoldStateTag): - return tag.output_time_fn_impl.combine_all(values) + return tag.timestamp_combiner_impl.combine_all(values) else: raise ValueError('Invalid tag.', tag) @@ -909,11 +909,11 @@ class GeneralTriggerDriver(TriggerDriver): def __init__(self, windowing): self.window_fn = windowing.windowfn - self.output_time_fn_impl = OutputTimeFn.get_impl(windowing.output_time_fn, - self.window_fn) + self.timestamp_combiner_impl = TimestampCombiner.get_impl( + windowing.timestamp_combiner, self.window_fn) # pylint: disable=invalid-name - self.WATERMARK_HOLD = WatermarkHoldStateTag('watermark', - self.output_time_fn_impl) + self.WATERMARK_HOLD = WatermarkHoldStateTag( + 'watermark', self.timestamp_combiner_impl) # pylint: enable=invalid-name self.trigger_fn = windowing.triggerfn self.accumulation_mode = windowing.accumulation_mode @@ -965,10 +965,10 @@ class GeneralTriggerDriver(TriggerDriver): continue # Add watermark hold. # TODO(ccy): Add late data and garbage-collection hold support. - output_time = self.output_time_fn_impl.merge( + output_time = self.timestamp_combiner_impl.merge( window, (element_output_time for element_output_time in - (self.output_time_fn_impl.assign_output_time(window, timestamp) + (self.timestamp_combiner_impl.assign_output_time(window, timestamp) for unused_value, timestamp in elements) if element_output_time >= output_watermark)) if output_time is not None: @@ -1075,7 +1075,7 @@ class InMemoryUnmergedState(UnmergedState): elif isinstance(tag, ListStateTag): return values elif isinstance(tag, WatermarkHoldStateTag): - return tag.output_time_fn_impl.combine_all(values) + return tag.timestamp_combiner_impl.combine_all(values) else: raise ValueError('Invalid tag.', tag) http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 914babb..38871fe 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -44,7 +44,7 @@ from apache_beam.transforms.util import assert_that, equal_to from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import MIN_TIMESTAMP -from apache_beam.transforms.window import OutputTimeFn +from apache_beam.transforms.window import TimestampCombiner from apache_beam.transforms.window import Sessions from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowedValue @@ -522,11 +522,12 @@ class TranscriptTest(unittest.TestCase): trigger_fn = parse_fn(spec.get('trigger_fn', 'Default'), trigger_names) accumulation_mode = getattr( AccumulationMode, spec.get('accumulation_mode', 'ACCUMULATING').upper()) - output_time_fn = getattr( - OutputTimeFn, spec.get('output_time_fn', 'OUTPUT_AT_EOW').upper()) + timestamp_combiner = getattr( + TimestampCombiner, + spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper()) driver = GeneralTriggerDriver( - Windowing(window_fn, trigger_fn, accumulation_mode, output_time_fn)) + Windowing(window_fn, trigger_fn, accumulation_mode, timestamp_combiner)) state = InMemoryUnmergedState() output = [] watermark = MIN_TIMESTAMP http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/trigger_transcripts.yaml ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml index f87cd1d..a736e94 100644 --- a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml +++ b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml @@ -29,7 +29,7 @@ transcript: # Ordered list of events. name: fixed_default_late_data window_fn: FixedWindows(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EOW +timestamp_combiner: OUTPUT_AT_EOW transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -42,10 +42,10 @@ transcript: - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 10, late: true} --- -name: output_time_fn_earliest +name: timestamp_combiner_earliest window_fn: FixedWindows(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST +timestamp_combiner: OUTPUT_AT_EARLIEST transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -55,10 +55,10 @@ transcript: - {window: [20, 29], values: [25], timestamp: 25, late: false} --- -name: output_time_fn_latest +name: timestamp_combiner_latest window_fn: FixedWindows(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_LATEST +timestamp_combiner: OUTPUT_AT_LATEST transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -69,10 +69,10 @@ transcript: --- # Test that custom timestamping is not invoked. -name: output_time_fn_custom_timestamping_eow +name: timestamp_combiner_custom_timestamping_eow window_fn: CustomTimestampingFixedWindowsWindowFn(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EOW +timestamp_combiner: OUTPUT_AT_EOW transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -83,10 +83,10 @@ transcript: --- # Test that custom timestamping is not invoked. -name: output_time_fn_custom_timestamping_earliest +name: timestamp_combiner_custom_timestamping_earliest window_fn: CustomTimestampingFixedWindowsWindowFn(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST +timestamp_combiner: OUTPUT_AT_EARLIEST transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -97,10 +97,10 @@ transcript: --- # Test that custom timestamping is in fact invoked. -name: output_time_fn_custom_timestamping_earliest +name: timestamp_combiner_custom_timestamping_earliest window_fn: CustomTimestampingFixedWindowsWindowFn(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST_TRANSFORMED +timestamp_combiner: OUTPUT_AT_EARLIEST_TRANSFORMED transcript: - input: [1, 2, 3, 10, 11, 25] - watermark: 100 @@ -113,7 +113,7 @@ transcript: name: early_late_sessions window_fn: Sessions(10) trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3)) -output_time_fn: OUTPUT_AT_EOW +timestamp_combiner: OUTPUT_AT_EOW transcript: - input: [1, 2, 3] - expect: @@ -136,7 +136,7 @@ transcript: name: garbage_collection window_fn: FixedWindows(10) trigger_fn: AfterCount(2) -output_time_fn: OUTPUT_AT_EOW +timestamp_combiner: OUTPUT_AT_EOW allowed_lateness: 10 accumulation_mode: discarding transcript: @@ -153,7 +153,7 @@ transcript: name: known_late_data_watermark window_fn: FixedWindows(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST +timestamp_combiner: OUTPUT_AT_EARLIEST transcript: - watermark: 5 - input: [2, 3, 7, 8] @@ -165,7 +165,7 @@ transcript: name: known_late_data_no_watermark_hold_possible window_fn: FixedWindows(10) trigger_fn: Default -output_time_fn: OUTPUT_AT_EARLIEST +timestamp_combiner: OUTPUT_AT_EARLIEST transcript: - watermark: 8 - input: [2, 3, 7] http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 9c4b109..44a5a26 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -67,7 +67,7 @@ from apache_beam.utils import urns # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their # behavior. -class OutputTimeFn(object): +class TimestampCombiner(object): """Determines how output timestamps of grouping operations are assigned.""" OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW @@ -77,17 +77,17 @@ class OutputTimeFn(object): OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED' @staticmethod - def get_impl(output_time_fn, window_fn): - if output_time_fn == OutputTimeFn.OUTPUT_AT_EOW: + def get_impl(timestamp_combiner, window_fn): + if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW: return timeutil.OutputAtEndOfWindowImpl() - elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST: + elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST: return timeutil.OutputAtEarliestInputTimestampImpl() - elif output_time_fn == OutputTimeFn.OUTPUT_AT_LATEST: + elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_LATEST: return timeutil.OutputAtLatestInputTimestampImpl() - elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED: + elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED: return timeutil.OutputAtEarliestTransformedInputTimestampImpl(window_fn) else: - raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn) + raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) class WindowFn(urns.RunnerApiFn): @@ -132,10 +132,10 @@ class WindowFn(urns.RunnerApiFn): def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument """Given input time and output window, returns output time for window. - If OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the Windowing, - the output timestamp for the given window will be the earliest of the - timestamps returned by get_transformed_output_time() for elements of the - window. + If TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the + Windowing, the output timestamp for the given window will be the earliest + of the timestamps returned by get_transformed_output_time() for elements + of the window. Arguments: window: Output window of element. http://git-wip-us.apache.org/repos/asf/beam/blob/dc186fd8/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 38a2df8..0f613d7 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -38,7 +38,7 @@ from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import OutputTimeFn +from apache_beam.transforms.window import TimestampCombiner from apache_beam.transforms.window import Sessions from apache_beam.transforms.window import SlidingWindows from apache_beam.transforms.window import TimestampedValue @@ -271,7 +271,7 @@ class RunnerApiTest(unittest.TestCase): Windowing(FixedWindows(1, 3), AfterCount(6), accumulation_mode=AccumulationMode.ACCUMULATING), Windowing(SlidingWindows(10, 15, 21), AfterCount(28), - output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST, + timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST, accumulation_mode=AccumulationMode.DISCARDING)): context = pipeline_context.PipelineContext() self.assertEqual(
