This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 79b6e5b [BEAM-8824] Add support for specifying window
allowed_lateness in the Python SDK
new fb0353e Merge pull request #10216 from y1chi/allowed_lateness
79b6e5b is described below
commit 79b6e5bb863c8cf96b690d1a96c8988d8bec72be
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue Nov 19 10:16:38 2019 -0800
[BEAM-8824] Add support for specifying window allowed_lateness in the Python
SDK
---
.../apache_beam/examples/snippets/snippets_test.py | 2 ++
.../testing/data/trigger_transcripts.yaml | 22 ++++++++++++
.../python/apache_beam/testing/test_stream_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 39 +++++++++++++++++-----
sdks/python/apache_beam/transforms/trigger.py | 5 ++-
sdks/python/apache_beam/transforms/trigger_test.py | 29 ++++++++++------
6 files changed, 78 insertions(+), 21 deletions(-)
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py
b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f0f53e2..38bcb88 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -958,6 +958,7 @@ class SnippetsTest(unittest.TestCase):
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| WindowInto(FixedWindows(15),
trigger=trigger,
+ allowed_lateness=20,
accumulation_mode=AccumulationMode.DISCARDING)
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(
@@ -1014,6 +1015,7 @@ class SnippetsTest(unittest.TestCase):
FixedWindows(1 * 60),
trigger=AfterWatermark(
late=AfterProcessingTime(10 * 60)),
+ allowed_lateness=10,
accumulation_mode=AccumulationMode.DISCARDING)
# [END model_composite_triggers]
| 'group' >> beam.GroupByKey()
diff --git a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
index cac0c74..b2d4e9a 100644
--- a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
+++ b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
@@ -30,6 +30,7 @@ name: fixed_default_late_data
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 100
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -42,6 +43,26 @@ transcript:
- {window: [0, 9], values: [1, 2, 3, 7], timestamp: 9, late: true}
---
+name: fixed_drop_late_data_after_allowed_lateness
+window_fn: FixedWindows(10)
+trigger_fn: AfterWatermark(early=AfterCount(3), late=AfterCount(1))
+timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 20
+accumulation_mode: accumulating
+transcript:
+ - input: [1, 2, 10, 11, 80, 81]
+ - watermark: 100
+ - expect:
+ - {window: [0, 9], values: [1, 2], timestamp: 9, final: false}
+ - {window: [10, 19], values: [10, 11], timestamp: 19}
+ - {window: [80, 89], values: [80, 81], timestamp: 89, late: false}
+ - input: [7, 8] # no output
+ - input: [17, 18] # no output
+ - input: [82]
+ - expect:
+ - {window: [80, 89], values: [80, 81, 82], timestamp: 89, late: true}
+
+---
name: timestamp_combiner_earliest
window_fn: FixedWindows(10)
trigger_fn: Default
@@ -118,6 +139,7 @@ broken_on:
- SwitchingDirectRunner
window_fn: Sessions(10)
trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
+allowed_lateness: 100
timestamp_combiner: OUTPUT_AT_EOW
transcript:
- input: [1, 2, 3]
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py
b/sdks/python/apache_beam/testing/test_stream_test.py
index 26b54bd..bfadb5e 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -275,7 +275,7 @@ class TestStreamTest(unittest.TestCase):
p = TestPipeline(options=options)
records = (p
| test_stream
- | beam.WindowInto(FixedWindows(15))
+ | beam.WindowInto(FixedWindows(15), allowed_lateness=300)
| beam.Map(lambda x: ('k', x))
| beam.GroupByKey())
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index 3cd5472..25cc91f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -64,6 +64,7 @@ from apache_beam.typehints.trivial_inference import
element_type
from apache_beam.typehints.typehints import is_consistent_with
from apache_beam.utils import timestamp
from apache_beam.utils import urns
+from apache_beam.utils.timestamp import Duration
if typing.TYPE_CHECKING:
from google.protobuf import message # pylint: disable=ungrouped-imports
@@ -2269,7 +2270,21 @@ class Windowing(object):
triggerfn=None, # type: typing.Optional[TriggerFn]
accumulation_mode=None, #
typing.Optional[beam_runner_api_pb2.AccumulationMode]
timestamp_combiner=None, #
typing.Optional[beam_runner_api_pb2.OutputTime]
- ):
+ allowed_lateness=0, # type: typing.Union[int, float]
+ ):
+ """Class representing the window strategy.
+
+ Args:
+ windowfn: Window assign function.
+ triggerfn: Trigger function.
+ accumulation_mode: an AccumulationMode, controls what to do with data
+ when a trigger fires multiple times.
+ timestamp_combiner: a TimestampCombiner, determines how output
+ timestamps of grouping operations are assigned.
+ allowed_lateness: Maximum delay, in seconds, after the end of a
+ window during which late data is still processed instead of being
+ dropped.
+ """
global AccumulationMode, DefaultTrigger # pylint:
disable=global-variable-not-assigned
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
@@ -2289,13 +2304,15 @@ class Windowing(object):
self.windowfn = windowfn
self.triggerfn = triggerfn
self.accumulation_mode = accumulation_mode
+ self.allowed_lateness = Duration.of(allowed_lateness)
self.timestamp_combiner = (
timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW)
self._is_default = (
self.windowfn == GlobalWindows() and
self.triggerfn == DefaultTrigger() and
self.accumulation_mode == AccumulationMode.DISCARDING and
- self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW)
+ self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW and
+ self.allowed_lateness == 0)
def __repr__(self):
return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
@@ -2310,7 +2327,8 @@ class Windowing(object):
self.windowfn == other.windowfn
and self.triggerfn == other.triggerfn
and self.accumulation_mode == other.accumulation_mode
- and self.timestamp_combiner == other.timestamp_combiner)
+ and self.timestamp_combiner == other.timestamp_combiner
+ and self.allowed_lateness == other.allowed_lateness)
return False
def __ne__(self, other):
@@ -2318,7 +2336,8 @@ class Windowing(object):
return not self == other
def __hash__(self):
- return hash((self.windowfn, self.accumulation_mode,
+ return hash((self.windowfn, self.triggerfn, self.accumulation_mode,
+ self.allowed_lateness,
self.timestamp_combiner))
def is_default(self):
@@ -2340,7 +2359,7 @@ class Windowing(object):
# TODO(robertwb): Support EMIT_IF_NONEMPTY
closing_behavior=beam_runner_api_pb2.ClosingBehavior.EMIT_ALWAYS,
OnTimeBehavior=beam_runner_api_pb2.OnTimeBehavior.FIRE_ALWAYS,
- allowed_lateness=0,
+ allowed_lateness=self.allowed_lateness.micros // 1000,
environment_id=context.default_environment_id())
@staticmethod
@@ -2351,7 +2370,8 @@ class Windowing(object):
windowfn=WindowFn.from_runner_api(proto.window_fn, context),
triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
accumulation_mode=proto.accumulation_mode,
- timestamp_combiner=proto.output_time)
+ timestamp_combiner=proto.output_time,
+ allowed_lateness=Duration(micros=proto.allowed_lateness * 1000))
@typehints.with_input_types(T)
@@ -2383,8 +2403,8 @@ class WindowInto(ParDo):
windowfn, # type: typing.Union[Windowing, WindowFn]
trigger=None, # type: typing.Optional[TriggerFn]
accumulation_mode=None,
- timestamp_combiner=None
- ):
+ timestamp_combiner=None,
+ allowed_lateness=0):
"""Initializes a WindowInto transform.
Args:
@@ -2406,7 +2426,8 @@ class WindowInto(ParDo):
timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner
self.windowing = Windowing(
- windowfn, trigger, accumulation_mode, timestamp_combiner)
+ windowfn, trigger, accumulation_mode, timestamp_combiner,
+ allowed_lateness)
super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
def get_windowing(self, unused_inputs):
diff --git a/sdks/python/apache_beam/transforms/trigger.py
b/sdks/python/apache_beam/transforms/trigger.py
index 6106c26..65bd4c7 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1128,6 +1128,7 @@ class GeneralTriggerDriver(TriggerDriver):
def __init__(self, windowing, clock):
self.clock = clock
+ self.allowed_lateness = windowing.allowed_lateness
self.window_fn = windowing.windowfn
self.timestamp_combiner_impl = TimestampCombiner.get_impl(
windowing.timestamp_combiner, self.window_fn)
@@ -1147,6 +1148,9 @@ class GeneralTriggerDriver(TriggerDriver):
windows_to_elements = collections.defaultdict(list)
for wv in windowed_values:
for window in wv.windows:
+ # ignore expired windows
+ if input_watermark > window.end + self.allowed_lateness:
+ continue
windows_to_elements[window].append((wv.value, wv.timestamp))
# First handle merging.
@@ -1241,7 +1245,6 @@ class GeneralTriggerDriver(TriggerDriver):
nonspeculative_index = state.get_state(
window, self.NONSPECULATIVE_INDEX)
state.add_state(window, self.NONSPECULATIVE_INDEX, 1)
- windowed_value.PaneInfoTiming.LATE
_LOGGER.warning('Watermark moved backwards in time '
'or late data moved window end forward.')
else:
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py
b/sdks/python/apache_beam/transforms/trigger_test.py
index d67611c..bdc8e37 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -36,6 +36,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.portability import common_urns
from apache_beam.runners import pipeline_context
from apache_beam.runners.direct.clock import TestClock
from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,6 +67,7 @@ from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Duration
from apache_beam.utils.windowed_value import PaneInfoTiming
@@ -118,8 +120,11 @@ class TriggerTest(unittest.TestCase):
bundles, late_bundles,
expected_panes):
actual_panes = collections.defaultdict(list)
+ allowed_lateness = Duration(micros=int(
+ common_urns.constants.MAX_TIMESTAMP_MILLIS.constant)*1000)
driver = GeneralTriggerDriver(
- Windowing(window_fn, trigger_fn, accumulation_mode), TestClock())
+ Windowing(window_fn, trigger_fn, accumulation_mode,
+ allowed_lateness=allowed_lateness), TestClock())
state = InMemoryUnmergedState()
for bundle in bundles:
@@ -590,6 +595,7 @@ class TranscriptTest(unittest.TestCase):
timestamp_combiner = getattr(
TimestampCombiner,
spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper())
+ allowed_lateness = spec.get('allowed_lateness', 0.000)
def only_element(xs):
x, = list(xs)
@@ -599,7 +605,7 @@ class TranscriptTest(unittest.TestCase):
self._execute(
window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
- transcript, spec)
+ allowed_lateness, transcript, spec)
def _windowed_value_info(windowed_value):
@@ -676,11 +682,11 @@ class TriggerDriverTranscriptTest(TranscriptTest):
def _execute(
self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
- transcript, unused_spec):
+ allowed_lateness, transcript, unused_spec):
driver = GeneralTriggerDriver(
- Windowing(window_fn, trigger_fn, accumulation_mode,
timestamp_combiner),
- TestClock())
+ Windowing(window_fn, trigger_fn, accumulation_mode,
+ timestamp_combiner, allowed_lateness), TestClock())
state = InMemoryUnmergedState()
output = []
watermark = MIN_TIMESTAMP
@@ -708,7 +714,8 @@ class TriggerDriverTranscriptTest(TranscriptTest):
for t in params]
output = [
_windowed_value_info(wv)
- for wv in driver.process_elements(state, bundle, watermark)]
+ for wv in driver.process_elements(state, bundle, watermark,
+ watermark)]
fire_timers()
elif action == 'watermark':
@@ -742,7 +749,7 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
def _execute(
self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
- transcript, spec):
+ allowed_lateness, transcript, spec):
runner_name = TestPipeline().runner.__class__.__name__
if runner_name in spec.get('broken_on', ()):
@@ -881,7 +888,8 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
window_fn,
trigger=trigger_fn,
accumulation_mode=accumulation_mode,
- timestamp_combiner=timestamp_combiner)
+ timestamp_combiner=timestamp_combiner,
+ allowed_lateness=allowed_lateness)
| aggregation
| beam.MapTuple(_windowed_value_info_map_fn)
# Place outputs back into the global window to allow flattening
@@ -921,7 +929,7 @@ class BatchTranscriptTest(TranscriptTest):
def _execute(
self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
- transcript, spec):
+ allowed_lateness, transcript, spec):
if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED:
self.skipTest(
'Non-fnapi timestamp combiner: %s' % spec.get('timestamp_combiner'))
@@ -971,7 +979,8 @@ class BatchTranscriptTest(TranscriptTest):
window_fn,
trigger=trigger_fn,
accumulation_mode=accumulation_mode,
- timestamp_combiner=timestamp_combiner))
+ timestamp_combiner=timestamp_combiner,
+ allowed_lateness=allowed_lateness))
grouped = input_pc | 'Grouped' >> (
beam.GroupByKey()