http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 62c09ed..e656600 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -69,34 +69,20 @@ from apache_beam.utils import processes
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import SetupOptions
 
-# All constants are for internal use only; no backwards-compatibility
-# guarantees.
-# In a released version BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION
-# should match each other, and should be in the same format as the SDK version
-# (i.e. MAJOR.MINOR.PATCH). For non-released (dev) versions, read below.
 # Update this version to the next version whenever there is a change that will
-# require changes to legacy Dataflow worker execution environment.
+# require changes to the execution environment.
 # This should be in the beam-[version]-[date] format, date is optional.
-BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
-# Update this version to the next version whenever there is a change that
-# requires changes to SDK harness container or SDK harness launcher.
-# This should be in the beam-[version]-[date] format, date is optional.
-BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'
+BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170601'
 
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
-# Package names for different distributions
 GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
 BEAM_PACKAGE_NAME = 'apache-beam'
 
-# SDK identifiers for different distributions
-GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
-BEAM_SDK_NAME = 'Apache Beam SDK for Python'
-
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -488,33 +474,10 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'type of location: %s' % sdk_remote_location)
 
 
-def get_default_container_image_for_current_sdk(job_type):
+def get_required_container_version():
   """For internal use only; no backwards-compatibility guarantees.
 
-  Args:
-    job_type (str): BEAM job type.
-
-  Returns:
-    str: Google Cloud Dataflow container image for remote execution.
-  """
-  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
-  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-    image_name = 'dataflow.gcr.io/v1beta3/python-fnapi'
-  else:
-    image_name = 'dataflow.gcr.io/v1beta3/python'
-  image_tag = _get_required_container_version(job_type)
-  return image_name + ':' + image_tag
-
-
-def _get_required_container_version(job_type=None):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Args:
-    job_type (str, optional): BEAM job type. Defaults to None.
-
-  Returns:
-    str: The tag of worker container images in GCR that corresponds to
-      current version of the SDK.
+  Returns the Google Cloud Dataflow container version for remote execution.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -530,34 +493,28 @@ def _get_required_container_version(job_type=None):
   except pkg.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
-    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-      return BEAM_FNAPI_CONTAINER_VERSION
-    else:
-      return BEAM_CONTAINER_VERSION
+    return BEAM_CONTAINER_VERSION
 
 
 def get_sdk_name_and_version():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns name and version of SDK reported to Google Cloud Dataflow."""
-  import pkg_resources as pkg
-  container_version = _get_required_container_version()
-  try:
-    pkg.get_distribution(GOOGLE_PACKAGE_NAME)
-    return (GOOGLE_SDK_NAME, container_version)
-  except pkg.DistributionNotFound:
-    return (BEAM_SDK_NAME, beam_version.__version__)
+  # TODO(ccy): Make this check cleaner.
+  container_version = get_required_container_version()
+  if container_version == BEAM_CONTAINER_VERSION:
+    return ('Apache Beam SDK for Python', beam_version.__version__)
+  return ('Google Cloud Dataflow SDK for Python', container_version)
 
 
 def get_sdk_package_name():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns the PyPI package name to be staged to Google Cloud Dataflow."""
-  sdk_name, _ = get_sdk_name_and_version()
-  if sdk_name == GOOGLE_SDK_NAME:
-    return GOOGLE_PACKAGE_NAME
-  else:
+  container_version = get_required_container_version()
+  if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
+  return GOOGLE_PACKAGE_NAME
 
 
 def _download_pypi_sdk_package(temp_dir):
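A note on the detection pattern this file leans on: pkg_resources.get_distribution() raises DistributionNotFound when a package is not installed, which is how both the old and the new code decide whether the Dataflow or the Beam distribution of the SDK is running. A minimal standalone sketch of that pattern (the two package names match the constants above; the function name and the exact fallback behavior are illustrative only, not part of this file):

import pkg_resources

def detect_distribution():
  """Best-effort guess at which SDK distribution is installed."""
  try:
    # Succeeds only when the Dataflow distribution is installed.
    dist = pkg_resources.get_distribution('google-cloud-dataflow')
    return ('Google Cloud Dataflow SDK for Python', dist.version)
  except pkg_resources.DistributionNotFound:
    # Otherwise fall back to the Apache Beam distribution, if present.
    dist = pkg_resources.get_distribution('apache-beam')
    return ('Apache Beam SDK for Python', dist.version)

print(detect_distribution())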
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
index 3d8c24f..7610baf 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
@@ -20,9 +20,7 @@
 
 import unittest
 
-from apache_beam import Create
-from apache_beam import error
-from apache_beam import pvalue
+from apache_beam import error, pvalue
 from apache_beam.runners.dataflow.native_io.iobase import (
     _dict_printable_fields,
     _NativeWrite,
@@ -30,12 +28,10 @@ from apache_beam.runners.dataflow.native_io.iobase import (
     DynamicSplitRequest,
     DynamicSplitResultWithPosition,
     NativeSink,
-    NativeSinkWriter,
     NativeSource,
     ReaderPosition,
     ReaderProgress
 )
-from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class TestHelperFunctions(unittest.TestCase):
@@ -158,39 +154,6 @@ class TestNativeSink(unittest.TestCase):
     fake_sink = FakeSink()
     self.assertEqual(fake_sink.__repr__(), "<FakeSink ['validate=False']>")
 
-  def test_on_direct_runner(self):
-    class FakeSink(NativeSink):
-      """A fake sink outputing a number of elements."""
-
-      def __init__(self):
-        self.written_values = []
-        self.writer_instance = FakeSinkWriter(self.written_values)
-
-      def writer(self):
-        return self.writer_instance
-
-    class FakeSinkWriter(NativeSinkWriter):
-      """A fake sink writer for testing."""
-
-      def __init__(self, written_values):
-        self.written_values = written_values
-
-      def __enter__(self):
-        return self
-
-      def __exit__(self, *unused_args):
-        pass
-
-      def Write(self, value):
-        self.written_values.append(value)
-
-    p = TestPipeline()
-    sink = FakeSink()
-    p | Create(['a', 'b', 'c']) | _NativeWrite(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-
-    self.assertEqual(['a', 'b', 'c'], sink.written_values)
-
 
 class Test_NativeWrite(unittest.TestCase):


http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
deleted file mode 100644
index 8c6c8d6..0000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# - -"""Create transform for streaming.""" - -from apache_beam import pvalue -from apache_beam import DoFn -from apache_beam import ParDo -from apache_beam import PTransform -from apache_beam import Windowing -from apache_beam.transforms.window import GlobalWindows - - -class StreamingCreate(PTransform): - """A specialized implementation for ``Create`` transform in streaming mode. - - Note: There is no unbounded source API in python to wrap the Create source, - so we map this to composite of Impulse primitive and an SDF. - """ - - def __init__(self, values, coder): - self.coder = coder - self.encoded_values = map(coder.encode, values) - - class DecodeAndEmitDoFn(DoFn): - """A DoFn which stores encoded versions of elements. - - It also stores a Coder to decode and emit those elements. - TODO: BEAM-2422 - Make this a SplittableDoFn. - """ - - def __init__(self, encoded_values, coder): - self.encoded_values = encoded_values - self.coder = coder - - def process(self, unused_element): - for encoded_value in self.encoded_values: - yield self.coder.decode(encoded_value) - - class Impulse(PTransform): - """The Dataflow specific override for the impulse primitive.""" - - def expand(self, pbegin): - assert isinstance(pbegin, pvalue.PBegin), ( - 'Input to Impulse transform must be a PBegin but found %s' % pbegin) - return pvalue.PCollection(pbegin.pipeline) - - def get_windowing(self, inputs): - return Windowing(GlobalWindows()) - - def infer_output_type(self, unused_input_type): - return bytes - - def expand(self, pbegin): - return (pbegin - | 'Impulse' >> self.Impulse() - | 'Decode Values' >> ParDo( - self.DecodeAndEmitDoFn(self.encoded_values, self.coder))) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py deleted file mode 100644 index 680a4b7..0000000 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ /dev/null @@ -1,52 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Ptransform overrides for DataflowRunner.""" - -from apache_beam.coders import typecoders -from apache_beam.pipeline import PTransformOverride - - -class CreatePTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``Create`` in streaming mode.""" - - def get_matcher(self): - return self.is_streaming_create - - @staticmethod - def is_streaming_create(applied_ptransform): - # Imported here to avoid circular dependencies. 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
deleted file mode 100644
index 680a4b7..0000000
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Ptransform overrides for DataflowRunner."""
-
-from apache_beam.coders import typecoders
-from apache_beam.pipeline import PTransformOverride
-
-
-class CreatePTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``Create`` in streaming mode."""
-
-  def get_matcher(self):
-    return self.is_streaming_create
-
-  @staticmethod
-  def is_streaming_create(applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import Create
-    from apache_beam.options.pipeline_options import StandardOptions
-
-    if isinstance(applied_ptransform.transform, Create):
-      standard_options = (applied_ptransform
-                          .outputs[None]
-                          .pipeline._options
-                          .view_as(StandardOptions))
-      return standard_options.streaming
-    else:
-      return False
-
-  def get_replacement_transform(self, ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.native_io.streaming_create import \
-        StreamingCreate
-    coder = typecoders.registry.get_coder(ptransform.get_output_type())
-    return StreamingCreate(ptransform.value, coder)
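For context, CreatePTransformOverride plugged into the override hook that the direct_runner.py hunk below also removes: a runner collects PTransformOverride objects and calls pipeline.replace_all(overrides) before execution. A sketch against the same-era API shown in the deleted file (get_matcher/get_replacement_transform); both transforms here are hypothetical stand-ins, not Beam transforms:

import apache_beam as beam
from apache_beam.pipeline import PTransformOverride

class MySlowTransform(beam.PTransform):  # hypothetical
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: x)

class MyFastTransform(beam.PTransform):  # hypothetical
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: x)

class MyTransformOverride(PTransformOverride):
  """Illustrative only: swap MySlowTransform for MyFastTransform."""

  def get_matcher(self):
    # A callable from AppliedPTransform to bool, as in the deleted file.
    return lambda applied: isinstance(applied.transform, MySlowTransform)

  def get_replacement_transform(self, ptransform):
    # Input and output types must match for the replacement to be legal.
    return MyFastTransform()

# Inside a runner's run(), before translation:
#   pipeline.replace_all([MyTransformOverride()])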
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 0182b4c..ed00b03 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -108,7 +108,7 @@ class _Bundle(object):
                                  self._initial_windowed_value.windows)
 
   def __init__(self, pcollection, stacked=True):
-    assert isinstance(pcollection, (pvalue.PBegin, pvalue.PCollection))
+    assert isinstance(pcollection, pvalue.PCollection)
     self._pcollection = pcollection
     self._elements = []
     self._stacked = stacked


http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1a94b3d..ecf5114 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,55 +26,22 @@ from __future__ import absolute_import
 import collections
 import logging
 
-import apache_beam as beam
-from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
-from apache_beam.pvalue import PCollection
 from apache_beam.runners.direct.bundle_factory import BundleFactory
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms.core import _GroupAlsoByWindow
-from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.options.pipeline_options import DirectOptions
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 __all__ = ['DirectRunner']
 
 
-# Type variables.
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-
-
-@typehints.with_input_types(typehints.KV[K, V])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
-  """Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
-  pass
-
-
-@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
-  """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
-  pass
-
-
 class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
-  # A list of PTransformOverride objects to be applied before running a pipeline
-  # using DirectRunner.
-  # Currently this only works for overrides where the input and output types do
-  # not change.
-  # For internal SDK use only. This should not be updated by Beam pipeline
-  # authors.
-  _PTRANSFORM_OVERRIDES = []
-
   def __init__(self):
     self._cache = None
 
@@ -89,84 +56,9 @@ class DirectRunner(PipelineRunner):
     except NotImplementedError:
       return transform.expand(pcoll)
 
-  def apply__GroupByKeyOnly(self, transform, pcoll):
-    if (transform.__class__ == _GroupByKeyOnly and
-        pcoll.pipeline._options.view_as(StandardOptions).streaming):
-      # Use specialized streaming implementation, if requested.
-      type_hints = transform.get_type_hints()
-      return pcoll | (_StreamingGroupByKeyOnly()
-                      .with_input_types(*type_hints.input_types[0])
-                      .with_output_types(*type_hints.output_types[0]))
-    return transform.expand(pcoll)
-
-  def apply__GroupAlsoByWindow(self, transform, pcoll):
-    if (transform.__class__ == _GroupAlsoByWindow and
-        pcoll.pipeline._options.view_as(StandardOptions).streaming):
-      # Use specialized streaming implementation, if requested.
-      type_hints = transform.get_type_hints()
-      return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
-                      .with_input_types(*type_hints.input_types[0])
-                      .with_output_types(*type_hints.output_types[0]))
-    return transform.expand(pcoll)
-
-  def apply_ReadStringsFromPubSub(self, transform, pcoll):
-    try:
-      from google.cloud import pubsub as unused_pubsub
-    except ImportError:
-      raise ImportError('Google Cloud PubSub not available, please install '
-                        'apache_beam[gcp]')
-    # Execute this as a native transform.
-    output = PCollection(pcoll.pipeline)
-    output.element_type = unicode
-    return output
-
-  def apply_WriteStringsToPubSub(self, transform, pcoll):
-    try:
-      from google.cloud import pubsub
-    except ImportError:
-      raise ImportError('Google Cloud PubSub not available, please install '
-                        'apache_beam[gcp]')
-    project = transform._sink.project
-    topic_name = transform._sink.topic_name
-
-    class DirectWriteToPubSub(beam.DoFn):
-      _topic = None
-
-      def __init__(self, project, topic_name):
-        self.project = project
-        self.topic_name = topic_name
-
-      def start_bundle(self):
-        if self._topic is None:
-          self._topic = pubsub.Client(project=self.project).topic(
-              self.topic_name)
-        self._buffer = []
-
-      def process(self, elem):
-        self._buffer.append(elem.encode('utf-8'))
-        if len(self._buffer) >= 100:
-          self._flush()
-
-      def finish_bundle(self):
-        self._flush()
-
-      def _flush(self):
-        if self._buffer:
-          with self._topic.batch() as batch:
-            for datum in self._buffer:
-              batch.publish(datum)
-          self._buffer = []
-
-    output = pcoll | beam.ParDo(DirectWriteToPubSub(project, topic_name))
-    output.element_type = unicode
-    return output
-
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
-    # Performing configured PTransform overrides.
-    pipeline.replace_all(DirectRunner._PTRANSFORM_OVERRIDES)
-
     # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
     # with resolving imports when they are at top.
     # pylint: disable=wrong-import-position
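The deleted DirectWriteToPubSub above is a standard buffer-and-flush DoFn: accumulate in process(), flush at a size threshold, and flush again in finish_bundle() so a partially filled buffer is not dropped at the bundle boundary. A generic sketch of that pattern, with publish standing in for any batched side effect (names here are illustrative):

import apache_beam as beam

class BufferedWriteDoFn(beam.DoFn):
  """Illustrative only: batch side-effecting writes within a bundle."""

  MAX_BUFFER_SIZE = 100

  def __init__(self, publish):
    self._publish = publish  # a callable taking a list of elements

  def start_bundle(self):
    self._buffer = []

  def process(self, element):
    self._buffer.append(element)
    if len(self._buffer) >= self.MAX_BUFFER_SIZE:
      self._flush()

  def finish_bundle(self):
    # Bundles can end with a partially filled buffer; flush it too.
    self._flush()

  def _flush(self):
    if self._buffer:
      self._publish(self._buffer)
      self._buffer = []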
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 54c407c..68d99d3 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -27,22 +27,22 @@ from apache_beam.runners.direct.clock import Clock
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.executor import TransformExecutor
 from apache_beam.runners.direct.direct_metrics import DirectMetrics
-from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.utils import counters
 
 
 class _ExecutionContext(object):
 
-  def __init__(self, watermarks, keyed_states):
-    self.watermarks = watermarks
-    self.keyed_states = keyed_states
+  def __init__(self, watermarks, existing_state):
+    self._watermarks = watermarks
+    self._existing_state = existing_state
 
-    self._step_context = None
+  @property
+  def watermarks(self):
+    return self._watermarks
 
-  def get_step_context(self):
-    if not self._step_context:
-      self._step_context = DirectStepContext(self.keyed_states)
-    return self._step_context
+  @property
+  def existing_state(self):
+    return self._existing_state
 
 
 class _SideInputView(object):
@@ -145,11 +145,11 @@ class EvaluationContext(object):
     self._pcollection_to_views = collections.defaultdict(list)
     for view in views:
       self._pcollection_to_views[view.pvalue].append(view)
-    self._transform_keyed_states = self._initialize_keyed_states(
-        root_transforms, value_to_consumers)
+
+    # AppliedPTransform -> Evaluator specific state objects
+    self._application_state_interals = {}
     self._watermark_manager = WatermarkManager(
-        Clock(), root_transforms, value_to_consumers,
-        self._transform_keyed_states)
+        Clock(), root_transforms, value_to_consumers)
     self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
@@ -158,15 +158,6 @@ class EvaluationContext(object):
 
     self._lock = threading.Lock()
 
-  def _initialize_keyed_states(self, root_transforms, value_to_consumers):
-    transform_keyed_states = {}
-    for transform in root_transforms:
-      transform_keyed_states[transform] = {}
-    for consumers in value_to_consumers.values():
-      for consumer in consumers:
-        transform_keyed_states[consumer] = {}
-    return transform_keyed_states
-
   def use_pvalue_cache(self, cache):
     assert not self._cache
     self._cache = cache
@@ -208,12 +199,11 @@ class EvaluationContext(object):
       the committed bundles contained within the handled result.
     """
     with self._lock:
-      committed_bundles, unprocessed_bundles = self._commit_bundles(
-          result.uncommitted_output_bundles,
-          result.unprocessed_bundles)
+      committed_bundles = self._commit_bundles(
+          result.uncommitted_output_bundles)
       self._watermark_manager.update_watermarks(
           completed_bundle, result.transform, completed_timers,
-          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
+          committed_bundles, result.watermark_hold)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)
@@ -241,6 +231,7 @@ class EvaluationContext(object):
               counter.name, counter.combine_fn)
           merged_counter.accumulator.merge([counter.accumulator])
 
+      self._application_state_interals[result.transform] = result.state
       return committed_bundles
 
   def get_aggregator_values(self, aggregator_or_name):
@@ -253,22 +244,19 @@ class EvaluationContext(object):
         executor_service.submit(task)
       self._pending_unblocked_tasks = []
 
-  def _commit_bundles(self, uncommitted_bundles, unprocessed_bundles):
+  def _commit_bundles(self, uncommitted_bundles):
     """Commits bundles and returns a immutable set of committed bundles."""
     for in_progress_bundle in uncommitted_bundles:
       producing_applied_ptransform = in_progress_bundle.pcollection.producer
       watermarks = self._watermark_manager.get_watermarks(
          producing_applied_ptransform)
       in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
-
-    for unprocessed_bundle in unprocessed_bundles:
-      unprocessed_bundle.commit(None)
-    return tuple(uncommitted_bundles), tuple(unprocessed_bundles)
+    return tuple(uncommitted_bundles)
 
   def get_execution_context(self, applied_ptransform):
     return _ExecutionContext(
         self._watermark_manager.get_watermarks(applied_ptransform),
-        self._transform_keyed_states[applied_ptransform])
+        self._application_state_interals.get(applied_ptransform))
 
   def create_bundle(self, output_pcollection):
     """Create an uncommitted bundle for the specified PCollection."""
@@ -308,24 +296,3 @@ class EvaluationContext(object):
     assert isinstance(task, TransformExecutor)
     return self._side_inputs_container.get_value_or_schedule_after_output(
         side_input, task)
-
-
-class DirectUnmergedState(InMemoryUnmergedState):
-  """UnmergedState implementation for the DirectRunner."""
-
-  def __init__(self):
-    super(DirectUnmergedState, self).__init__(defensive_copy=False)
-
-
-class DirectStepContext(object):
-  """Context for the currently-executing step."""
-
-  def __init__(self, keyed_existing_state):
-    self.keyed_existing_state = keyed_existing_state
-
-  def get_keyed_state(self, key):
-    # TODO(ccy): consider implementing transactional copy on write semantics
-    # for state so that work items can be safely retried.
-    if not self.keyed_existing_state.get(key):
-      self.keyed_existing_state[key] = DirectUnmergedState()
-    return self.keyed_existing_state[key]


http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index e70e326..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -20,7 +20,6 @@ from __future__ import absolute_import
 
 import collections
-import itertools
 import logging
 import Queue
 import sys
@@ -222,30 +221,22 @@ class _CompletionCallback(object):
   or for a source transform.
   """
 
-  def __init__(self, evaluation_context, all_updates, timer_firings=None):
+  def __init__(self, evaluation_context, all_updates, timers=None):
     self._evaluation_context = evaluation_context
     self._all_updates = all_updates
-    self._timer_firings = timer_firings or []
+    self._timers = timers
 
-  def handle_result(self, transform_executor, input_committed_bundle,
-                    transform_result):
+  def handle_result(self, input_committed_bundle, transform_result):
     output_committed_bundles = self._evaluation_context.handle_result(
-        input_committed_bundle, self._timer_firings, transform_result)
+        input_committed_bundle, self._timers, transform_result)
     for output_committed_bundle in output_committed_bundles:
       self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
-          transform_executor,
-          committed_bundle=output_committed_bundle))
-    for unprocessed_bundle in transform_result.unprocessed_bundles:
-      self._all_updates.offer(
-          _ExecutorServiceParallelExecutor._ExecutorUpdate(
-              transform_executor,
-              unprocessed_bundle=unprocessed_bundle))
+          output_committed_bundle, None))
     return output_committed_bundles
 
-  def handle_exception(self, transform_executor, exception):
+  def handle_exception(self, exception):
     self._all_updates.offer(
-        _ExecutorServiceParallelExecutor._ExecutorUpdate(
-            transform_executor, exception=exception))
+        _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception))
 
 
 class TransformExecutor(_ExecutorService.CallableTask):
@@ -259,13 +250,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
   """
 
   def __init__(self, transform_evaluator_registry, evaluation_context,
-               input_bundle, fired_timers, applied_ptransform,
-               completion_callback, transform_evaluation_state):
+               input_bundle, applied_transform, completion_callback,
+               transform_evaluation_state):
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
-    self._fired_timers = fired_timers
-    self._applied_ptransform = applied_ptransform
+    self._applied_transform = applied_transform
     self._completion_callback = completion_callback
     self._transform_evaluation_state = transform_evaluation_state
     self._side_input_values = {}
@@ -274,11 +264,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
 
   def call(self):
     self._call_count += 1
-    assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
-    metrics_container = MetricsContainer(self._applied_ptransform.full_label)
+    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+    metrics_container = MetricsContainer(self._applied_transform.full_label)
     scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
-    for side_input in self._applied_ptransform.side_inputs:
+    for side_input in self._applied_transform.side_inputs:
       if side_input not in self._side_input_values:
         has_result, value = (
             self._evaluation_context.get_value_or_schedule_after_output(
@@ -290,17 +280,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
         self._side_input_values[side_input] = value
 
     side_input_values = [self._side_input_values[side_input]
-                         for side_input in self._applied_ptransform.side_inputs]
+                         for side_input in self._applied_transform.side_inputs]
 
     try:
-      evaluator = self._transform_evaluator_registry.get_evaluator(
-          self._applied_ptransform, self._input_bundle,
+      evaluator = self._transform_evaluator_registry.for_application(
+          self._applied_transform, self._input_bundle,
           side_input_values, scoped_metrics_container)
 
-      if self._fired_timers:
-        for timer_firing in self._fired_timers:
-          evaluator.process_timer_wrapper(timer_firing)
-
       if self._input_bundle:
         for value in self._input_bundle.get_elements_iterable():
           evaluator.process_element(value)
@@ -312,18 +298,18 @@ class TransformExecutor(_ExecutorService.CallableTask):
       if self._evaluation_context.has_cache:
         for uncommitted_bundle in result.uncommitted_output_bundles:
           self._evaluation_context.append_to_cache(
-              self._applied_ptransform, uncommitted_bundle.tag,
+              self._applied_transform, uncommitted_bundle.tag,
              uncommitted_bundle.get_elements_iterable())
         undeclared_tag_values = result.undeclared_tag_values
         if undeclared_tag_values:
           for tag, value in undeclared_tag_values.iteritems():
             self._evaluation_context.append_to_cache(
-                self._applied_ptransform, tag, value)
+                self._applied_transform, tag, value)
 
-      self._completion_callback.handle_result(self, self._input_bundle, result)
+      self._completion_callback.handle_result(self._input_bundle, result)
       return result
     except Exception as e:  # pylint: disable=broad-except
-      self._completion_callback.handle_exception(self, e)
+      self._completion_callback.handle_exception(e)
     finally:
       self._evaluation_context.metrics().commit_physical(
          self._input_bundle,
@@ -367,15 +353,6 @@ class _ExecutorServiceParallelExecutor(object):
 
   def start(self, roots):
     self.root_nodes = frozenset(roots)
-    self.all_nodes = frozenset(
-        itertools.chain(
-            roots,
-            *itertools.chain(self.value_to_consumers.values())))
-    self.node_to_pending_bundles = {}
-    for root_node in self.root_nodes:
-      provider = (self.transform_evaluator_registry
-                  .get_root_bundle_provider(root_node))
-      self.node_to_pending_bundles[root_node] = provider.get_root_bundles()
     self.executor_service.submit(
         _ExecutorServiceParallelExecutor._MonitorTask(self))
 
@@ -392,30 +369,26 @@ class _ExecutorServiceParallelExecutor(object):
     if committed_bundle.pcollection in self.value_to_consumers:
       consumers = self.value_to_consumers[committed_bundle.pcollection]
       for applied_ptransform in consumers:
-        self.schedule_consumption(applied_ptransform, committed_bundle, [],
+        self.schedule_consumption(applied_ptransform, committed_bundle,
                                   self.default_completion_callback)
 
-  def schedule_unprocessed_bundle(self, applied_ptransform,
-                                  unprocessed_bundle):
-    self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle)
-
-  def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
-                           fired_timers, on_complete):
+  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+                           on_complete):
     """Schedules evaluation of the given bundle with the transform."""
-    assert consumer_applied_ptransform
-    assert committed_bundle
-    assert on_complete
-    if self.transform_evaluator_registry.should_execute_serially(
-        consumer_applied_ptransform):
+    assert all([consumer_applied_transform, on_complete])
+    assert committed_bundle or consumer_applied_transform in self.root_nodes
+    if (committed_bundle
+        and self.transform_evaluator_registry.should_execute_serially(
+            consumer_applied_transform)):
       transform_executor_service = self.transform_executor_services.serial(
-          consumer_applied_ptransform)
+          consumer_applied_transform)
     else:
       transform_executor_service = self.transform_executor_services.parallel()
 
     transform_executor = TransformExecutor(
         self.transform_evaluator_registry, self.evaluation_context,
-        committed_bundle, fired_timers, consumer_applied_ptransform,
-        on_complete, transform_executor_service)
+        committed_bundle, consumer_applied_transform, on_complete,
+        transform_executor_service)
     transform_executor_service.schedule(transform_executor)
 
   class _TypedUpdateQueue(object):
@@ -445,16 +418,10 @@ class _ExecutorServiceParallelExecutor(object):
   class _ExecutorUpdate(object):
     """An internal status update on the state of the executor."""
 
-    def __init__(self, transform_executor, committed_bundle=None,
-                 unprocessed_bundle=None, exception=None):
-      self.transform_executor = transform_executor
+    def __init__(self, produced_bundle=None, exception=None):
       # Exactly one of them should be not-None
-      assert sum([
-          bool(committed_bundle),
-          bool(unprocessed_bundle),
-          bool(exception)]) == 1
-      self.committed_bundle = committed_bundle
-      self.unprocessed_bundle = unprocessed_bundle
+      assert bool(produced_bundle) != bool(exception)
+      self.committed_bundle = produced_bundle
       self.exception = exception
       self.exc_info = sys.exc_info()
       if self.exc_info[1] is not exception:
@@ -489,10 +456,6 @@ class _ExecutorServiceParallelExecutor(object):
         while update:
           if update.committed_bundle:
             self._executor.schedule_consumers(update.committed_bundle)
-          elif update.unprocessed_bundle:
-            self._executor.schedule_unprocessed_bundle(
-                update.transform_executor._applied_ptransform,
-                update.unprocessed_bundle)
          else:
            assert update.exception
            logging.warning('A task failed with exception.\n %s',
@@ -554,21 +517,19 @@ class _ExecutorServiceParallelExecutor(object):
       Returns:
        True if timers fired.
       """
-      transform_fired_timers = (
-          self._executor.evaluation_context.extract_fired_timers())
-      for applied_ptransform, fired_timers in transform_fired_timers:
+      fired_timers = self._executor.evaluation_context.extract_fired_timers()
+      for applied_ptransform in fired_timers:
        # Use an empty committed bundle. just to trigger.
        empty_bundle = (
            self._executor.evaluation_context.create_empty_committed_bundle(
                applied_ptransform.inputs[0]))
        timer_completion_callback = _CompletionCallback(
            self._executor.evaluation_context, self._executor.all_updates,
-            timer_firings=fired_timers)
+            applied_ptransform)
        self._executor.schedule_consumption(
-            applied_ptransform, empty_bundle, fired_timers,
-            timer_completion_callback)
-      return bool(transform_fired_timers)
+            applied_ptransform, empty_bundle, timer_completion_callback)
+      return bool(fired_timers)
 
     def _is_executing(self):
       """Returns True if there is at least one non-blocked TransformExecutor."""
@@ -603,14 +564,10 @@ class _ExecutorServiceParallelExecutor(object):
           # additional work.
          return
 
-        # All current TransformExecutors are blocked; add more work from any
-        # pending bundles.
-        for applied_ptransform in self._executor.all_nodes:
-          if not self._executor.evaluation_context.is_done(applied_ptransform):
-            pending_bundles = self._executor.node_to_pending_bundles.get(
-                applied_ptransform, [])
-            for bundle in pending_bundles:
-              self._executor.schedule_consumption(
-                  applied_ptransform, bundle, [],
-                  self._executor.default_completion_callback)
-            self._executor.node_to_pending_bundles[applied_ptransform] = []
+        # All current TransformExecutors are blocked; add more work from the
+        # roots.
+        for applied_transform in self._executor.root_nodes:
+          if not self._executor.evaluation_context.is_done(applied_transform):
+            self._executor.schedule_consumption(
+                applied_transform, None,
+                self._executor.default_completion_callback)


http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index cb2ace2..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,8 +20,6 @@ from __future__ import absolute_import
 
 import collections
-import random
-import time
 
 from apache_beam import coders
 from apache_beam import pvalue
@@ -29,29 +27,16 @@ from apache_beam.internal import pickler
 import apache_beam.io as io
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
-from apache_beam.runners.direct.direct_runner import _StreamingGroupByKeyOnly
-from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
-from apache_beam.runners.direct.util import KeyedWorkItem
-from apache_beam.runners.direct.util import TransformResult
+from apache_beam.runners.direct.transform_result import TransformResult
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
-from apache_beam.testing.test_stream import TestStream
-from apache_beam.testing.test_stream import ElementEvent
-from apache_beam.testing.test_stream import WatermarkEvent
-from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
-from apache_beam.transforms.trigger import create_trigger_driver
-from apache_beam.transforms.trigger import _CombiningValueStateTag
-from apache_beam.transforms.trigger import _ListStateTag
-from apache_beam.transforms.trigger import TimeDomain
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
 from apache_beam.utils import counters
-from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.options.pipeline_options import TypeOptions
 
 
@@ -66,21 +51,13 @@ class TransformEvaluatorRegistry(object):
     self._evaluation_context = evaluation_context
     self._evaluators = {
         io.Read: _BoundedReadEvaluator,
-        io.ReadStringsFromPubSub: _PubSubReadEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
-        _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
-        _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
-        TestStream: _TestStreamEvaluator,
-    }
-    self._root_bundle_providers = {
-        core.PTransform: DefaultRootBundleProvider,
-        TestStream: _TestStreamRootBundleProvider,
     }
 
-  def get_evaluator(
+  def for_application(
       self, applied_ptransform, input_committed_bundle,
       side_inputs, scoped_metrics_container):
     """Returns a TransformEvaluator suitable for processing given inputs."""
@@ -102,18 +79,6 @@ class TransformEvaluatorRegistry(object):
         input_committed_bundle, side_inputs,
         scoped_metrics_container)
 
-  def get_root_bundle_provider(self, applied_ptransform):
-    provider_cls = None
-    for cls in applied_ptransform.transform.__class__.mro():
-      provider_cls = self._root_bundle_providers.get(cls)
-      if provider_cls:
-        break
-    if not provider_cls:
-      raise NotImplementedError(
-          'Root provider for [%s] not implemented in runner %s' % (
-              type(applied_ptransform.transform), self))
-    return provider_cls(self._evaluation_context, applied_ptransform)
-
   def should_execute_serially(self, applied_ptransform):
     """Returns True if this applied_ptransform should run one bundle at a time.
 
@@ -134,48 +99,7 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core._GroupByKeyOnly,
-                       _StreamingGroupByKeyOnly,
-                       _StreamingGroupAlsoByWindow,
-                       _NativeWrite,))
-
-
-class RootBundleProvider(object):
-  """Provides bundles for the initial execution of a root transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform):
-    self._evaluation_context = evaluation_context
-    self._applied_ptransform = applied_ptransform
-
-  def get_root_bundles(self):
-    raise NotImplementedError
-
-
-class DefaultRootBundleProvider(RootBundleProvider):
-  """Provides an empty bundle by default for root transforms."""
-
-  def get_root_bundles(self):
-    input_node = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
-    empty_bundle = (
-        self._evaluation_context.create_empty_committed_bundle(input_node))
-    return [empty_bundle]
-
-
-class _TestStreamRootBundleProvider(RootBundleProvider):
-  """Provides an initial bundle for the TestStream evaluator."""
-
-  def get_root_bundles(self):
-    test_stream = self._applied_ptransform.transform
-    bundles = []
-    if len(test_stream.events) > 0:
-      bundle = self._evaluation_context.create_bundle(
-          pvalue.PBegin(self._applied_ptransform.transform.pipeline))
-      # Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the
-      # watermark.
-      bundle.add(GlobalWindows.windowed_value(0, timestamp=MIN_TIMESTAMP))
-      bundle.commit(None)
-      bundles.append(bundle)
-    return bundles
+                      (core._GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -237,27 +161,6 @@ class _TransformEvaluator(object):
     """Starts a new bundle."""
     pass
 
-  def process_timer_wrapper(self, timer_firing):
-    """Process timer by clearing and then calling process_timer().
-
-    This method is called with any timer firing and clears the delivered
-    timer from the keyed state and then calls process_timer().  The default
-    process_timer() implementation emits a KeyedWorkItem for the particular
-    timer and passes it to process_element().  Evaluator subclasses which
-    desire different timer delivery semantics can override process_timer().
- """ - state = self.step_context.get_keyed_state(timer_firing.encoded_key) - state.clear_timer( - timer_firing.window, timer_firing.name, timer_firing.time_domain) - self.process_timer(timer_firing) - - def process_timer(self, timer_firing): - """Default process_timer() impl. generating KeyedWorkItem element.""" - self.process_element( - GlobalWindows.windowed_value( - KeyedWorkItem(timer_firing.encoded_key, - timer_firings=[timer_firing]))) - def process_element(self, element): """Processes a new element as part of the current bundle.""" raise NotImplementedError('%s do not process elements.', type(self)) @@ -275,6 +178,7 @@ class _BoundedReadEvaluator(_TransformEvaluator): def __init__(self, evaluation_context, applied_ptransform, input_committed_bundle, side_inputs, scoped_metrics_container): + assert not input_committed_bundle assert not side_inputs self._source = applied_ptransform.transform.source self._source.pipeline_options = evaluation_context.pipeline_options @@ -303,148 +207,7 @@ class _BoundedReadEvaluator(_TransformEvaluator): bundles = _read_values_to_bundles(reader) return TransformResult( - self._applied_ptransform, bundles, [], None, None) - - -class _TestStreamEvaluator(_TransformEvaluator): - """TransformEvaluator for the TestStream transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not side_inputs - self.test_stream = applied_ptransform.transform - super(_TestStreamEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - def start_bundle(self): - self.current_index = -1 - self.watermark = MIN_TIMESTAMP - self.bundles = [] - - def process_element(self, element): - index = element.value - self.watermark = element.timestamp - assert isinstance(index, int) - assert 0 <= index <= len(self.test_stream.events) - self.current_index = index - event = self.test_stream.events[self.current_index] - if isinstance(event, ElementEvent): - assert len(self._outputs) == 1 - output_pcollection = list(self._outputs)[0] - bundle = self._evaluation_context.create_bundle(output_pcollection) - for tv in event.timestamped_values: - bundle.output( - GlobalWindows.windowed_value(tv.value, timestamp=tv.timestamp)) - self.bundles.append(bundle) - elif isinstance(event, WatermarkEvent): - assert event.new_watermark >= self.watermark - self.watermark = event.new_watermark - elif isinstance(event, ProcessingTimeEvent): - # TODO(ccy): advance processing time in the context's mock clock. - pass - else: - raise ValueError('Invalid TestStream event: %s.' 
% event) - - def finish_bundle(self): - unprocessed_bundles = [] - hold = None - if self.current_index < len(self.test_stream.events) - 1: - unprocessed_bundle = self._evaluation_context.create_bundle( - pvalue.PBegin(self._applied_ptransform.transform.pipeline)) - unprocessed_bundle.add(GlobalWindows.windowed_value( - self.current_index + 1, timestamp=self.watermark)) - unprocessed_bundles.append(unprocessed_bundle) - hold = self.watermark - return TransformResult( - self._applied_ptransform, self.bundles, unprocessed_bundles, None, - {None: hold}) - - -class _PubSubSubscriptionWrapper(object): - """Wrapper for garbage-collecting temporary PubSub subscriptions.""" - - def __init__(self, subscription, should_cleanup): - self.subscription = subscription - self.should_cleanup = should_cleanup - - def __del__(self): - if self.should_cleanup: - self.subscription.delete() - - -class _PubSubReadEvaluator(_TransformEvaluator): - """TransformEvaluator for PubSub read.""" - - _subscription_cache = {} - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not side_inputs - super(_PubSubReadEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - source = self._applied_ptransform.transform._source - self._subscription = _PubSubReadEvaluator.get_subscription( - self._applied_ptransform, source.project, source.topic_name, - source.subscription_name) - - @classmethod - def get_subscription(cls, transform, project, topic, subscription_name): - if transform not in cls._subscription_cache: - from google.cloud import pubsub - should_create = not subscription_name - if should_create: - subscription_name = 'beam_%d_%x' % ( - int(time.time()), random.randrange(1 << 32)) - cls._subscription_cache[transform] = _PubSubSubscriptionWrapper( - pubsub.Client(project=project).topic(topic).subscription( - subscription_name), - should_create) - if should_create: - cls._subscription_cache[transform].subscription.create() - return cls._subscription_cache[transform].subscription - - def start_bundle(self): - pass - - def process_element(self, element): - pass - - def _read_from_pubsub(self): - from google.cloud import pubsub - # Because of the AutoAck, we are not able to reread messages if this - # evaluator fails with an exception before emitting a bundle. However, - # the DirectRunner currently doesn't retry work items anyway, so the - # pipeline would enter an inconsistent state on any error. - with pubsub.subscription.AutoAck( - self._subscription, return_immediately=True, - max_messages=10) as results: - return [message.data for unused_ack_id, message in results.items()] - - def finish_bundle(self): - data = self._read_from_pubsub() - if data: - output_pcollection = list(self._outputs)[0] - bundle = self._evaluation_context.create_bundle(output_pcollection) - # TODO(ccy): we currently do not use the PubSub message timestamp or - # respect the PubSub source's id_label field. 
-      now = Timestamp.of(time.time())
-      for message_data in data:
-        bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
-      bundles = [bundle]
-    else:
-      bundles = []
-    if self._applied_ptransform.inputs:
-      input_pvalue = self._applied_ptransform.inputs[0]
-    else:
-      input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
-    unprocessed_bundle = self._evaluation_context.create_bundle(
-        input_pvalue)
-    return TransformResult(
-        self._applied_ptransform, bundles,
-        [unprocessed_bundle], None, {None: Timestamp.of(time.time())})
+        self._applied_ptransform, bundles, None, None, None, None)
 
 
 class _FlattenEvaluator(_TransformEvaluator):
@@ -468,7 +231,7 @@ class _FlattenEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     bundles = [self.bundle]
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, None)
+        self._applied_ptransform, bundles, None, None, None, None)
 
 
 class _TaggedReceivers(dict):
@@ -557,7 +320,7 @@ class _ParDoEvaluator(_TransformEvaluator):
     bundles = self._tagged_receivers.values()
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
-        self._applied_ptransform, bundles, [], result_counters, None,
+        self._applied_ptransform, bundles, None, None, result_counters, None,
         self._tagged_receivers.undeclared_in_memory_tag_values)
 
 
@@ -565,8 +328,13 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
   """TransformEvaluator for _GroupByKeyOnly transform."""
 
   MAX_ELEMENT_PER_BUNDLE = None
-  ELEMENTS_TAG = _ListStateTag('elements')
-  COMPLETION_TAG = _CombiningValueStateTag('completed', any)
+
+  class _GroupByKeyOnlyEvaluatorState(object):
+
+    def __init__(self):
+      # output: {} key -> [values]
+      self.output = collections.defaultdict(list)
+      self.completed = False
 
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
@@ -575,13 +343,15 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         evaluation_context, applied_ptransform, input_committed_bundle,
         side_inputs, scoped_metrics_container)
 
+  @property
   def _is_final_bundle(self):
     return (self._execution_context.watermarks.input_watermark
             == WatermarkManager.WATERMARK_POS_INF)
 
   def start_bundle(self):
-    self.step_context = self._execution_context.get_step_context()
-    self.global_state = self.step_context.get_keyed_state(None)
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state
+                  else _GroupByKeyOnlyEvaluator._GroupByKeyOnlyEvaluatorState())
 
     assert len(self._outputs) == 1
     self.output_pcollection = list(self._outputs)[0]
@@ -591,44 +361,29 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         self._applied_ptransform.transform.get_type_hints().input_types[0])
     self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
 
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
-
   def process_element(self, element):
-    assert not self.global_state.get_state(
-        None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG)
+    assert not self.state.completed
     if (isinstance(element, WindowedValue)
         and isinstance(element.value, collections.Iterable)
         and len(element.value) == 2):
       k, v = element.value
-      encoded_k = self.key_coder.encode(k)
-      state = self.step_context.get_keyed_state(encoded_k)
-      state.add_state(None, _GroupByKeyOnlyEvaluator.ELEMENTS_TAG, v)
+      self.state.output[self.key_coder.encode(k)].append(v)
     else:
       raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
                            'windowed key-value pairs. Instead received: %r.'
                            % element)
 
   def finish_bundle(self):
-    if self._is_final_bundle():
-      if self.global_state.get_state(
-          None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG):
+    if self._is_final_bundle:
+      if self.state.completed:
         # Ignore empty bundles after emitting output. (This may happen because
         # empty bundles do not affect input watermarks.)
         bundles = []
       else:
-        gbk_result = []
-        # TODO(ccy): perhaps we can clean this up to not use this
-        # internal attribute of the DirectStepContext.
-        for encoded_k in self.step_context.keyed_existing_state:
-          # Ignore global state.
-          if encoded_k is None:
-            continue
-          k = self.key_coder.decode(encoded_k)
-          state = self.step_context.get_keyed_state(encoded_k)
-          vs = state.get_state(None, _GroupByKeyOnlyEvaluator.ELEMENTS_TAG)
-          gbk_result.append(GlobalWindows.windowed_value((k, vs)))
+        gbk_result = (
+            map(GlobalWindows.windowed_value, (
+                (self.key_coder.decode(k), v)
+                for k, v in self.state.output.iteritems())))
 
         def len_element_fn(element):
           _, v = element.value
@@ -638,139 +393,21 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
             self.output_pcollection, gbk_result,
             _GroupByKeyOnlyEvaluator.MAX_ELEMENT_PER_BUNDLE, len_element_fn)
 
-      self.global_state.add_state(
-          None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG, True)
+      self.state.completed = True
+      state = self.state
       hold = WatermarkManager.WATERMARK_POS_INF
     else:
       bundles = []
+      state = self.state
       hold = WatermarkManager.WATERMARK_NEG_INF
-      self.global_state.set_timer(
-          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, {None: hold})
-
-
-class _StreamingGroupByKeyOnlyEvaluator(_TransformEvaluator):
-  """TransformEvaluator for _StreamingGroupByKeyOnly transform.
-
-  The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes
-  to infinity, which is suitable for batch mode execution. During streaming
-  mode execution, we emit each bundle as it comes to the next transform.
-  """
-
-  MAX_ELEMENT_PER_BUNDLE = None
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_StreamingGroupByKeyOnlyEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    self.gbk_items = collections.defaultdict(list)
-
-    assert len(self._outputs) == 1
-    self.output_pcollection = list(self._outputs)[0]
-
-    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
-    kv_type_hint = (
-        self._applied_ptransform.transform.get_type_hints().input_types[0])
-    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-  def process_element(self, element):
-    if (isinstance(element, WindowedValue)
-        and isinstance(element.value, collections.Iterable)
-        and len(element.value) == 2):
-      k, v = element.value
-      self.gbk_items[self.key_coder.encode(k)].append(v)
-    else:
-      raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
-                           'windowed key-value pairs. Instead received: %r.'
-                           % element)
-
-  def finish_bundle(self):
-    bundles = []
-    bundle = None
-    for encoded_k, vs in self.gbk_items.iteritems():
-      if not bundle:
-        bundle = self._evaluation_context.create_bundle(
-            self.output_pcollection)
-        bundles.append(bundle)
-      kwi = KeyedWorkItem(encoded_k, elements=vs)
-      bundle.add(GlobalWindows.windowed_value(kwi))
-
-    return TransformResult(
-        self._applied_ptransform, bundles, [], None, None)
-
-
-class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
-  """TransformEvaluator for the _StreamingGroupAlsoByWindow transform.
-
-  This evaluator is only used in streaming mode. In batch mode, the
-  GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined
-  in transforms/core.py.
-  """
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_StreamingGroupAlsoByWindowEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    assert len(self._outputs) == 1
-    self.output_pcollection = list(self._outputs)[0]
-    self.step_context = self._execution_context.get_step_context()
-    self.driver = create_trigger_driver(
-        self._applied_ptransform.transform.windowing)
-    self.gabw_items = []
-    self.keyed_holds = {}
-
-    # The input type of a GroupAlsoByWindow will be KV[Any, Iter[Any]] or more
-    # specific.
-    kv_type_hint = (
-        self._applied_ptransform.transform.get_type_hints().input_types[0])
-    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-  def process_element(self, element):
-    kwi = element.value
-    assert isinstance(kwi, KeyedWorkItem), kwi
-    encoded_k, timer_firings, vs = (
-        kwi.encoded_key, kwi.timer_firings, kwi.elements)
-    k = self.key_coder.decode(encoded_k)
-    state = self.step_context.get_keyed_state(encoded_k)
-
-    for timer_firing in timer_firings:
-      for wvalue in self.driver.process_timer(
-          timer_firing.window, timer_firing.name, timer_firing.time_domain,
-          timer_firing.timestamp, state):
-        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
-    if vs:
-      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
-        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
-
-    self.keyed_holds[encoded_k] = state.get_earliest_hold()
-
-  def finish_bundle(self):
-    bundles = []
-    if self.gabw_items:
-      bundle = self._evaluation_context.create_bundle(self.output_pcollection)
-      for item in self.gabw_items:
-        bundle.add(item)
-      bundles.append(bundle)
-
-    return TransformResult(
-        self._applied_ptransform, bundles, [], None, self.keyed_holds)
+        self._applied_ptransform, bundles, state, None, None, hold)
 
 
 class _NativeWriteEvaluator(_TransformEvaluator):
   """TransformEvaluator for _NativeWrite transform."""
 
-  ELEMENTS_TAG = _ListStateTag('elements')
-
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
@@ -792,16 +429,12 @@ class _NativeWriteEvaluator(_TransformEvaluator):
             == WatermarkManager.WATERMARK_POS_INF)
 
   def start_bundle(self):
-    self.step_context = self._execution_context.get_step_context()
-    self.global_state = self.step_context.get_keyed_state(None)
-
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
+    # state: [values]
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state else [])
 
   def process_element(self, element):
-    self.global_state.add_state(
-        None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
+    self.state.append(element)
 
   def finish_bundle(self):
     # finish_bundle will append incoming bundles in memory until all the bundles
@@ -811,21 +444,19 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     # ignored and would not generate additional output files.
     # TODO(altay): Do not wait until the last bundle to write in a single shard.
     if self._is_final_bundle:
-      elements = self.global_state.get_state(
-          None, _NativeWriteEvaluator.ELEMENTS_TAG)
      if self._has_already_produced_output:
        # Ignore empty bundles that arrive after the output is produced.
- assert elements == [] + assert self.state == [] else: self._sink.pipeline_options = self._evaluation_context.pipeline_options with self._sink.writer() as writer: - for v in elements: + for v in self.state: writer.Write(v.value) + state = None hold = WatermarkManager.WATERMARK_POS_INF else: + state = self.state hold = WatermarkManager.WATERMARK_NEG_INF - self.global_state.set_timer( - None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF) return TransformResult( - self._applied_ptransform, [], [], None, {None: hold}) + self._applied_ptransform, [], state, None, None, hold) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/transform_result.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py new file mode 100644 index 0000000..febdd20 --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/transform_result.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""The result of evaluating an AppliedPTransform with a TransformEvaluator.""" + +from __future__ import absolute_import + + +class TransformResult(object): + """For internal use only; no backwards-compatibility guarantees. + + The result of evaluating an AppliedPTransform with a TransformEvaluator.""" + + def __init__(self, applied_ptransform, uncommitted_output_bundles, state, + timer_update, counters, watermark_hold, + undeclared_tag_values=None): + self.transform = applied_ptransform + self.uncommitted_output_bundles = uncommitted_output_bundles + self.state = state + # TODO: timer update is currently unused. + self.timer_update = timer_update + self.counters = counters + self.watermark_hold = watermark_hold + # Only used when caching (materializing) all values is requested. + self.undeclared_tag_values = undeclared_tag_values + # Populated by the TransformExecutor. + self.logical_metric_updates = None http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py deleted file mode 100644 index 10f7b29..0000000 --- a/sdks/python/apache_beam/runners/direct/util.py +++ /dev/null @@ -1,67 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
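TransformResult above is a plain value object that evaluators hand back to the executor. A hedged example of filling one in the way a non-final _NativeWrite bundle would; the placeholder transform and buffer below are illustrative, but the argument order follows the __init__ signature in the new file:

    from apache_beam.runners.direct.transform_result import TransformResult
    from apache_beam.runners.direct.watermark_manager import WatermarkManager

    applied_ptransform = object()     # stands in for a real AppliedPTransform
    buffered = ['elem-1', 'elem-2']   # evaluator state to carry forward

    result = TransformResult(
        applied_ptransform,
        [],        # uncommitted_output_bundles: nothing emitted yet
        buffered,  # state handed back for the next bundle
        None,      # timer_update: currently unused (see TODO above)
        None,      # counters
        WatermarkManager.WATERMARK_NEG_INF)  # hold the output watermark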
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
deleted file mode 100644
index 10f7b29..0000000
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Utility classes used by the DirectRunner.
-
-For internal use only. No backwards compatibility guarantees.
-"""
-
-from __future__ import absolute_import
-
-
-class TransformResult(object):
-  """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-  def __init__(self, applied_ptransform, uncommitted_output_bundles,
-               unprocessed_bundles, counters, keyed_watermark_holds,
-               undeclared_tag_values=None):
-    self.transform = applied_ptransform
-    self.uncommitted_output_bundles = uncommitted_output_bundles
-    self.unprocessed_bundles = unprocessed_bundles
-    self.counters = counters
-    # Mapping of key -> earliest hold timestamp or None. Keys should be
-    # strings or None.
-    #
-    # For each key, we receive as its corresponding value the earliest
-    # watermark hold for that key (the key can be None for global state), past
-    # which the output watermark for the currently-executing step will not
-    # advance. If the value is None or utils.timestamp.MAX_TIMESTAMP, the
-    # watermark hold will be removed.
-    self.keyed_watermark_holds = keyed_watermark_holds or {}
-    # Only used when caching (materializing) all values is requested.
-    self.undeclared_tag_values = undeclared_tag_values
-    # Populated by the TransformExecutor.
-    self.logical_metric_updates = None
-
-
-class TimerFiring(object):
-  """A single instance of a fired timer."""
-
-  def __init__(self, encoded_key, window, name, time_domain, timestamp):
-    self.encoded_key = encoded_key
-    self.window = window
-    self.name = name
-    self.time_domain = time_domain
-    self.timestamp = timestamp
-
-
-class KeyedWorkItem(object):
-  """A keyed item that can either be a timer firing or a list of elements."""
-  def __init__(self, encoded_key, timer_firings=None, elements=None):
-    self.encoded_key = encoded_key
-    self.timer_firings = timer_firings or []
-    self.elements = elements or []
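The deleted util.py carried the TimerFiring and KeyedWorkItem value classes consumed by the streaming evaluators removed earlier in this patch: a consumer fired any pending timers for the key first, then pushed the ordinary elements through the trigger driver. A simplified sketch of that consumption pattern, where driver and state stand in for the real trigger driver and keyed state:

    class KeyedWorkItem(object):
      # Mirrors the deleted class: timers and/or elements for one key.
      def __init__(self, encoded_key, timer_firings=None, elements=None):
        self.encoded_key = encoded_key
        self.timer_firings = timer_firings or []
        self.elements = elements or []

    def handle(kwi, driver, state):
      outputs = []
      for firing in kwi.timer_firings:  # timers first ...
        outputs.extend(driver.process_timer(
            firing.window, firing.name, firing.time_domain,
            firing.timestamp, state))
      if kwi.elements:  # ... then the buffered elements
        outputs.extend(driver.process_elements(state, kwi.elements, None))
      return outputs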
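update_watermarks now passes a single timer_update flag and a single earliest_hold instead of per-key collections, and _update_pending (continued in the next hunk) turns each non-empty output bundle into pending input for its consumers. A rough sketch of that bookkeeping under stand-in types; remove_pending is a hypothetical helper for the step the next hunk performs on the completed input bundle:

    def update_pending(input_bundle, transform, outputs,
                       value_to_consumers, transform_watermarks):
      # Non-empty output bundles become pending input for their consumers.
      for output in outputs:
        if output.has_elements():
          for consumer in value_to_consumers.get(output.pcollection, []):
            transform_watermarks[consumer].add_pending(output)
      # The just-completed input bundle stops being pending for this transform.
      if input_bundle is not None and input_bundle.has_elements():
        transform_watermarks[transform].remove_pending(input_bundle)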
@@ -119,10 +112,7 @@ class WatermarkManager(object):
       consumer_tw.add_pending(output)
 
     completed_tw = self._transform_to_watermarks[applied_ptransform]
-    completed_tw.update_timers(completed_timers)
-
-    for unprocessed_bundle in unprocessed_bundles:
-      completed_tw.add_pending(unprocessed_bundle)
+    completed_tw.update_timers(timer_update)
 
     assert input_committed_bundle or applied_ptransform in self._root_transforms
     if input_committed_bundle and input_committed_bundle.has_elements():
@@ -146,36 +136,33 @@ class WatermarkManager(object):
 
   def extract_fired_timers(self):
     all_timers = []
     for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
-      fired_timers = tw.extract_fired_timers()
-      if fired_timers:
-        all_timers.append((applied_ptransform, fired_timers))
+      if tw.extract_fired_timers():
+        all_timers.append(applied_ptransform)
     return all_timers
 
 
 class _TransformWatermarks(object):
   """Tracks input and output watermarks for an AppliedPTransform."""
 
-  def __init__(self, clock, keyed_states, transform):
+  def __init__(self, clock):
     self._clock = clock
-    self._keyed_states = keyed_states
     self._input_transform_watermarks = []
     self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
     self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
-    self._keyed_earliest_holds = {}
+    self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
     self._pending = set()  # Scheduled bundles targeted for this transform.
-    self._fired_timers = set()
+    self._fired_timers = False
     self._lock = threading.Lock()
 
-    self._label = str(transform)
-
   def update_input_transform_watermarks(self, input_transform_watermarks):
     with self._lock:
       self._input_transform_watermarks = input_transform_watermarks
 
-  def update_timers(self, completed_timers):
+  def update_timers(self, timer_update):
     with self._lock:
-      for timer_firing in completed_timers:
-        self._fired_timers.remove(timer_firing)
+      if timer_update:
+        assert self._fired_timers
+        self._fired_timers = False
 
   @property
   def input_watermark(self):
@@ -187,13 +174,11 @@ class _TransformWatermarks(object):
     with self._lock:
       return self._output_watermark
 
-  def hold(self, keyed_earliest_holds):
+  def hold(self, value):
     with self._lock:
-      for key, hold_value in keyed_earliest_holds.iteritems():
-        self._keyed_earliest_holds[key] = hold_value
-        if (hold_value is None or
-            hold_value == WatermarkManager.WATERMARK_POS_INF):
-          del self._keyed_earliest_holds[key]
+      if value is None:
+        value = WatermarkManager.WATERMARK_POS_INF
+      self._earliest_hold = value
 
   def add_pending(self, pending):
     with self._lock:
@@ -208,22 +193,9 @@ class _TransformWatermarks(object):
 
   def refresh(self):
     with self._lock:
-      min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
-      has_pending_elements = False
-      for input_bundle in self._pending:
-        # TODO(ccy): we can have the Bundle class keep track of the minimum
-        # timestamp so we don't have to do an iteration here.
-        for wv in input_bundle.get_elements_iterable():
-          has_pending_elements = True
-          if wv.timestamp < min_pending_timestamp:
-            min_pending_timestamp = wv.timestamp
-
-      # If there is a pending element with a certain timestamp, we can at most
-      # advance our watermark to the maximum timestamp less than that
-      # timestamp.
-      pending_holder = WatermarkManager.WATERMARK_POS_INF
-      if has_pending_elements:
-        pending_holder = min_pending_timestamp - TIME_GRANULARITY
+      pending_holder = (WatermarkManager.WATERMARK_NEG_INF
+                        if self._pending else
+                        WatermarkManager.WATERMARK_POS_INF)
 
       input_watermarks = [
           tw.output_watermark for tw in self._input_transform_watermarks]
@@ -232,11 +204,7 @@
       self._input_watermark = max(self._input_watermark,
                                   min(pending_holder, producer_watermark))
-      earliest_hold = WatermarkManager.WATERMARK_POS_INF
-      for hold in self._keyed_earliest_holds.values():
-        if hold < earliest_hold:
-          earliest_hold = hold
-      new_output_watermark = min(self._input_watermark, earliest_hold)
+      new_output_watermark = min(self._input_watermark, self._earliest_hold)
 
       advanced = new_output_watermark > self._output_watermark
       self._output_watermark = new_output_watermark
@@ -251,12 +219,8 @@
       if self._fired_timers:
         return False
 
-      fired_timers = []
-      for encoded_key, state in self._keyed_states.iteritems():
-        timers = state.get_timers(watermark=self._input_watermark)
-        for expired in timers:
-          window, (name, time_domain, timestamp) = expired
-          fired_timers.append(
-              TimerFiring(encoded_key, window, name, time_domain, timestamp))
-      self._fired_timers.update(fired_timers)
-      return fired_timers
+      should_fire = (
+          self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
+          self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
+      self._fired_timers = should_fire
+      return should_fire
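With per-element timestamp tracking gone, the restored refresh logic is simple: any pending bundle pins the input watermark, and the output watermark may never pass the earliest hold. A compact sketch of the computation, using Python floats in place of the WatermarkManager constants:

    NEG_INF, POS_INF = float('-inf'), float('inf')

    def refresh(input_watermark, output_watermark, earliest_hold,
                pending_bundles, producer_watermarks):
      # Any pending (scheduled but unprocessed) bundle pins the input watermark.
      pending_holder = NEG_INF if pending_bundles else POS_INF
      # Root transforms have no producers; treat their minimum as +inf.
      producer_watermark = min(producer_watermarks or [POS_INF])
      input_watermark = max(input_watermark,
                            min(pending_holder, producer_watermark))
      # The output watermark may not pass the earliest requested hold.
      new_output = min(input_watermark, earliest_hold)
      return input_watermark, new_output, new_output > output_watermark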
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index a40069b..1c89d06 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -24,8 +24,7 @@ For internal use only; no backwards-compatibility guarantees.
 
 from apache_beam import pipeline
 from apache_beam import pvalue
 from apache_beam import coders
-from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import core
 
 
@@ -43,10 +42,9 @@ class _PipelineContextMap(object):
     self._id_to_proto = proto_map if proto_map else {}
     self._counter = 0
 
-  def _unique_ref(self, obj=None):
+  def _unique_ref(self):
     self._counter += 1
-    return "ref_%s_%s_%s" % (
-        self._obj_type.__name__, type(obj).__name__, self._counter)
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
 
   def populate_map(self, proto_map):
     for id, proto in self._id_to_proto.items():
@@ -54,7 +52,7 @@ class _PipelineContextMap(object):
 
   def get_id(self, obj):
     if obj not in self._obj_to_id:
-      id = self._unique_ref(obj)
+      id = self._unique_ref()
       self._id_to_obj[id] = obj
       self._obj_to_id[obj] = id
       self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
@@ -81,16 +79,11 @@ class PipelineContext(object):
       # TODO: environment
   }
 
-  def __init__(self, proto=None):
-    if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
-      proto = beam_runner_api_pb2.Components(
-          coders=dict(proto.coders.items()),
-          windowing_strategies=dict(proto.windowing_strategies.items()),
-          environments=dict(proto.environments.items()))
+  def __init__(self, context_proto=None):
     for name, cls in self._COMPONENT_TYPES.items():
       setattr(
           self, name, _PipelineContextMap(
-              self, cls, getattr(proto, name, None)))
+              self, cls, getattr(context_proto, name, None)))
 
   @staticmethod
   def from_runner_api(proto):
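The _unique_ref change above reverts ids to the two-part ref_<Type>_<n> form. The surrounding _PipelineContextMap is an interning map: the first get_id call serializes the object once and assigns it a fresh id, and later calls return the same id. A self-contained sketch of that pattern, with a toy Thing class and a placeholder to_runner_api in place of a real pipeline component:

    class InterningMap(object):
      def __init__(self, obj_type):
        self._obj_type = obj_type
        self._obj_to_id, self._id_to_obj, self._id_to_proto = {}, {}, {}
        self._counter = 0

      def _unique_ref(self):
        self._counter += 1
        return 'ref_%s_%s' % (self._obj_type.__name__, self._counter)

      def get_id(self, obj):
        if obj not in self._obj_to_id:
          ref = self._unique_ref()
          self._id_to_obj[ref] = obj
          self._obj_to_id[obj] = ref
          self._id_to_proto[ref] = obj.to_runner_api()  # serialize only once
        return self._obj_to_id[obj]

    class Thing(object):
      def to_runner_api(self):
        return '<proto>'  # placeholder for a real proto message

    m = InterningMap(Thing)
    t = Thing()
    assert m.get_id(t) == m.get_id(t) == 'ref_Thing_1'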
