http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/watermark_manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py new file mode 100644 index 0000000..f439731 --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -0,0 +1,224 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Manages watermarks of PCollections and AppliedPTransforms.""" + +from __future__ import absolute_import + +import threading + +from apache_beam import pipeline +from apache_beam import pvalue +from apache_beam.transforms.timeutil import MAX_TIMESTAMP +from apache_beam.transforms.timeutil import MIN_TIMESTAMP + + +class WatermarkManager(object): + """Tracks and updates watermarks for all AppliedPTransforms.""" + + WATERMARK_POS_INF = MAX_TIMESTAMP + WATERMARK_NEG_INF = MIN_TIMESTAMP + + def __init__(self, clock, root_transforms, value_to_consumers): + self._clock = clock # processing time clock + self._value_to_consumers = value_to_consumers + self._root_transforms = root_transforms + # AppliedPTransform -> TransformWatermarks + self._transform_to_watermarks = {} + + for root_transform in root_transforms: + self._transform_to_watermarks[root_transform] = TransformWatermarks( + self._clock) + + for consumers in value_to_consumers.values(): + for consumer in consumers: + self._transform_to_watermarks[consumer] = TransformWatermarks( + self._clock) + + for consumers in value_to_consumers.values(): + for consumer in consumers: + self._update_input_transform_watermarks(consumer) + + def _update_input_transform_watermarks(self, applied_ptransform): + assert isinstance(applied_ptransform, pipeline.AppliedPTransform) + input_transform_watermarks = [] + for input_pvalue in applied_ptransform.inputs: + assert input_pvalue.producer or isinstance(input_pvalue, pvalue.PBegin) + if input_pvalue.producer: + input_transform_watermarks.append( + self.get_watermarks(input_pvalue.producer)) + self._transform_to_watermarks[ + applied_ptransform].update_input_transform_watermarks( + input_transform_watermarks) + + def get_watermarks(self, applied_ptransform): + """Gets the input and output watermarks for an AppliedPTransform. + + If the applied_ptransform has not processed any elements, return a + watermark with minimum value. + + Args: + applied_ptransform: AppliedPTransform to get the watermarks for. + + Returns: + A snapshot (TransformWatermarks) of the input watermark and output + watermark for the provided transform. + """ + + # TODO(altay): Composite transforms should have a composite watermark. 
Until + # then they are represented by their last transform. + while applied_ptransform.parts: + applied_ptransform = applied_ptransform.parts[-1] + + return self._transform_to_watermarks[applied_ptransform] + + def update_watermarks(self, completed_committed_bundle, applied_ptransform, + timer_update, outputs, earliest_hold): + assert isinstance(applied_ptransform, pipeline.AppliedPTransform) + self._update_pending( + completed_committed_bundle, applied_ptransform, timer_update, outputs) + tw = self.get_watermarks(applied_ptransform) + tw.hold(earliest_hold) + self._refresh_watermarks(applied_ptransform) + + def _update_pending(self, input_committed_bundle, applied_ptransform, + timer_update, output_committed_bundles): + """Updates the list of pending bundles for the given AppliedPTransform.""" + + # Update pending elements. Filter out empty bundles. They do not impact + # watermarks and should not trigger downstream execution. + for output in output_committed_bundles: + if output.elements: + if output.pcollection in self._value_to_consumers: + consumers = self._value_to_consumers[output.pcollection] + for consumer in consumers: + consumer_tw = self._transform_to_watermarks[consumer] + consumer_tw.add_pending(output) + + completed_tw = self._transform_to_watermarks[applied_ptransform] + completed_tw.update_timers(timer_update) + + assert input_committed_bundle or applied_ptransform in self._root_transforms + if input_committed_bundle and input_committed_bundle.elements: + completed_tw.remove_pending(input_committed_bundle) + + def _refresh_watermarks(self, applied_ptransform): + assert isinstance(applied_ptransform, pipeline.AppliedPTransform) + tw = self.get_watermarks(applied_ptransform) + if tw.refresh(): + for pval in applied_ptransform.outputs.values(): + if isinstance(pval, pvalue.DoOutputsTuple): + pvals = (v for v in pval) + else: + pvals = (pval,) + for v in pvals: + if v in self._value_to_consumers: # If there are downstream consumers + consumers = self._value_to_consumers[v] + for consumer in consumers: + self._refresh_watermarks(consumer) + + def extract_fired_timers(self): + all_timers = [] + for applied_ptransform, tw in self._transform_to_watermarks.iteritems(): + if tw.extract_fired_timers(): + all_timers.append(applied_ptransform) + return all_timers + + +class TransformWatermarks(object): + """Tracks input and output watermarks for an AppliedPTransform.""" + + def __init__(self, clock): + self._clock = clock + self._input_transform_watermarks = [] + self._input_watermark = WatermarkManager.WATERMARK_NEG_INF + self._output_watermark = WatermarkManager.WATERMARK_NEG_INF + self._earliest_hold = WatermarkManager.WATERMARK_POS_INF + self._pending = set() # Scheduled bundles targeted for this transform.
+ self._fired_timers = False + self._lock = threading.Lock() + + def update_input_transform_watermarks(self, input_transform_watermarks): + with self._lock: + self._input_transform_watermarks = input_transform_watermarks + + def update_timers(self, timer_update): + with self._lock: + if timer_update: + assert self._fired_timers + self._fired_timers = False + + @property + def input_watermark(self): + with self._lock: + return self._input_watermark + + @property + def output_watermark(self): + with self._lock: + return self._output_watermark + + def hold(self, value): + with self._lock: + if value is None: + value = WatermarkManager.WATERMARK_POS_INF + self._earliest_hold = value + + def add_pending(self, pending): + with self._lock: + self._pending.add(pending) + + def remove_pending(self, completed): + with self._lock: + # Ignore repeated removes. This will happen if a transform has a repeated + # input. + if completed in self._pending: + self._pending.remove(completed) + + def refresh(self): + with self._lock: + pending_holder = (WatermarkManager.WATERMARK_NEG_INF + if self._pending else + WatermarkManager.WATERMARK_POS_INF) + + input_watermarks = [ + tw.output_watermark for tw in self._input_transform_watermarks] + input_watermarks.append(WatermarkManager.WATERMARK_POS_INF) + producer_watermark = min(input_watermarks) + + self._input_watermark = max(self._input_watermark, + min(pending_holder, producer_watermark)) + new_output_watermark = min(self._input_watermark, self._earliest_hold) + + advanced = new_output_watermark > self._output_watermark + self._output_watermark = new_output_watermark + return advanced + + @property + def synchronized_processing_output_time(self): + return self._clock.now + + def extract_fired_timers(self): + with self._lock: + if self._fired_timers: + return False + + should_fire = ( + self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and + self._input_watermark == WatermarkManager.WATERMARK_POS_INF) + self._fired_timers = should_fire + return should_fire
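The refresh() method above is the core watermark recurrence of this runner: the input watermark advances toward the minimum of the producers' output watermarks, stays pinned while pending bundles remain, and the output watermark is additionally capped by the earliest hold. A minimal standalone sketch of that rule, with plain numbers standing in for timestamps and hypothetical names (an illustration of the semantics only, not Beam API):

def refresh_watermarks(input_wm, output_wm, earliest_hold, has_pending,
                       producer_output_wms,
                       neg_inf=float('-inf'), pos_inf=float('inf')):
    # Pending (scheduled but not yet processed) bundles pin the input watermark.
    pending_holder = neg_inf if has_pending else pos_inf
    # A root transform has no producers and is bounded only by pending work.
    producer_wm = min(producer_output_wms + [pos_inf])
    # Watermarks only advance, never retreat.
    new_input_wm = max(input_wm, min(pending_holder, producer_wm))
    # The output watermark may not pass the input watermark or the hold.
    new_output_wm = min(new_input_wm, earliest_hold)
    return new_input_wm, new_output_wm, new_output_wm > output_wm

For example, refresh_watermarks(float('-inf'), float('-inf'), 25, False, [30]) returns (30, 25, True): with the producer at 30, no pending bundles and a hold at 25, the input watermark advances to 30 while the output watermark stops at the hold, mirroring the min/max logic in TransformWatermarks.refresh.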
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py deleted file mode 100644 index c4c52b3..0000000 --- a/sdks/python/apache_beam/runners/direct_runner.py +++ /dev/null @@ -1,308 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""DirectPipelineRunner, executing on the local machine. - -The DirectPipelineRunner class implements what is called in Dataflow -parlance the "direct runner". Such a runner executes the entire graph -of transformations belonging to a pipeline on the local machine. -""" - -from __future__ import absolute_import - -import collections -import itertools -import logging - -from apache_beam import coders -from apache_beam import error -from apache_beam.runners.common import DoFnRunner -from apache_beam.runners.common import DoFnState -from apache_beam.runners.runner import PipelineResult -from apache_beam.runners.runner import PipelineRunner -from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache -from apache_beam.transforms import sideinputs -from apache_beam.transforms.window import GlobalWindows -from apache_beam.transforms.window import WindowedValue -from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn -from apache_beam.typehints.typecheck import TypeCheckError -from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn -from apache_beam.utils import counters -from apache_beam.utils.options import TypeOptions - - -class DirectPipelineRunner(PipelineRunner): - """A local pipeline runner. - - The runner computes everything locally and does not make any attempt to - optimize for time or space. - """ - - def __init__(self, cache=None): - # Cache of values computed while the runner executes a pipeline. - self._cache = cache if cache is not None else PValueCache() - self._counter_factory = counters.CounterFactory() - # Element counts used for debugging footprint issues in the direct runner. - # The values computed are used only for logging and do not take part in - # any decision making logic. The key for the counter dictionary is either - # the full label for the transform producing the elements or a tuple - # (full label, output tag) for ParDo transforms since they can output values - # on multiple outputs. 
- self.debug_counters = {} - self.debug_counters['element_counts'] = collections.Counter() - - @property - def cache(self): - return self._cache - - def get_pvalue(self, pvalue): - """Gets the PValue's computed value from the runner's cache.""" - try: - return self._cache.get_pvalue(pvalue) - except KeyError: - raise error.PValueError('PValue is not computed.') - - def clear_pvalue(self, pvalue): - """Removes a PValue from the runner's cache.""" - self._cache.clear_pvalue(pvalue) - - def skip_if_cached(func): # pylint: disable=no-self-argument - """Decorator to skip execution of a transform if value is cached.""" - - def func_wrapper(self, pvalue, *args, **kwargs): - logging.debug('Current: Debug counters: %s', self.debug_counters) - if self._cache.is_cached(pvalue): # pylint: disable=protected-access - return - else: - func(self, pvalue, *args, **kwargs) - return func_wrapper - - def run(self, pipeline): - super(DirectPipelineRunner, self).run(pipeline) - logging.info('Final: Debug counters: %s', self.debug_counters) - return DirectPipelineResult(state=PipelineState.DONE, - counter_factory=self._counter_factory) - - @skip_if_cached - def run_CreatePCollectionView(self, transform_node): - transform = transform_node.transform - view = transform.view - values = self._cache.get_pvalue(transform_node.inputs[0]) - result = sideinputs.SideInputMap(type(view), view._view_options(), values) - self._cache.cache_output(transform_node, result) - - @skip_if_cached - def run_ParDo(self, transform_node): - transform = transform_node.transform - - side_inputs = [self._cache.get_pvalue(view) - for view in transform_node.side_inputs] - - # TODO(robertwb): Do this type checking inside DoFnRunner to get it on - # remote workers as well? - options = transform_node.inputs[0].pipeline.options - if options is not None and options.view_as(TypeOptions).runtime_type_check: - transform.dofn = TypeCheckWrapperDoFn( - transform.dofn, transform.get_type_hints()) - - # TODO(robertwb): Should this be conditionally done on the workers as well? - transform.dofn = OutputCheckWrapperDoFn( - transform.dofn, transform_node.full_label) - - class RecordingReceiverSet(object): - - def __init__(self, tag): - self.tag = tag - - def output(self, element): - results[self.tag].append(element) - - class TaggedReceivers(dict): - - def __missing__(self, key): - return RecordingReceiverSet(key) - - results = collections.defaultdict(list) - # Some tags may be empty. - for tag in transform.side_output_tags: - results[tag] = [] - - runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs, - side_inputs, transform_node.inputs[0].windowing, - tagged_receivers=TaggedReceivers(), - step_name=transform_node.full_label, - state=DoFnState(self._counter_factory)) - runner.start() - for v in self._cache.get_pvalue(transform_node.inputs[0]): - runner.process(v) - runner.finish() - - self._cache.cache_output(transform_node, []) - for tag, value in results.items(): - self.debug_counters['element_counts'][ - (transform_node.full_label, tag)] += len(value) - self._cache.cache_output(transform_node, tag, value) - - @skip_if_cached - def run_GroupByKeyOnly(self, transform_node): - result_dict = collections.defaultdict(list) - # The input type of a GroupByKey will be KV[Any, Any] or more specific. 
- kv_type_hint = transform_node.transform.get_type_hints().input_types[0] - key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0]) - - for wv in self._cache.get_pvalue(transform_node.inputs[0]): - if (isinstance(wv, WindowedValue) and - isinstance(wv.value, collections.Iterable) and len(wv.value) == 2): - k, v = wv.value - # We use as key a string encoding of the key object to support keys - # that are based on custom classes. This also mimics the remote - # execution behavior where key objects are encoded before being written - # to the shuffler system responsible for grouping. - result_dict[key_coder.encode(k)].append(v) - else: - raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of ' - 'windowed key-value pairs. Instead received: %r.' - % wv) - - gbk_result = map( - GlobalWindows.windowed_value, - ((key_coder.decode(k), v) for k, v in result_dict.iteritems())) - self.debug_counters['element_counts'][ - transform_node.full_label] += len(gbk_result) - self._cache.cache_output(transform_node, gbk_result) - - @skip_if_cached - def run_Create(self, transform_node): - transform = transform_node.transform - create_result = [GlobalWindows.windowed_value(v) for v in transform.value] - self.debug_counters['element_counts'][ - transform_node.full_label] += len(create_result) - self._cache.cache_output(transform_node, create_result) - - @skip_if_cached - def run_Flatten(self, transform_node): - flatten_result = list( - itertools.chain.from_iterable( - self._cache.get_pvalue(pc) for pc in transform_node.inputs)) - self.debug_counters['element_counts'][ - transform_node.full_label] += len(flatten_result) - self._cache.cache_output(transform_node, flatten_result) - - @skip_if_cached - def run_Read(self, transform_node): - # TODO(chamikara) Implement a more generic way for passing PipelineOptions - # to sources and sinks when using DirectRunner. - source = transform_node.transform.source - source.pipeline_options = transform_node.inputs[0].pipeline.options - - def read_values(reader): - read_result = [GlobalWindows.windowed_value(e) for e in reader] - self.debug_counters['element_counts'][ - transform_node.full_label] += len(read_result) - self._cache.cache_output(transform_node, read_result) - - # pylint: disable=wrong-import-position - from apache_beam.io import iobase - - if isinstance(source, iobase.BoundedSource): - # Getting a RangeTracker for the default range of the source and reading - # the full source using that.
- range_tracker = source.get_range_tracker(None, None) - reader = source.read(range_tracker) - read_values(reader) - else: - with source.reader() as reader: - read_values(reader) - - @skip_if_cached - def run__NativeWrite(self, transform_node): - sink = transform_node.transform.sink - - # pylint: disable=wrong-import-position - from apache_beam.io import fileio - - if isinstance(sink, fileio.NativeTextFileSink): - assert sink.num_shards in (0, 1) - if sink.shard_name_template: - sink.file_path += '-00000-of-00001' - sink.file_path += sink.file_name_suffix - sink.pipeline_options = transform_node.inputs[0].pipeline.options - with sink.writer() as writer: - for v in self._cache.get_pvalue(transform_node.inputs[0]): - self.debug_counters['element_counts'][transform_node.full_label] += 1 - writer.Write(v.value) - - -class DirectPipelineResult(PipelineResult): - """A DirectPipelineResult provides access to info about a pipeline.""" - - def __init__(self, state, counter_factory=None): - super(DirectPipelineResult, self).__init__(state) - self._counter_factory = counter_factory - - def aggregated_values(self, aggregator_or_name): - return self._counter_factory.get_aggregator_values(aggregator_or_name) - - -class EagerPipelineRunner(DirectPipelineRunner): - - is_eager = True - - def __init__(self): - super(EagerPipelineRunner, self).__init__() - self._seen_transforms = set() - - def run_transform(self, transform): - if transform not in self._seen_transforms: - self._seen_transforms.add(transform) - super(EagerPipelineRunner, self).run_transform(transform) - - -class DiskCachedPipelineRunner(DirectPipelineRunner): - """A DirectPipelineRunner that uses a disk backed cache. - - DiskCachedPipelineRunner uses a temporary disk backed cache for running - pipelines. This allows for running pipelines that will require more memory - than is available, however this comes with a performance cost due to disk - IO. - - Memory requirement for DiskCachedPipelineRunner is approximately capped by the - single transform in the pipeline that consumes and outputs the largest total - collection (i.e. inputs, side-inputs and outputs in aggregate). In the extreme - case where a transform will use all previous intermediate values as input, - memory requirements for DiskCachedPipelineRunner will be the same as - DirectPipelineRunner. - """ - - def __init__(self): - self._null_cache = () - super(DiskCachedPipelineRunner, self).__init__(self._null_cache) - - def run(self, pipeline): - try: - self._cache = PValueCache(use_disk_backed_cache=True) - return super(DirectPipelineRunner, self).run(pipeline) - finally: - del self._cache - self._cache = self._null_cache - - @property - def cache(self): - raise NotImplementedError( - 'DiskCachedPipelineRunner does not keep cache outside the scope of its ' - 'run method.') http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/__init__.py b/sdks/python/apache_beam/runners/inprocess/__init__.py deleted file mode 100644 index 53e725a..0000000 --- a/sdks/python/apache_beam/runners/inprocess/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Inprocess runner executes pipelines locally in a single process.""" -from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/bundle_factory.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py b/sdks/python/apache_beam/runners/inprocess/bundle_factory.py deleted file mode 100644 index d284449..0000000 --- a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py +++ /dev/null @@ -1,102 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""A factory that creates UncommittedBundles.""" - -from __future__ import absolute_import - -from apache_beam import pvalue - - -class BundleFactory(object): - """BundleFactory creates output bundles to be used by transform evaluators.""" - - def create_bundle(self, output_pcollection): - return Bundle(output_pcollection) - - def create_empty_committed_bundle(self, output_pcollection): - bundle = self.create_bundle(output_pcollection) - bundle.commit(None) - return bundle - - -# A bundle represents a unit of work that will be processed by a transform. -class Bundle(object): - """Part of a PCollection with output elements. - - Part of a PCollection. Elements are output to a bundle, which will cause them - to be processed at a later point by the PTransforms that consume the - PCollection this bundle is a part of. It starts as an uncommitted bundle and - can have elements added to it. It needs to be committed to make it immutable - before passing it to a downstream ptransform. - """ - - def __init__(self, pcollection): - assert (isinstance(pcollection, pvalue.PCollection) - or isinstance(pcollection, pvalue.PCollectionView)) - self._pcollection = pcollection - self._elements = [] - self._committed = False - self._tag = None # optional tag information for this bundle - - @property - def elements(self): - """Returns iterable elements.
If not committed will return a copy.""" - if self._committed: - return self._elements - else: - return list(self._elements) - - @property - def tag(self): - return self._tag - - @tag.setter - def tag(self, value): - assert not self._tag - self._tag = value - - @property - def pcollection(self): - """PCollection that the elements of this UncommittedBundle belong to.""" - return self._pcollection - - def add(self, element): - """Outputs an element to this bundle. - - Args: - element: WindowedValue - """ - assert not self._committed - self._elements.append(element) - - def output(self, element): - self.add(element) - - def commit(self, synchronized_processing_time): - """Commits this bundle. - - Uncommitted bundle will become committed (immutable) after this call. - - Args: - synchronized_processing_time: the synchronized processing time at which - this bundle was committed - """ - assert not self._committed - self._committed = True - self._elements = tuple(self._elements) - self._synchronized_processing_time = synchronized_processing_time http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/clock.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/clock.py b/sdks/python/apache_beam/runners/inprocess/clock.py deleted file mode 100644 index 11e49cd..0000000 --- a/sdks/python/apache_beam/runners/inprocess/clock.py +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Clock implementations for real time processing and testing.""" - -from __future__ import absolute_import - -import time - - -class Clock(object): - - @property - def now(self): - """Returns the number of milliseconds since epoch.""" - return int(time.time() * 1000) - - -class MockClock(Clock): - """Mock clock implementation for testing.""" - - def __init__(self, now_in_ms): - self._now_in_ms = now_in_ms - - @property - def now(self): - return self._now_in_ms - - @now.setter - def now(self, value_in_ms): - assert value_in_ms >= self._now_in_ms - self._now_in_ms = value_in_ms - - def advance(self, duration_in_ms): - assert duration_in_ms >= 0 - self._now_in_ms += duration_in_ms http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py deleted file mode 100644 index 6f1757a..0000000 --- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""ConsumerTrackingPipelineVisitor, a PipelineVisitor object.""" - -from __future__ import absolute_import - -from apache_beam import pvalue -from apache_beam.pipeline import PipelineVisitor - - -class ConsumerTrackingPipelineVisitor(PipelineVisitor): - """Visitor for extracting value-consumer relations from the graph. - - Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This - is used to schedule consuming PTransforms to consume input after the upstream - transform has produced and committed output. - """ - - def __init__(self): - self.value_to_consumers = {} # Map from PValue to [AppliedPTransform]. - self.root_transforms = set() # set of (root) AppliedPTransforms. - self.views = [] # list of PCollectionViews. - self.step_names = {} # Map from AppliedPTransform to String. 
- - self._num_transforms = 0 - - def visit_value(self, value, producer_node): - if value: - if isinstance(value, pvalue.PCollectionView): - self.views.append(value) - - def visit_transform(self, applied_ptransform): - inputs = applied_ptransform.inputs - if inputs: - for input_value in inputs: - if isinstance(input_value, pvalue.PBegin): - self.root_transforms.add(applied_ptransform) - if input_value not in self.value_to_consumers: - self.value_to_consumers[input_value] = [] - self.value_to_consumers[input_value].append(applied_ptransform) - else: - self.root_transforms.add(applied_ptransform) - self.step_names[applied_ptransform] = 's%d' % (self._num_transforms) - self._num_transforms += 1 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py deleted file mode 100644 index 3cd8d73..0000000 --- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py +++ /dev/null @@ -1,122 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Tests for consumer_tracking_pipeline_visitor.""" - -import logging -import unittest - -from apache_beam import pvalue -from apache_beam.io import Read -from apache_beam.io import TextFileSource -from apache_beam.pipeline import Pipeline -from apache_beam.pvalue import AsList -from apache_beam.runners.inprocess import InProcessPipelineRunner -from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor -from apache_beam.transforms import CoGroupByKey -from apache_beam.transforms import Create -from apache_beam.transforms import DoFn -from apache_beam.transforms import FlatMap -from apache_beam.transforms import Flatten -from apache_beam.transforms import ParDo - -# Disable frequent lint warning due to pipe operator for chaining transforms. 
-# pylint: disable=expression-not-assigned -# pylint: disable=pointless-statement - - -class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): - - def setUp(self): - self.pipeline = Pipeline(InProcessPipelineRunner()) - self.visitor = ConsumerTrackingPipelineVisitor() - - def test_root_transforms(self): - root_create = Create('create', [[1, 2, 3]]) - root_read = Read('read', TextFileSource('/tmp/somefile')) - root_flatten = Flatten('flatten', pipeline=self.pipeline) - - pbegin = pvalue.PBegin(self.pipeline) - pcoll_create = pbegin | root_create - pbegin | root_read - pcoll_create | FlatMap(lambda x: x) - [] | root_flatten - - self.pipeline.visit(self.visitor) - - root_transforms = sorted( - [t.transform for t in self.visitor.root_transforms]) - self.assertEqual(root_transforms, sorted( - [root_read, root_create, root_flatten])) - - pbegin_consumers = sorted( - [c.transform for c in self.visitor.value_to_consumers[pbegin]]) - self.assertEqual(pbegin_consumers, sorted([root_read, root_create])) - self.assertEqual(len(self.visitor.step_names), 4) - - def test_side_inputs(self): - - class SplitNumbersFn(DoFn): - - def process(self, context): - if context.element < 0: - yield pvalue.SideOutputValue('tag_negative', context.element) - else: - yield context.element - - class ProcessNumbersFn(DoFn): - - def process(self, context, negatives): - yield context.element - - root_create = Create('create', [[-1, 2, 3]]) - - result = (self.pipeline - | root_create - | ParDo(SplitNumbersFn()).with_outputs('tag_negative', - main='positive')) - positive, negative = result - positive | ParDo(ProcessNumbersFn(), AsList(negative)) - - self.pipeline.visit(self.visitor) - - root_transforms = sorted( - [t.transform for t in self.visitor.root_transforms]) - self.assertEqual(root_transforms, sorted([root_create])) - self.assertEqual(len(self.visitor.step_names), 4) - self.assertEqual(len(self.visitor.views), 1) - self.assertTrue(isinstance(self.visitor.views[0], - pvalue.ListPCollectionView)) - - def test_co_group_by_key(self): - emails = self.pipeline | 'email' >> Create([('joe', 'j...@example.com')]) - phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')]) - {'emails': emails, 'phones': phones} | CoGroupByKey() - - self.pipeline.visit(self.visitor) - - root_transforms = sorted( - [t.transform for t in self.visitor.root_transforms]) - self.assertEqual(len(root_transforms), 2) - self.assertGreater( - len(self.visitor.step_names), 3) # 2 creates + expanded CoGBK - self.assertEqual(len(self.visitor.views), 0) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.DEBUG) - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py deleted file mode 100644 index 7af1608..0000000 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py +++ /dev/null @@ -1,272 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""InProcessEvaluationContext tracks global state, triggers and watermarks.""" - -from __future__ import absolute_import - -import collections -import threading - -from apache_beam.transforms import sideinputs -from apache_beam.runners.inprocess.clock import Clock -from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager -from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor -from apache_beam.utils import counters - - -class _InProcessExecutionContext(object): - - def __init__(self, watermarks, existing_state): - self._watermarks = watermarks - self._existing_state = existing_state - - @property - def watermarks(self): - return self._watermarks - - @property - def existing_state(self): - return self._existing_state - - -class _InProcessSideInputView(object): - - def __init__(self, view): - self._view = view - self.callable_queue = collections.deque() - self.value = None - self.has_result = False - - -class _InProcessSideInputsContainer(object): - """An in-process container for PCollectionViews. - - It provides methods for blocking until a side-input is available and writing - to a side input. - """ - - def __init__(self, views): - self._lock = threading.Lock() - self._views = {} - for view in views: - self._views[view] = _InProcessSideInputView(view) - - def get_value_or_schedule_after_output(self, pcollection_view, task): - with self._lock: - view = self._views[pcollection_view] - if not view.has_result: - view.callable_queue.append(task) - task.blocked = True - return (view.has_result, view.value) - - def set_value_and_get_callables(self, pcollection_view, values): - with self._lock: - view = self._views[pcollection_view] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(pcollection_view, values) - result = tuple(view.callable_queue) - for task in result: - task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def _pvalue_to_value(self, view, values): - """Given a PCollectionView, returns the associated value in requested form. - - Args: - view: PCollectionView for the requested side input. - values: Iterable values associated with the side input. - - Returns: - The side input in its requested form. - - Raises: - ValueError: If values cannot be converted into the requested form. - """ - return sideinputs.SideInputMap(type(view), view._view_options(), values) - - -class InProcessEvaluationContext(object): - """Evaluation context with the global state information of the pipeline. - - The evaluation context for a specific pipeline being executed by the - InProcessPipelineRunner. Contains state shared within the execution across all - transforms. - - InProcessEvaluationContext contains shared state for an execution of the - InProcessPipelineRunner that can be used while evaluating a PTransform. 
This - consists of views into underlying state and watermark implementations, access - to read and write PCollectionViews, and constructing counter sets and - execution contexts. This includes executing callbacks asynchronously when - state changes to the appropriate point (e.g. when a PCollectionView is - requested and known to be empty). - - InProcessEvaluationContext also handles results by committing finalizing - bundles based on the current global state and updating the global state - appropriately. This includes updating the per-(step,key) state, updating - global watermarks, and executing any callbacks that can be executed. - """ - - def __init__(self, pipeline_options, bundle_factory, root_transforms, - value_to_consumers, step_names, views): - self.pipeline_options = pipeline_options - self._bundle_factory = bundle_factory - self._root_transforms = root_transforms - self._value_to_consumers = value_to_consumers - self._step_names = step_names - self.views = views - - # AppliedPTransform -> Evaluator specific state objects - self._application_state_interals = {} - self._watermark_manager = InProcessWatermarkManager( - Clock(), root_transforms, value_to_consumers) - self._side_inputs_container = _InProcessSideInputsContainer(views) - self._pending_unblocked_tasks = [] - self._counter_factory = counters.CounterFactory() - self._cache = None - - self._lock = threading.Lock() - - def use_pvalue_cache(self, cache): - assert not self._cache - self._cache = cache - - @property - def has_cache(self): - return self._cache is not None - - def append_to_cache(self, applied_ptransform, tag, elements): - with self._lock: - assert self._cache - self._cache.append(applied_ptransform, tag, elements) - - def is_root_transform(self, applied_ptransform): - return applied_ptransform in self._root_transforms - - def handle_result( - self, completed_bundle, completed_timers, result): - """Handle the provided result produced after evaluating the input bundle. - - Handle the provided InProcessTransformResult, produced after evaluating - the provided committed bundle (potentially None, if the result of a root - PTransform). - - The result is the output of running the transform contained in the - InProcessTransformResult on the contents of the provided bundle. - - Args: - completed_bundle: the bundle that was processed to produce the result. - completed_timers: the timers that were delivered to produce the - completed_bundle. - result: the InProcessTransformResult of evaluating the input bundle - - Returns: - the committed bundles contained within the handled result. - """ - with self._lock: - committed_bundles = self._commit_bundles(result.output_bundles) - self._watermark_manager.update_watermarks( - completed_bundle, result.transform, completed_timers, - committed_bundles, result.watermark_hold) - - # If the result is for a view, update side inputs container. 
- if (result.output_bundles - and result.output_bundles[0].pcollection in self.views): - if committed_bundles: - assert len(committed_bundles) == 1 - side_input_result = committed_bundles[0].elements - else: - side_input_result = [] - tasks = self._side_inputs_container.set_value_and_get_callables( - result.output_bundles[0].pcollection, side_input_result) - self._pending_unblocked_tasks.extend(tasks) - - if result.counters: - for counter in result.counters: - merged_counter = self._counter_factory.get_counter( - counter.name, counter.combine_fn) - merged_counter.accumulator.merge([counter.accumulator]) - - self._application_state_interals[result.transform] = result.state - return committed_bundles - - def get_aggregator_values(self, aggregator_or_name): - return self._counter_factory.get_aggregator_values(aggregator_or_name) - - def schedule_pending_unblocked_tasks(self, executor_service): - if self._pending_unblocked_tasks: - with self._lock: - for task in self._pending_unblocked_tasks: - executor_service.submit(task) - self._pending_unblocked_tasks = [] - - def _commit_bundles(self, uncommitted_bundles): - """Commits bundles and returns an immutable set of committed bundles.""" - for in_progress_bundle in uncommitted_bundles: - producing_applied_ptransform = in_progress_bundle.pcollection.producer - watermarks = self._watermark_manager.get_watermarks( - producing_applied_ptransform) - in_progress_bundle.commit(watermarks.synchronized_processing_output_time) - return tuple(uncommitted_bundles) - - def get_execution_context(self, applied_ptransform): - return _InProcessExecutionContext( - self._watermark_manager.get_watermarks(applied_ptransform), - self._application_state_interals.get(applied_ptransform)) - - def create_bundle(self, output_pcollection): - """Create an uncommitted bundle for the specified PCollection.""" - return self._bundle_factory.create_bundle(output_pcollection) - - def create_empty_committed_bundle(self, output_pcollection): - """Create an empty bundle useful for triggering evaluation.""" - return self._bundle_factory.create_empty_committed_bundle( - output_pcollection) - - def extract_fired_timers(self): - return self._watermark_manager.extract_fired_timers() - - def is_done(self, transform=None): - """Checks completion of a step or the pipeline. - - Args: - transform: AppliedPTransform to check for completion. - - Returns: - True if the step will not produce additional output. If transform is None, - returns True if all steps are done.
- """ - if transform: - return self._is_transform_done(transform) - else: - for applied_ptransform in self._step_names: - if not self._is_transform_done(applied_ptransform): - return False - return True - - def _is_transform_done(self, transform): - tw = self._watermark_manager.get_watermarks(transform) - return tw.output_watermark == InProcessWatermarkManager.WATERMARK_POS_INF - - def get_value_or_schedule_after_output(self, pcollection_view, task): - assert isinstance(task, TransformExecutor) - return self._side_inputs_container.get_value_or_schedule_after_output( - pcollection_view, task) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py b/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py deleted file mode 100644 index 2136855..0000000 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py +++ /dev/null @@ -1,550 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""An executor that schedules and executes applied ptransforms.""" - -from __future__ import absolute_import - -import collections -import logging -import Queue -import threading -import traceback -from weakref import WeakValueDictionary - - -class ExecutorService(object): - """Thread pool for executing tasks in parallel.""" - - class CallableTask(object): - - def __call__(self): - pass - - @property - def name(self): - return None - - class ExecutorServiceWorker(threading.Thread): - """Worker thread for executing a single task at a time.""" - - # Amount to block waiting for getting an item from the queue in seconds. - TIMEOUT = 5 - - def __init__(self, queue, index): - super(ExecutorService.ExecutorServiceWorker, self).__init__() - self.queue = queue - self._index = index - self._default_name = 'ExecutorServiceWorker-' + str(index) - self._update_name() - self.shutdown_requested = False - self.start() - - def _update_name(self, task=None): - if task and task.name: - name = task.name - else: - name = self._default_name - self.name = 'Thread: %d, %s (%s)' % ( - self._index, name, 'executing' if task else 'idle') - - def _get_task_or_none(self): - try: - # Do not block indefinitely, otherwise we may not act for a requested - # shutdown. 
- return self.queue.get( - timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT) - except Queue.Empty: - return None - - def run(self): - while not self.shutdown_requested: - task = self._get_task_or_none() - if task: - try: - if not self.shutdown_requested: - self._update_name(task) - task() - self._update_name() - finally: - self.queue.task_done() - - def shutdown(self): - self.shutdown_requested = True - - def __init__(self, num_workers): - self.queue = Queue.Queue() - self.workers = [ExecutorService.ExecutorServiceWorker( - self.queue, i) for i in range(num_workers)] - self.shutdown_requested = False - - def submit(self, task): - assert isinstance(task, ExecutorService.CallableTask) - if not self.shutdown_requested: - self.queue.put(task) - - def await_completion(self): - for worker in self.workers: - worker.join() - - def shutdown(self): - self.shutdown_requested = True - - for worker in self.workers: - worker.shutdown() - - # Consume all the remaining items in the queue - while not self.queue.empty(): - try: - self.queue.get_nowait() - self.queue.task_done() - except Queue.Empty: - continue - # All existing threads will eventually terminate (after they complete their - # last task). - - -class TransformEvaluationState(object): - - def __init__(self, executor_service, scheduled): - self.executor_service = executor_service - self.scheduled = scheduled - - def schedule(self, work): - self.scheduled.add(work) - self.executor_service.submit(work) - - def complete(self, completed_work): - self.scheduled.remove(completed_work) - - -class ParallelEvaluationState(TransformEvaluationState): - """A TransformEvaluationState with unlimited parallelism. - - Any TransformExecutor scheduled will be immediately submitted to the - ExecutorService. - - A principal use of this is for evaluators that can generate output bundles - only using the input bundle (e.g. ParDo). - """ - pass - - -class SerialEvaluationState(TransformEvaluationState): - """A TransformEvaluationState with a single work queue. - - Any TransformExecutor scheduled will be placed on the work queue. Only one - item of work will be submitted to the ExecutorService at any time. - - A principal use of this is for evaluators that keep global state, such as - GroupByKeyOnly. - """ - - def __init__(self, executor_service, scheduled): - super(SerialEvaluationState, self).__init__(executor_service, scheduled) - self.serial_queue = collections.deque() - self.currently_evaluating = None - self._lock = threading.Lock() - - def complete(self, completed_work): - self._update_currently_evaluating(None, completed_work) - super(SerialEvaluationState, self).complete(completed_work) - - def schedule(self, new_work): - self._update_currently_evaluating(new_work, None) - - def _update_currently_evaluating(self, new_work, completed_work): - with self._lock: - if new_work: - self.serial_queue.append(new_work) - if completed_work: - assert self.currently_evaluating == completed_work - self.currently_evaluating = None - if self.serial_queue and not self.currently_evaluating: - next_work = self.serial_queue.pop() - self.currently_evaluating = next_work - super(SerialEvaluationState, self).schedule(next_work) - - -class TransformExecutorServices(object): - """Schedules and completes TransformExecutors. - - Controls the concurrency as appropriate for the applied transform the executor - exists for.
- """ - - def __init__(self, executor_service): - self._executor_service = executor_service - self._scheduled = set() - self._parallel = ParallelEvaluationState( - self._executor_service, self._scheduled) - self._serial_cache = WeakValueDictionary() - - def parallel(self): - return self._parallel - - def serial(self, step): - cached = self._serial_cache.get(step) - if not cached: - cached = SerialEvaluationState(self._executor_service, self._scheduled) - self._serial_cache[step] = cached - return cached - - @property - def executors(self): - return frozenset(self._scheduled) - - -class _CompletionCallback(object): - """The default completion callback. - - The default completion callback is used to complete transform evaluations - that are triggered due to the arrival of elements from an upstream transform, - or for a source transform. - """ - - def __init__(self, evaluation_context, all_updates, timers=None): - self._evaluation_context = evaluation_context - self._all_updates = all_updates - self._timers = timers - - def handle_result(self, input_committed_bundle, transform_result): - output_committed_bundles = self._evaluation_context.handle_result( - input_committed_bundle, self._timers, transform_result) - for output_committed_bundle in output_committed_bundles: - self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate( - output_committed_bundle, None)) - return output_committed_bundles - - def handle_exception(self, exception): - self._all_updates.offer( - _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception)) - - -class _TimerCompletionCallback(_CompletionCallback): - - def __init__(self, evaluation_context, all_updates, timers): - super(_TimerCompletionCallback, self).__init__( - evaluation_context, all_updates, timers) - - -class TransformExecutor(ExecutorService.CallableTask): - """TransformExecutor will evaluate a bundle using an applied ptransform. - - A CallableTask responsible for constructing a TransformEvaluator andevaluating - it on some bundle of input, and registering the result using the completion - callback. - """ - - def __init__(self, transform_evaluator_registry, evaluation_context, - input_bundle, applied_transform, completion_callback, - transform_evaluation_state): - self._transform_evaluator_registry = transform_evaluator_registry - self._evaluation_context = evaluation_context - self._input_bundle = input_bundle - self._applied_transform = applied_transform - self._completion_callback = completion_callback - self._transform_evaluation_state = transform_evaluation_state - self._side_input_values = {} - self.blocked = False - self._call_count = 0 - - def __call__(self): - self._call_count += 1 - assert self._call_count <= (1 + len(self._applied_transform.side_inputs)) - - for side_input in self._applied_transform.side_inputs: - if side_input not in self._side_input_values: - has_result, value = ( - self._evaluation_context.get_value_or_schedule_after_output( - side_input, self)) - if not has_result: - # Monitor task will reschedule this executor once the side input is - # available. 
- return - self._side_input_values[side_input] = value - - side_input_values = [self._side_input_values[side_input] - for side_input in self._applied_transform.side_inputs] - - try: - evaluator = self._transform_evaluator_registry.for_application( - self._applied_transform, self._input_bundle, side_input_values) - - if self._input_bundle: - for value in self._input_bundle.elements: - evaluator.process_element(value) - - result = evaluator.finish_bundle() - - if self._evaluation_context.has_cache: - for uncommitted_bundle in result.output_bundles: - self._evaluation_context.append_to_cache( - self._applied_transform, uncommitted_bundle.tag, - uncommitted_bundle.elements) - undeclared_tag_values = result.undeclared_tag_values - if undeclared_tag_values: - for tag, value in undeclared_tag_values.iteritems(): - self._evaluation_context.append_to_cache( - self._applied_transform, tag, value) - - self._completion_callback.handle_result(self._input_bundle, result) - return result - except Exception as e: # pylint: disable=broad-except - logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True) - self._completion_callback.handle_exception(e) - finally: - self._transform_evaluation_state.complete(self) - - -class InProcessExecutor(object): - - def __init__(self, *args, **kwargs): - self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs) - - def start(self, roots): - self._executor.start(roots) - - def await_completion(self): - self._executor.await_completion() - - -class _ExecutorServiceParallelExecutor(object): - """An internal implementation for InProcessExecutor.""" - - NUM_WORKERS = 1 - - def __init__(self, value_to_consumers, transform_evaluator_registry, - evaluation_context): - self.executor_service = ExecutorService( - _ExecutorServiceParallelExecutor.NUM_WORKERS) - self.transform_executor_services = TransformExecutorServices( - self.executor_service) - self.value_to_consumers = value_to_consumers - self.transform_evaluator_registry = transform_evaluator_registry - self.evaluation_context = evaluation_context - self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue( - _ExecutorServiceParallelExecutor.ExecutorUpdate) - self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate) - self.default_completion_callback = _CompletionCallback( - evaluation_context, self.all_updates) - - def start(self, roots): - self.root_nodes = frozenset(roots) - self.executor_service.submit( - _ExecutorServiceParallelExecutor._MonitorTask(self)) - - def await_completion(self): - update = self.visible_updates.take() - try: - if update.exception: - raise update.exception - finally: - self.executor_service.shutdown() - - def schedule_consumers(self, committed_bundle): - if committed_bundle.pcollection in self.value_to_consumers: - consumers = self.value_to_consumers[committed_bundle.pcollection] - for applied_ptransform in consumers: - self.schedule_consumption(applied_ptransform, committed_bundle, - self.default_completion_callback) - - def schedule_consumption(self, consumer_applied_transform, committed_bundle, - on_complete): - """Schedules evaluation of the given bundle with the transform.""" - assert all([consumer_applied_transform, on_complete]) - assert committed_bundle or consumer_applied_transform in self.root_nodes - if (committed_bundle - and self.transform_evaluator_registry.should_execute_serially( - consumer_applied_transform)): - transform_executor_service = 
self.transform_executor_services.serial( - consumer_applied_transform) - else: - transform_executor_service = self.transform_executor_services.parallel() - - transform_executor = TransformExecutor( - self.transform_evaluator_registry, self.evaluation_context, - committed_bundle, consumer_applied_transform, on_complete, - transform_executor_service) - transform_executor_service.schedule(transform_executor) - - class _TypedUpdateQueue(object): - """Type-checking update queue with blocking and non-blocking operations.""" - - def __init__(self, item_type): - self._item_type = item_type - self._queue = Queue.Queue() - - def poll(self): - try: - item = self._queue.get_nowait() - self._queue.task_done() - return item - except Queue.Empty: - return None - - def take(self): - item = self._queue.get() - self._queue.task_done() - return item - - def offer(self, item): - assert isinstance(item, self._item_type) - self._queue.put_nowait(item) - - class ExecutorUpdate(object): - """An internal status update on the state of the executor.""" - - def __init__(self, produced_bundle=None, exception=None): - # Exactly one of these must be non-None. - assert bool(produced_bundle) != bool(exception) - self.committed_bundle = produced_bundle - self.exception = exception - - class VisibleExecutorUpdate(object): - """An update of interest to the user. - - Used when awaiting completion to decide whether to return normally or - raise an exception. - """ - - def __init__(self, exception=None): - self.finished = exception is not None - self.exception = exception - - class _MonitorTask(ExecutorService.CallableTask): - """MonitorTask continuously runs to ensure that the pipeline makes progress.""" - - def __init__(self, executor): - self._executor = executor - - @property - def name(self): - return 'monitor' - - def __call__(self): - try: - update = self._executor.all_updates.poll() - while update: - if update.committed_bundle: - self._executor.schedule_consumers(update.committed_bundle) - else: - assert update.exception - logging.warning('A task failed with exception.\n %s', - update.exception) - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - update.exception)) - update = self._executor.all_updates.poll() - self._executor.evaluation_context.schedule_pending_unblocked_tasks( - self._executor.executor_service) - self._add_work_if_necessary(self._fire_timers()) - except Exception as e: # pylint: disable=broad-except - logging.error('Monitor task died due to exception.\n %s', e) - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e)) - finally: - if not self._should_shutdown(): - self._executor.executor_service.submit(self) - - def _should_shutdown(self): - """Checks whether the pipeline has reached a terminal state. - - It checks for successful completion by inspecting the watermarks of all - transforms. If they have all reached the maximum watermark, the pipeline - has successfully run to completion. - - If the above is not true, it checks that at least one executor is making - progress. Otherwise the pipeline is declared stalled. - - If the pipeline reached a terminal state as explained above, - _should_shutdown requests a graceful shutdown of the executor. - - Returns: - True if the pipeline reached a terminal state and the monitor task can - finish. Otherwise the monitor task should schedule itself again for - future execution. 
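- - Sketch of the decision implemented below: done -> offer a successful - VisibleExecutorUpdate and shut down; stalled (no executor making - progress) -> offer a VisibleExecutorUpdate carrying an exception and - shut down; otherwise -> leave the monitor task scheduled.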
- """ - if self._executor.evaluation_context.is_done(): - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate()) - self._executor.executor_service.shutdown() - return True - elif not self._is_executing: - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - Exception('Monitor task detected a pipeline stall.'))) - self._executor.executor_service.shutdown() - return True - return False - - def _fire_timers(self): - """Schedules triggered consumers if any timers fired. - - Returns: - True if timers fired. - """ - fired_timers = self._executor.evaluation_context.extract_fired_timers() - for applied_ptransform in fired_timers: - # Use an empty committed bundle. just to trigger. - empty_bundle = ( - self._executor.evaluation_context.create_empty_committed_bundle( - applied_ptransform.inputs[0])) - timer_completion_callback = _TimerCompletionCallback( - self._executor.evaluation_context, self._executor.all_updates, - applied_ptransform) - - self._executor.schedule_consumption( - applied_ptransform, empty_bundle, timer_completion_callback) - return bool(fired_timers) - - def _is_executing(self): - """Returns True if there is at least one non-blocked TransformExecutor.""" - for transform_executor in ( - self._executor.transform_executor_services.executors): - if not transform_executor.blocked: - return True - return False - - def _add_work_if_necessary(self, timers_fired): - """Adds more work from the roots if pipeline requires more input. - - If all active TransformExecutors are in a blocked state, add more work - from root nodes that may have additional work. This ensures that if a - pipeline has elements available from the root nodes it will add those - elements when necessary. - - Args: - timers_fired: True if any timers fired prior to this call. - """ - # If any timers have fired, they will add more work; No need to add more. - if timers_fired: - return - - if self._is_executing(): - # We have at least one executor that can proceed without adding - # additional work. - return - - # All current TransformExecutors are blocked; add more work from the - # roots. - for applied_transform in self._executor.root_nodes: - if not self._executor.evaluation_context.is_done(applied_transform): - self._executor.schedule_consumption( - applied_transform, None, - self._executor.default_completion_callback) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py deleted file mode 100644 index 287c170..0000000 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py +++ /dev/null @@ -1,142 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""InProcessPipelineRunner, executing on the local machine.""" - -from __future__ import absolute_import - -import collections -import logging - -from apache_beam.runners.inprocess.bundle_factory import BundleFactory -from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor -from apache_beam.runners.inprocess.inprocess_evaluation_context import InProcessEvaluationContext -from apache_beam.runners.inprocess.inprocess_executor import InProcessExecutor -from apache_beam.runners.inprocess.transform_evaluator import TransformEvaluatorRegistry -from apache_beam.runners.runner import PipelineResult -from apache_beam.runners.runner import PipelineRunner -from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache - - -class InProcessPipelineRunner(PipelineRunner): - """Executes a single pipeline on the local machine.""" - - def __init__(self): - self._cache = None - - def run(self, pipeline): - """Executes the entire pipeline and returns an InProcessPipelineResult.""" - logging.info('Running pipeline with InProcessPipelineRunner.') - self.visitor = ConsumerTrackingPipelineVisitor() - pipeline.visit(self.visitor) - - evaluation_context = InProcessEvaluationContext( - pipeline.options, - BundleFactory(), - self.visitor.root_transforms, - self.visitor.value_to_consumers, - self.visitor.step_names, - self.visitor.views) - - evaluation_context.use_pvalue_cache(self._cache) - - executor = InProcessExecutor(self.visitor.value_to_consumers, - TransformEvaluatorRegistry(evaluation_context), - evaluation_context) - # Start the executor. This is a non-blocking call; it starts the - # execution in background threads and returns. - executor.start(self.visitor.root_transforms) - result = InProcessPipelineResult(executor, evaluation_context) - - # TODO(altay): If blocking: - # Block until the pipeline completes. This call will return after the - # pipeline has fully terminated (successfully or with a failure). - result.await_completion() - - if self._cache: - self._cache.finalize() - - return result - - @property - def cache(self): - if not self._cache: - self._cache = InProcessBufferingInMemoryCache() - return self._cache.pvalue_cache - - def apply(self, transform, input): # pylint: disable=redefined-builtin - """Runner callback for a pipeline.apply call.""" - return transform.apply(input) - - -class InProcessBufferingInMemoryCache(object): - """PValueCache wrapper for buffering bundles until a PValue is fully computed. - - InProcessBufferingInMemoryCache keeps an in-memory cache keyed by - (applied_ptransform, tag) tuples. It accepts appends to existing cache - entries until it is finalized. finalize() makes all existing cached - entries visible to the underlying PValueCache in their entirety, cleans - the in-memory cache, and stops accepting new cache entries. 
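- - Illustrative usage (hypothetical applied_ptransform and tag): - - cache = InProcessBufferingInMemoryCache() - cache.append(applied_ptransform, None, [1, 2]) - cache.append(applied_ptransform, None, [3]) # extends the same entry - cache.finalize() # flush to the PValueCache; further appends are rejected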
- """ - - def __init__(self): - self._cache = collections.defaultdict(list) - self._pvalue_cache = PValueCache() - self._finalized = False - - @property - def pvalue_cache(self): - return self._pvalue_cache - - def append(self, applied_ptransform, tag, elements): - assert not self._finalized - assert elements is not None - self._cache[(applied_ptransform, tag)].extend(elements) - - def finalize(self): - """Make buffered cache elements visible to the underlying PValueCache.""" - assert not self._finalized - for key, value in self._cache.iteritems(): - applied_ptransform, tag = key - self._pvalue_cache.cache_output(applied_ptransform, tag, value) - self._cache = None - - -class InProcessPipelineResult(PipelineResult): - """A InProcessPipelineResult provides access to info about a pipeline.""" - - def __init__(self, executor, evaluation_context): - super(InProcessPipelineResult, self).__init__(PipelineState.RUNNING) - self._executor = executor - self._evaluation_context = evaluation_context - - def _is_in_terminal_state(self): - return self._state is not PipelineState.RUNNING - - def await_completion(self): - if not self._is_in_terminal_state(): - try: - self._executor.await_completion() - self._state = PipelineState.DONE - except: # pylint: disable=broad-except - self._state = PipelineState.FAILED - raise - return self._state - - def aggregated_values(self, aggregator_or_name): - return self._evaluation_context.get_aggregator_values(aggregator_or_name) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py deleted file mode 100644 index aa9db24..0000000 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py +++ /dev/null @@ -1,121 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Tests for InProcessPipelineRunner.""" - -import logging -import unittest - -from apache_beam import Pipeline -import apache_beam.examples.snippets.snippets_test as snippets_test -import apache_beam.io.fileio_test as fileio_test -import apache_beam.io.textio_test as textio_test -import apache_beam.io.sources_test as sources_test -import apache_beam.pipeline_test as pipeline_test -import apache_beam.pvalue_test as pvalue_test -from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner -import apache_beam.transforms.aggregator_test as aggregator_test -import apache_beam.transforms.combiners_test as combiners_test -import apache_beam.transforms.ptransform_test as ptransform_test -import apache_beam.transforms.trigger_test as trigger_test -import apache_beam.transforms.window_test as window_test -import apache_beam.transforms.write_ptransform_test as write_ptransform_test -import apache_beam.typehints.typed_pipeline_test as typed_pipeline_test - - -class TestWithInProcessPipelineRunner(object): - - def setUp(self): - original_init = Pipeline.__init__ - - def override_pipeline_init(self, runner=None, options=None, argv=None): - runner = InProcessPipelineRunner() - return original_init(self, runner, options, argv) - - self.runner_name = None - self.original_init = original_init - Pipeline.__init__ = override_pipeline_init - - def tearDown(self): - Pipeline.__init__ = self.original_init - - -class InProcessPipelineRunnerPipelineTest( - TestWithInProcessPipelineRunner, pipeline_test.PipelineTest): - - def test_cached_pvalues_are_refcounted(self): - # InProcessPipelineRunner does not have a refcounted cache. - pass - - def test_eager_pipeline(self): - # Tests eager runner only - pass - - -class InProcessPipelineRunnerSnippetsTest( - TestWithInProcessPipelineRunner, snippets_test.SnippetsTest, - snippets_test.ParDoTest, snippets_test.TypeHintsTest, - snippets_test.CombineTest): - pass - - -class InProcessPipelineRunnerTransform( - TestWithInProcessPipelineRunner, aggregator_test.AggregatorTest, - combiners_test.CombineTest, ptransform_test.PTransformTest, - pvalue_test.PValueTest, window_test.WindowTest, - typed_pipeline_test.MainInputTest, typed_pipeline_test.SideInputTest, - typed_pipeline_test.CustomTransformTest, trigger_test.TriggerPipelineTest, - write_ptransform_test.WriteTest): - pass - - -class TestTextFileSource( - TestWithInProcessPipelineRunner, fileio_test.TestTextFileSource): - pass - - -class TestNativeTextFileSink( - TestWithInProcessPipelineRunner, fileio_test.TestNativeTextFileSink): - - def setUp(self): - TestWithInProcessPipelineRunner.setUp(self) - fileio_test.TestNativeTextFileSink.setUp(self) - - -class TestTextFileSink( - TestWithInProcessPipelineRunner, textio_test.TextSinkTest): - - def setUp(self): - TestWithInProcessPipelineRunner.setUp(self) - textio_test.TextSinkTest.setUp(self) - - -class MyFileSink(TestWithInProcessPipelineRunner, fileio_test.MyFileSink): - pass - - -class TestFileSink(TestWithInProcessPipelineRunner, fileio_test.TestFileSink): - pass - - -class SourcesTest(TestWithInProcessPipelineRunner, sources_test.SourcesTest): - pass - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.DEBUG) - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py 
b/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py deleted file mode 100644 index 798ebfb..0000000 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py +++ /dev/null @@ -1,60 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""The result of evaluating an AppliedPTransform with a TransformEvaluator.""" - -from __future__ import absolute_import - - -class InProcessTransformResult(object): - """The result of evaluating an AppliedPTransform with a TransformEvaluator.""" - - def __init__(self, applied_ptransform, uncommitted_output_bundles, state, - timer_update, counters, watermark_hold, - undeclared_tag_values=None): - self._applied_ptransform = applied_ptransform - self._uncommitted_output_bundles = uncommitted_output_bundles - self._state = state - self._timer_update = timer_update - self._counters = counters - self._watermark_hold = watermark_hold - # Only used when caching (materializing) all values is requested. - self._undeclared_tag_values = undeclared_tag_values - - @property - def transform(self): - return self._applied_ptransform - - @property - def output_bundles(self): - return self._uncommitted_output_bundles - - @property - def state(self): - return self._state - - @property - def counters(self): - return self._counters - - @property - def watermark_hold(self): - return self._watermark_hold - - @property - def undeclared_tag_values(self): - return self._undeclared_tag_values
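For reference, a minimal sketch of how one bundle flows through the classes above; registry, transform, bundle and callback are stand-ins for a TransformEvaluatorRegistry, an AppliedPTransform, a committed bundle and a _CompletionCallback:

    # Roughly what TransformExecutor.__call__ does, minus side inputs
    # and result caching:
    evaluator = registry.for_application(transform, bundle, [])
    for element in bundle.elements:
      evaluator.process_element(element)
    result = evaluator.finish_bundle()  # an InProcessTransformResult
    callback.handle_result(bundle, result)
    # handle_result() commits result.output_bundles via the evaluation
    # context and enqueues an ExecutorUpdate per committed bundle; the
    # _MonitorTask drains that queue and schedules downstream consumers.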