Move dataflow native sinks and sources into dataflow directory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78520758 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78520758 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78520758 Branch: refs/heads/python-sdk Commit: 78520758abc3b1c2b38e26f3ffd64e01870de067 Parents: 90004a0 Author: Robert Bradshaw <rober...@google.com> Authored: Thu Oct 6 16:52:57 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Oct 10 10:30:00 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/iobase.py | 287 +---------------- .../apache_beam/runners/dataflow/__init__.py | 0 .../runners/dataflow/native_io/__init__.py | 0 .../runners/dataflow/native_io/iobase.py | 319 +++++++++++++++++++ 4 files changed, 321 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 4305fb6..b83d7eb 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -43,263 +43,8 @@ from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.transforms import window - -def _dict_printable_fields(dict_object, skip_fields): - """Returns a list of strings for the interesting fields of a dict.""" - return ['%s=%r' % (name, value) - for name, value in dict_object.iteritems() - # want to output value 0 but not None nor [] - if (value or value == 0) - and name not in skip_fields] - -_minor_fields = ['coder', 'key_coder', 'value_coder', - 'config_bytes', 'elements', - 'append_trailing_newlines', 'strip_trailing_newlines', - 'compression_type'] - - 
-class NativeSource(object): - """A source implemented by Dataflow service. - - This class is to be only inherited by sources natively implemented by Cloud - Dataflow service, hence should not be sub-classed by users. - - This class is deprecated and should not be used to define new sources. - """ - - def reader(self): - """Returns a NativeSourceReader instance associated with this source.""" - raise NotImplementedError - - def __repr__(self): - return '<{name} {vals}>'.format( - name=self.__class__.__name__, - vals=', '.join(_dict_printable_fields(self.__dict__, - _minor_fields))) - - -class NativeSourceReader(object): - """A reader for a source implemented by Dataflow service.""" - - def __enter__(self): - """Opens everything necessary for a reader to function properly.""" - raise NotImplementedError - - def __exit__(self, exception_type, exception_value, traceback): - """Cleans up after a reader executed.""" - raise NotImplementedError - - def __iter__(self): - """Returns an iterator over all the records of the source.""" - raise NotImplementedError - - @property - def returns_windowed_values(self): - """Returns whether this reader returns windowed values.""" - return False - - def get_progress(self): - """Returns a representation of how far the reader has read. - - Returns: - A SourceReaderProgress object that gives the current progress of the - reader. - """ - return - - def request_dynamic_split(self, dynamic_split_request): - """Attempts to split the input in two parts. - - The two parts are named the "primary" part and the "residual" part. The - current 'NativeSourceReader' keeps processing the primary part, while the - residual part will be processed elsewhere (e.g. perhaps on a different - worker). - - The primary and residual parts, if concatenated, must represent the - same input as the current input of this 'NativeSourceReader' before this - call. 
- - The boundary between the primary part and the residual part is - specified in a framework-specific way using 'DynamicSplitRequest' e.g., - if the framework supports the notion of positions, it might be a - position at which the input is asked to split itself (which is not - necessarily the same position at which it *will* split itself); it - might be an approximate fraction of input, or something else. - - This function returns a 'DynamicSplitResult', which encodes, in a - framework-specific way, the information sufficient to construct a - description of the resulting primary and residual inputs. For example, it - might, again, be a position demarcating these parts, or it might be a pair - of fully-specified input descriptions, or something else. - - After a successful call to 'request_dynamic_split()', subsequent calls - should be interpreted relative to the new primary. - - Args: - dynamic_split_request: A 'DynamicSplitRequest' describing the split - request. - - Returns: - 'None' if the 'DynamicSplitRequest' cannot be honored (in that - case the input represented by this 'NativeSourceReader' stays the same), - or a 'DynamicSplitResult' describing how the input was split into a - primary and residual part. - """ - logging.debug( - 'SourceReader %r does not support dynamic splitting. Ignoring dynamic ' - 'split request: %r', - self, dynamic_split_request) - return - - -class ReaderProgress(object): - """A representation of how far a NativeSourceReader has read.""" - - def __init__(self, position=None, percent_complete=None, remaining_time=None): - - self._position = position - - if percent_complete is not None: - percent_complete = float(percent_complete) - if percent_complete < 0 or percent_complete > 1: - raise ValueError( - 'The percent_complete argument was %f. Must be in range [0, 1].' 
- % percent_complete) - self._percent_complete = percent_complete - - self._remaining_time = remaining_time - - @property - def position(self): - """Returns progress, represented as a ReaderPosition object.""" - return self._position - - @property - def percent_complete(self): - """Returns progress, represented as a percentage of total work. - - Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the - work range, entire WorkItem complete). - - Returns: - Progress represented as a percentage of total work. - """ - return self._percent_complete - - @property - def remaining_time(self): - """Returns progress, represented as an estimated time remaining.""" - return self._remaining_time - - -class ReaderPosition(object): - """A representation of position in an iteration of a 'NativeSourceReader'.""" - - def __init__(self, end=None, key=None, byte_offset=None, record_index=None, - shuffle_position=None, concat_position=None): - """Initializes ReaderPosition. - - A ReaderPosition may get instantiated for one of these position types. Only - one of these should be specified. - - Args: - end: position is past all other positions. For example, this may be used - to represent the end position of an unbounded range. - key: position is a string key. - byte_offset: position is a byte offset. - record_index: position is a record index - shuffle_position: position is a base64 encoded shuffle position. - concat_position: position is a 'ConcatPosition'. - """ - - self.end = end - self.key = key - self.byte_offset = byte_offset - self.record_index = record_index - self.shuffle_position = shuffle_position - - if concat_position is not None: - assert isinstance(concat_position, ConcatPosition) - self.concat_position = concat_position - - -class ConcatPosition(object): - """A position that encapsulate an inner position and an index. - - This is used to represent the position of a source that encapsulate several - other sources. 
- """ - - def __init__(self, index, position): - """Initializes ConcatPosition. - - Args: - index: index of the source currently being read. - position: inner position within the source currently being read. - """ - - if position is not None: - assert isinstance(position, ReaderPosition) - self.index = index - self.position = position - - -class DynamicSplitRequest(object): - """Specifies how 'NativeSourceReader.request_dynamic_split' should split. - """ - - def __init__(self, progress): - assert isinstance(progress, ReaderProgress) - self.progress = progress - - -class DynamicSplitResult(object): - pass - - -class DynamicSplitResultWithPosition(DynamicSplitResult): - - def __init__(self, stop_position): - assert isinstance(stop_position, ReaderPosition) - self.stop_position = stop_position - - -class NativeSink(object): - """A sink implemented by Dataflow service. - - This class is to be only inherited by sinks natively implemented by Cloud - Dataflow service, hence should not be sub-classed by users. 
- """ - - def writer(self): - """Returns a SinkWriter for this source.""" - raise NotImplementedError - - def __repr__(self): - return '<{name} {vals}>'.format( - name=self.__class__.__name__, - vals=_dict_printable_fields(self.__dict__, _minor_fields)) - - -class NativeSinkWriter(object): - """A writer for a sink implemented by Dataflow service.""" - - def __enter__(self): - """Opens everything necessary for a writer to function properly.""" - raise NotImplementedError - - def __exit__(self, exception_type, exception_value, traceback): - """Cleans up after a writer executed.""" - raise NotImplementedError - - @property - def takes_windowed_values(self): - """Returns whether this writer takes windowed values.""" - return False - - def Write(self, o): # pylint: disable=invalid-name - """Writes a record to the sink associated with this writer.""" - raise NotImplementedError +from apache_beam.runners.dataflow.native_io.iobase import * +from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # Encapsulates information about a bundle of a source generated when method @@ -887,34 +632,6 @@ class Writer(object): raise NotImplementedError -class _NativeWrite(ptransform.PTransform): - """A PTransform for writing to a Dataflow native sink. - - These are sinks that are implemented natively by the Dataflow service - and hence should not be updated by users. These sinks are processed - using a Dataflow native write transform. - - Applying this transform results in a ``pvalue.PDone``. - """ - - def __init__(self, *args, **kwargs): - """Initializes a Write transform. - - Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, sink) or (sink). 
- """ - label, sink = self.parse_label_and_arg(args, kwargs, 'sink') - super(_NativeWrite, self).__init__(label) - self.sink = sink - - def apply(self, pcoll): - self._check_pcollection(pcoll) - return pvalue.PDone(pcoll.pipeline) - - class Read(ptransform.PTransform): """A transform that reads a PCollection.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78520758/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py new file mode 100644 index 0000000..bccca9f --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -0,0 +1,319 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Dataflow native sources and sinks.

These classes describe sources and sinks that are implemented natively by the
Cloud Dataflow service; they are not meant to be sub-classed by users.
"""

from collections import namedtuple

import logging
import random
import uuid

from apache_beam import pvalue
from apache_beam.coders import PickleCoder
from apache_beam.pvalue import AsIter
from apache_beam.pvalue import AsSingleton
from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.transforms import window

# NOTE(review): namedtuple, random, uuid, PickleCoder, AsIter, AsSingleton,
# core and window are not referenced in this module and look like leftovers
# from the move out of apache_beam/io/iobase.py. They are kept because
# apache_beam/io/iobase.py re-exports this module with `import *`; confirm
# nothing relies on them before removing.


def _dict_printable_fields(dict_object, skip_fields):
  """Returns a list of strings for the interesting fields of a dict.

  Args:
    dict_object: the dict to describe (callers here pass an instance's
      ``__dict__``).
    skip_fields: names of entries to leave out.

  Returns:
    A list of ``'name=repr(value)'`` strings, one per entry whose name is not
    in ``skip_fields`` and whose value is truthy or equal to 0.
  """
  # .items() behaves the same as the Python 2-only .iteritems() here (the
  # result is consumed immediately) and also works on Python 3.
  return ['%s=%r' % (name, value)
          for name, value in dict_object.items()
          # want to output value 0 but not None nor []
          if (value or value == 0)
          and name not in skip_fields]

# Attribute names omitted from the repr of native sources and sinks
# (presumably because they are bulky or rarely useful when debugging).
_minor_fields = ['coder', 'key_coder', 'value_coder',
                 'config_bytes', 'elements',
                 'append_trailing_newlines', 'strip_trailing_newlines',
                 'compression_type']


def _native_repr(obj):
  """Formats ``obj`` as ``<ClassName field=value, ...>``.

  Shared by NativeSource and NativeSink so both render their fields the same
  way (comma-separated, with _minor_fields skipped).
  """
  return '<{name} {vals}>'.format(
      name=obj.__class__.__name__,
      vals=', '.join(_dict_printable_fields(obj.__dict__, _minor_fields)))


class NativeSource(object):
  """A source implemented by Dataflow service.

  This class is to be only inherited by sources natively implemented by Cloud
  Dataflow service, hence should not be sub-classed by users.

  This class is deprecated and should not be used to define new sources.
  """

  def reader(self):
    """Returns a NativeSourceReader instance associated with this source."""
    raise NotImplementedError

  def __repr__(self):
    return _native_repr(self)


class NativeSourceReader(object):
  """A reader for a source implemented by Dataflow service.

  Subclasses must implement the context-manager protocol and iteration; the
  base implementations raise NotImplementedError.
  """

  def __enter__(self):
    """Opens everything necessary for a reader to function properly."""
    raise NotImplementedError

  def __exit__(self, exception_type, exception_value, traceback):
    """Cleans up after a reader executed."""
    raise NotImplementedError

  def __iter__(self):
    """Returns an iterator over all the records of the source."""
    raise NotImplementedError

  @property
  def returns_windowed_values(self):
    """Returns whether this reader returns windowed values (False here)."""
    return False

  def get_progress(self):
    """Returns a representation of how far the reader has read.

    Returns:
      A ReaderProgress object that gives the current progress of the reader,
      or None if progress is not reported. This default implementation always
      returns None.
    """
    return

  def request_dynamic_split(self, dynamic_split_request):
    """Attempts to split the input in two parts.

    The two parts are named the "primary" part and the "residual" part. The
    current 'NativeSourceReader' keeps processing the primary part, while the
    residual part will be processed elsewhere (e.g. perhaps on a different
    worker).

    The primary and residual parts, if concatenated, must represent the
    same input as the current input of this 'NativeSourceReader' before this
    call.

    The boundary between the primary part and the residual part is
    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
    if the framework supports the notion of positions, it might be a
    position at which the input is asked to split itself (which is not
    necessarily the same position at which it *will* split itself); it
    might be an approximate fraction of input, or something else.

    This function returns a 'DynamicSplitResult', which encodes, in a
    framework-specific way, the information sufficient to construct a
    description of the resulting primary and residual inputs. For example, it
    might, again, be a position demarcating these parts, or it might be a pair
    of fully-specified input descriptions, or something else.

    After a successful call to 'request_dynamic_split()', subsequent calls
    should be interpreted relative to the new primary.

    This default implementation does not support splitting: it logs the
    request at debug level and returns None.

    Args:
      dynamic_split_request: A 'DynamicSplitRequest' describing the split
        request.

    Returns:
      'None' if the 'DynamicSplitRequest' cannot be honored (in that
      case the input represented by this 'NativeSourceReader' stays the same),
      or a 'DynamicSplitResult' describing how the input was split into a
      primary and residual part.
    """
    logging.debug(
        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
        'split request: %r',
        self, dynamic_split_request)
    return


class ReaderProgress(object):
  """A representation of how far a NativeSourceReader has read."""

  def __init__(self, position=None, percent_complete=None, remaining_time=None):
    """Initializes ReaderProgress.

    Args:
      position: optional ReaderPosition of the reader.
      percent_complete: optional fraction of work done; converted to float and
        required to lie in [0, 1].
      remaining_time: optional estimate of the time remaining.

    Raises:
      ValueError: if percent_complete is outside [0, 1] or is NaN.
    """
    self._position = position

    if percent_complete is not None:
      percent_complete = float(percent_complete)
      # The chained comparison is False for NaN, so NaN is rejected too (the
      # previous `< 0 or > 1` test let it through).
      if not 0.0 <= percent_complete <= 1.0:
        raise ValueError(
            'The percent_complete argument was %f. Must be in range [0, 1].'
            % percent_complete)
    self._percent_complete = percent_complete

    self._remaining_time = remaining_time

  @property
  def position(self):
    """Returns progress, represented as a ReaderPosition object."""
    return self._position

  @property
  def percent_complete(self):
    """Returns progress, represented as a percentage of total work.

    Progress ranges from 0.0 (beginning, nothing complete) to 1.0 (end of the
    work range, entire WorkItem complete).

    Returns:
      Progress represented as a percentage of total work, or None if unset.
    """
    return self._percent_complete

  @property
  def remaining_time(self):
    """Returns progress, represented as an estimated time remaining."""
    return self._remaining_time


class ReaderPosition(object):
  """A representation of position in an iteration of a 'NativeSourceReader'."""

  def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
               shuffle_position=None, concat_position=None):
    """Initializes ReaderPosition.

    A ReaderPosition may get instantiated for one of these position types. Only
    one of these should be specified (this is not enforced here).

    Args:
      end: position is past all other positions. For example, this may be used
        to represent the end position of an unbounded range.
      key: position is a string key.
      byte_offset: position is a byte offset.
      record_index: position is a record index.
      shuffle_position: position is a base64 encoded shuffle position.
      concat_position: position is a 'ConcatPosition'.
    """

    self.end = end
    self.key = key
    self.byte_offset = byte_offset
    self.record_index = record_index
    self.shuffle_position = shuffle_position

    if concat_position is not None:
      assert isinstance(concat_position, ConcatPosition)
    self.concat_position = concat_position


class ConcatPosition(object):
  """A position that encapsulates an inner position and an index.

  This is used to represent the position of a source that encapsulates several
  other sources.
  """

  def __init__(self, index, position):
    """Initializes ConcatPosition.

    Args:
      index: index of the source currently being read.
      position: inner ReaderPosition within the source currently being read,
        or None.
    """

    if position is not None:
      assert isinstance(position, ReaderPosition)
    self.index = index
    self.position = position


class DynamicSplitRequest(object):
  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.

  The request carries the ReaderProgress at which the split is requested.
  """

  def __init__(self, progress):
    assert isinstance(progress, ReaderProgress)
    self.progress = progress


class DynamicSplitResult(object):
  """Base class for the outcome of a successful dynamic split."""
  pass


class DynamicSplitResultWithPosition(DynamicSplitResult):
  """A DynamicSplitResult expressed as the stop position of the primary."""

  def __init__(self, stop_position):
    assert isinstance(stop_position, ReaderPosition)
    self.stop_position = stop_position


class NativeSink(object):
  """A sink implemented by Dataflow service.

  This class is to be only inherited by sinks natively implemented by Cloud
  Dataflow service, hence should not be sub-classed by users.
  """

  def writer(self):
    """Returns a NativeSinkWriter for this sink."""
    raise NotImplementedError

  def __repr__(self):
    # Previously the raw list of fields was formatted, giving
    # "<Name ['a=1', 'b=2']>"; now matches NativeSource's "<Name a=1, b=2>".
    return _native_repr(self)


class NativeSinkWriter(object):
  """A writer for a sink implemented by Dataflow service."""

  def __enter__(self):
    """Opens everything necessary for a writer to function properly."""
    raise NotImplementedError

  def __exit__(self, exception_type, exception_value, traceback):
    """Cleans up after a writer executed."""
    raise NotImplementedError

  @property
  def takes_windowed_values(self):
    """Returns whether this writer takes windowed values (False here)."""
    return False

  def Write(self, o):  # pylint: disable=invalid-name
    """Writes a record to the sink associated with this writer."""
    raise NotImplementedError


class _NativeWrite(ptransform.PTransform):
  """A PTransform for writing to a Dataflow native sink.

  These are sinks that are implemented natively by the Dataflow service
  and hence should not be updated by users. These sinks are processed
  using a Dataflow native write transform.

  Applying this transform results in a ``pvalue.PDone``.
  """

  def __init__(self, *args, **kwargs):
    """Initializes a Write transform.

    Args:
      *args: A tuple of position arguments.
      **kwargs: A dictionary of keyword arguments.

    The *args, **kwargs are expected to be (label, sink) or (sink).
    """
    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
    super(_NativeWrite, self).__init__(label)
    self.sink = sink

  def apply(self, pcoll):
    """Checks the input via the PTransform base class and returns a PDone.

    No elements are processed here; the write itself is presumably carried
    out by the Dataflow service for the native sink.
    """
    self._check_pcollection(pcoll)
    return pvalue.PDone(pcoll.pipeline)