This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cbe4dfb Refactoring code from direct runner, and adding unit test for
processing time timers. (#8271)
cbe4dfb is described below
commit cbe4dfbdbe5d0da5152568853ee5e17334dd1b54
Author: Pablo <[email protected]>
AuthorDate: Thu Apr 11 11:35:25 2019 -0700
Refactoring code from direct runner, and adding unit test for processing
time timers. (#8271)
* Small refactor of direct runner code, and adding unit test.
* Fixing lint issue
---
sdks/python/apache_beam/runners/common.py | 8 +--
.../apache_beam/runners/direct/direct_runner.py | 11 ++--
.../runners/direct/evaluation_context.py | 28 ++++++----
.../apache_beam/transforms/userstate_test.py | 59 ++++++++++++++++++++++
4 files changed, 85 insertions(+), 21 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.py
b/sdks/python/apache_beam/runners/common.py
index f1fda35..84ac116 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -547,7 +547,7 @@ class PerWindowInvoker(DoFnInvoker):
try:
self.current_windowed_value = windowed_value
self.restriction_tracker = restriction_tracker
- return self._invoke_per_window(
+ return self._invoke_process_per_window(
windowed_value, additional_args, additional_kwargs,
output_processor)
finally:
@@ -556,14 +556,14 @@ class PerWindowInvoker(DoFnInvoker):
elif self.has_windowed_inputs and len(windowed_value.windows) != 1:
for w in windowed_value.windows:
- self._invoke_per_window(
+ self._invoke_process_per_window(
WindowedValue(windowed_value.value, windowed_value.timestamp,
(w,)),
additional_args, additional_kwargs, output_processor)
else:
- self._invoke_per_window(
+ self._invoke_process_per_window(
windowed_value, additional_args, additional_kwargs, output_processor)
- def _invoke_per_window(
+ def _invoke_process_per_window(
self, windowed_value, additional_args,
additional_kwargs, output_processor):
if self.has_windowed_inputs:
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 43e8c7f..e880460 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -69,11 +69,6 @@ class SwitchingDirectRunner(PipelineRunner):
"""
def run_pipeline(self, pipeline, options):
- use_fnapi_runner = True
-
- # Streaming mode is not yet supported on the FnApiRunner.
- if options.view_as(StandardOptions).streaming:
- use_fnapi_runner = False
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
@@ -113,8 +108,10 @@ class SwitchingDirectRunner(PipelineRunner):
self.supported_by_fnapi_runner = False
# Check whether all transforms used in the pipeline are supported by the
- # FnApiRunner.
- use_fnapi_runner = _FnApiRunnerSupportVisitor().accept(pipeline)
+ # FnApiRunner, and the pipeline was not meant to be run as streaming.
+ use_fnapi_runner = (
+ _FnApiRunnerSupportVisitor().accept(pipeline)
+ and not options.view_as(StandardOptions).streaming)
# Also ensure grpc is available.
try:
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 24b05b6..a042ded 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -274,16 +274,7 @@ class EvaluationContext(object):
result.logical_metric_updates)
# If the result is for a view, update side inputs container.
- if (result.uncommitted_output_bundles
- and result.uncommitted_output_bundles[0].pcollection
- in self._pcollection_to_views):
- for view in self._pcollection_to_views[
- result.uncommitted_output_bundles[0].pcollection]:
- for committed_bundle in committed_bundles:
- # side_input must be materialized.
- self._side_inputs_container.add_values(
- view,
- committed_bundle.get_elements_iterable(make_copy=True))
+ self._update_side_inputs_container(committed_bundles, result)
# Tasks generated from unblocked side inputs as the watermark progresses.
tasks = self._watermark_manager.update_watermarks(
@@ -304,6 +295,23 @@ class EvaluationContext(object):
existing_keyed_state[k] = v
return committed_bundles
+ def _update_side_inputs_container(self, committed_bundles, result):
+ """Update the side inputs container if we are outputting into a side input.
+
+  Look at the result, and if it's outputting into a PCollection that we have
+ registered as a PCollectionView, we add the result to the PCollectionView.
+ """
+ if (result.uncommitted_output_bundles
+ and result.uncommitted_output_bundles[0].pcollection
+ in self._pcollection_to_views):
+ for view in self._pcollection_to_views[
+ result.uncommitted_output_bundles[0].pcollection]:
+ for committed_bundle in committed_bundles:
+ # side_input must be materialized.
+ self._side_inputs_container.add_values(
+ view,
+ committed_bundle.get_elements_iterable(make_copy=True))
+
def get_aggregator_values(self, aggregator_or_name):
return self._counter_factory.get_aggregator_values(aggregator_or_name)
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py
b/sdks/python/apache_beam/transforms/userstate_test.py
index 6935a3a..0a3e13c 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -418,6 +418,65 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
'key-value pairs.')):
values | beam.ParDo(TestStatefulDoFn())
+ def test_generate_sequence_with_realtime_timer(self):
+ from apache_beam.transforms.combiners import CountCombineFn
+
+ class GenerateRecords(beam.DoFn):
+
+ EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.REAL_TIME)
+ COUNT_STATE = CombiningValueStateSpec(
+ 'count_state', VarIntCoder(), CountCombineFn())
+
+ def __init__(self, frequency, total_records):
+ self.total_records = total_records
+ self.frequency = frequency
+
+ def process(self,
+ element,
+ emit_timer=beam.DoFn.TimerParam(EMIT_TIMER)):
+ # Processing time timers should be set on ABSOLUTE TIME.
+ emit_timer.set(self.frequency)
+ yield element[1]
+
+ @on_timer(EMIT_TIMER)
+ def emit_values(self,
+ emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),
+ count_state=beam.DoFn.StateParam(COUNT_STATE)):
+ count = count_state.read() or 0
+ if self.total_records == count:
+ return
+
+ count_state.add(1)
+ # Processing time timers should be set on ABSOLUTE TIME.
+ emit_timer.set(count + 1 + self.frequency)
+ yield 'value'
+
+ TOTAL_RECORDS = 3
+ FREQUENCY = 1
+
+ test_stream = (TestStream()
+ .advance_watermark_to(0)
+ .add_elements([('key', 0)])
+ .advance_processing_time(1) # Timestamp: 1
+ .add_elements([('key', 1)])
+ .advance_processing_time(1) # Timestamp: 2
+ .add_elements([('key', 2)])
+ .advance_processing_time(1) # Timestamp: 3
+ .add_elements([('key', 3)]))
+
+ with beam.Pipeline(argv=['--streaming', '--runner=DirectRunner']) as p:
+ _ = (p
+ | test_stream
+ | beam.ParDo(GenerateRecords(FREQUENCY, TOTAL_RECORDS))
+ | beam.ParDo(self.record_dofn()))
+
+ self.assertEqual(
+ # 4 RECORDS go through process
+ # 3 values are emitted from timer
+ # Timestamp moves gradually.
+ [0, 'value', 1, 'value', 2, 'value', 3],
+ StatefulDoFnOnDirectRunnerTest.all_records)
+
def test_simple_stateful_dofn_combining(self):
class SimpleTestStatefulDoFn(DoFn):
BUFFER_STATE = CombiningValueStateSpec(