Makes Python API reference generation more strict
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1baf55d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1baf55d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1baf55d Branch: refs/heads/master Commit: e1baf55d82fcc4a3951057b2321f77319d88b6c3 Parents: d035a34 Author: David Cavazos <dcava...@google.com> Authored: Fri Jul 21 09:58:11 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Fri Aug 11 17:01:34 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/__init__.py | 47 ++-- .../apache_beam/internal/gcp/json_value.py | 45 ++-- sdks/python/apache_beam/io/avroio.py | 79 ++++-- sdks/python/apache_beam/io/filebasedsink.py | 18 +- sdks/python/apache_beam/io/filebasedsource.py | 67 +++-- sdks/python/apache_beam/io/filesystem.py | 27 +- sdks/python/apache_beam/io/gcp/bigquery.py | 257 +++++++++++-------- sdks/python/apache_beam/io/gcp/gcsio.py | 12 +- sdks/python/apache_beam/io/range_trackers.py | 12 +- sdks/python/apache_beam/io/source_test_utils.py | 88 ++++--- sdks/python/apache_beam/io/textio.py | 121 +++++---- sdks/python/apache_beam/pipeline.py | 89 ++++--- sdks/python/apache_beam/runners/runner.py | 31 ++- .../python/apache_beam/testing/test_pipeline.py | 48 ++-- sdks/python/apache_beam/transforms/core.py | 165 +++++++----- sdks/python/apache_beam/transforms/display.py | 87 ++++--- .../python/apache_beam/transforms/ptransform.py | 61 +++-- sdks/python/apache_beam/typehints/decorators.py | 104 +++++--- .../typehints/native_type_compatibility.py | 7 +- sdks/python/apache_beam/typehints/typehints.py | 38 +-- sdks/python/generate_pydoc.sh | 134 ++++++++-- sdks/python/tox.ini | 1 + 22 files changed, 953 insertions(+), 585 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index 8b772c9..791ebb7 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -15,11 +15,12 @@ # limitations under the License. # -"""Apache Beam SDK for Python. +""" +Apache Beam SDK for Python +========================== -Apache Beam <https://beam.apache.org/> -provides a simple, powerful programming model for building both batch -and streaming parallel data processing pipelines. +`Apache Beam <https://beam.apache.org>`_ provides a simple, powerful programming +model for building both batch and streaming parallel data processing pipelines. The Apache Beam SDK for Python provides access to Apache Beam capabilities from the Python programming language. @@ -33,32 +34,40 @@ Overview -------- The key concepts in this programming model are -* PCollection: represents a collection of data, which could be - bounded or unbounded in size. -* PTransform: represents a computation that transforms input - PCollections into output PCollections. -* Pipeline: manages a directed acyclic graph of PTransforms and - PCollections that is ready for execution. -* Runner: specifies where and how the Pipeline should execute. -* Reading and Writing Data: your pipeline can read from an external - source and write to an external data sink. +* :class:`~apache_beam.pvalue.PCollection`: represents a collection of data, + which could be bounded or unbounded in size. 
+* :class:`~apache_beam.transforms.ptransform.PTransform`: represents a + computation that transforms input PCollections into output PCollections. +* :class:`~apache_beam.pipeline.Pipeline`: manages a directed acyclic graph of + :class:`~apache_beam.transforms.ptransform.PTransform` s and + :class:`~apache_beam.pvalue.PCollection` s that is ready for execution. +* :class:`~apache_beam.runners.runner.PipelineRunner`: specifies where and how + the pipeline should execute. +* :class:`~apache_beam.io.iobase.Read`: read from an external source. +* :class:`~apache_beam.io.iobase.Write`: write to an external data sink. Typical usage ------------- At the top of your source file:: - import apache_beam as beam + import apache_beam as beam After this import statement -* transform classes are available as beam.FlatMap, beam.GroupByKey, etc. -* Pipeline class is available as beam.Pipeline -* text read/write transforms are available as beam.io.ReadfromText, - beam.io.WriteToText +* Transform classes are available as + :class:`beam.FlatMap <apache_beam.transforms.core.FlatMap>`, + :class:`beam.GroupByKey <apache_beam.transforms.core.GroupByKey>`, etc. +* Pipeline class is available as + :class:`beam.Pipeline <apache_beam.pipeline.Pipeline>` +* Text read/write transforms are available as + :class:`beam.io.ReadFromText <apache_beam.io.textio.ReadFromText>`, + :class:`beam.io.WriteToText <apache_beam.io.textio.WriteToText>`. Examples -------- -The examples subdirectory has some examples. +The `examples subdirectory +<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples>`_ +has some examples. """ http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/internal/gcp/json_value.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py index 59f8b60..167b173 100644 --- a/sdks/python/apache_beam/internal/gcp/json_value.py +++ b/sdks/python/apache_beam/internal/gcp/json_value.py @@ -41,11 +41,12 @@ def get_typed_value_descriptor(obj): obj: A basestring, bool, int, or float to be converted. Returns: - A dictionary containing the keys '@type' and 'value' with the value for - the @type of appropriate type. + A dictionary containing the keys ``@type`` and ``value`` with the value for + the ``@type`` of appropriate type. Raises: - TypeError: if the Python object has a type that is not supported. + ~exceptions.TypeError: if the Python object has a type that is not + supported. """ if isinstance(obj, basestring): type_name = 'Text' @@ -66,21 +67,23 @@ def to_json_value(obj, with_type=False): Converts Python objects into extra_types.JsonValue objects. Args: - obj: Python object to be converted. Can be 'None'. - with_type: If true then the basic types (string, int, float, bool) will - be wrapped in @type/value dictionaries. Otherwise the straight value is - encoded into a JsonValue. + obj: Python object to be converted. Can be :data:`None`. + with_type: If true then the basic types (``string``, ``int``, ``float``, + ``bool``) will be wrapped in ``@type:value`` dictionaries. Otherwise the + straight value is encoded into a ``JsonValue``. Returns: - A JsonValue object using JsonValue, JsonArray and JsonObject types for the - corresponding values, lists, or dictionaries. + A ``JsonValue`` object using ``JsonValue``, ``JsonArray`` and ``JsonObject`` + types for the corresponding values, lists, or dictionaries. 
Raises: - TypeError: if the Python object contains a type that is not supported. + ~exceptions.TypeError: if the Python object contains a type that is not + supported. - The types supported are str, bool, list, tuple, dict, and None. The Dataflow - API requires JsonValue(s) in many places, and it is quite convenient to be - able to specify these hierarchical objects using Python syntax. + The types supported are ``str``, ``bool``, ``list``, ``tuple``, ``dict``, and + ``None``. The Dataflow API requires JsonValue(s) in many places, and it is + quite convenient to be able to specify these hierarchical objects using + Python syntax. """ if obj is None: return extra_types.JsonValue(is_null=True) @@ -121,21 +124,23 @@ def to_json_value(obj, with_type=False): def from_json_value(v): """For internal use only; no backwards-compatibility guarantees. - Converts extra_types.JsonValue objects into Python objects. + Converts ``extra_types.JsonValue`` objects into Python objects. Args: - v: JsonValue object to be converted. + v: ``JsonValue`` object to be converted. Returns: A Python object structured as values, lists, and dictionaries corresponding - to JsonValue, JsonArray and JsonObject types. + to ``JsonValue``, ``JsonArray`` and ``JsonObject`` types. Raises: - TypeError: if the JsonValue object contains a type that is not supported. + ~exceptions.TypeError: if the ``JsonValue`` object contains a type that is + not supported. - The types supported are str, bool, list, dict, and None. The Dataflow API - returns JsonValue(s) in many places and it is quite convenient to be able to - convert these hierarchical objects to much simpler Python objects. + The types supported are ``str``, ``bool``, ``list``, ``dict``, and ``None``. + The Dataflow API returns JsonValue(s) in many places and it is quite + convenient to be able to convert these hierarchical objects to much simpler + Python objects. """ if isinstance(v, extra_types.JsonValue): if v.string_value is not None: http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 47ea282..cb14c65 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -64,27 +64,74 @@ __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro'] class ReadFromAvro(PTransform): - """A ``PTransform`` for reading Avro files. + """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro + files.""" - Uses source '_AvroSource' to read a set of Avro files defined by a given - file pattern. - If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro - files, a ``PCollection`` for the records in these Avro files can be created - in the following manner. + def __init__(self, file_pattern=None, min_bundle_size=0, validate=True): + """Initializes :class:`ReadFromAvro`. - p = df.Pipeline(argv=pipeline_args) - records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*') - """ + Uses source :class:`~apache_beam.io._AvroSource` to read a set of Avro + files defined by a given file pattern. - def __init__(self, file_pattern=None, min_bundle_size=0, validate=True): - """Initializes ``ReadFromAvro``. + If ``/mypath/myavrofiles*`` is a file-pattern that points to a set of Avro + files, a :class:`~apache_beam.pvalue.PCollection` for the records in + these Avro files can be created in the following manner. + + .. 
testcode:: + + with beam.Pipeline() as p: + records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*') + + .. NOTE: We're not actually interested in this error; but if we get here, + it means that the way of calling this transform hasn't changed. + + .. testoutput:: + :hide: + + Traceback (most recent call last): + ... + IOError: No files found based on the file pattern + + Each record of this :class:`~apache_beam.pvalue.PCollection` will contain + a single record read from a source. Records that are of simple types will be + mapped into corresponding Python types. Records that are of Avro type + ``RECORD`` will be mapped to Python dictionaries that comply with the schema + contained in the Avro file that contains those records. In this case, keys + of each dictionary will contain the corresponding field names and will be of + type :class:`str` while the values of the dictionary will be of the type + defined in the corresponding Avro schema. + + For example, if schema of the Avro file is the following. :: + + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + + {"name": "name", + "type": "string"}, + + {"name": "favorite_number", + "type": ["int", "null"]}, + + {"name": "favorite_color", + "type": ["string", "null"]} + + ] + } + + Then records generated by :class:`~apache_beam.io._AvroSource` will be + dictionaries of the following form. :: + + {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}). Args: - file_pattern: the set of files to be read. - min_bundle_size: the minimum size in bytes, to be considered when - splitting the input into bundles. - validate: flag to verify that the files exist during the pipeline - creation time. + file_pattern (str): the file glob to read + min_bundle_size (int): the minimum size in bytes, to be considered when + splitting the input into bundles. + validate (bool): flag to verify that the files exist during the pipeline + creation time. """ super(ReadFromAvro, self).__init__() self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate) http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsink.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index 76c09fc..eb99d08 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -44,12 +44,13 @@ class FileBasedSink(iobase.Sink): """A sink to a GCS or local files. To implement a file-based sink, extend this class and override - either ``write_record()`` or ``write_encoded_record()``. + either :meth:`.write_record()` or :meth:`.write_encoded_record()`. - If needed, also overwrite ``open()`` and/or ``close()`` to customize the - file handling or write headers and footers. + If needed, also overwrite :meth:`.open()` and/or :meth:`.close()` to customize + the file handling or write headers and footers. - The output of this write is a PCollection of all written shards. + The output of this write is a :class:`~apache_beam.pvalue.PCollection` of + all written shards. """ # Max number of threads to be used for renaming. @@ -65,9 +66,12 @@ class FileBasedSink(iobase.Sink): compression_type=CompressionTypes.AUTO): """ Raises: - TypeError: if file path parameters are not a string or ValueProvider, - or if compression_type is not member of CompressionTypes. - ValueError: if shard_name_template is not of expected format. 
+ ~exceptions.TypeError: if file path parameters are not a :class:`str` or + :class:`~apache_beam.options.value_provider.ValueProvider`, or if + **compression_type** is not member of + :class:`~apache_beam.io.filesystem.CompressionTypes`. + ~exceptions.ValueError: if **shard_name_template** is not of expected + format. """ if not isinstance(file_path_prefix, (basestring, ValueProvider)): raise TypeError('file_path_prefix must be a string or ValueProvider;' http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index f78bf3f..6496930 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -17,12 +17,13 @@ """A framework for developing sources for new file types. -To create a source for a new file type a sub-class of ``FileBasedSource`` should -be created. Sub-classes of ``FileBasedSource`` must implement the method -``FileBasedSource.read_records()``. Please read the documentation of that method -for more details. +To create a source for a new file type a sub-class of :class:`FileBasedSource` +should be created. Sub-classes of :class:`FileBasedSource` must implement the +method :meth:`FileBasedSource.read_records()`. Please read the documentation of +that method for more details. -For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``. +For an example implementation of :class:`FileBasedSource` see +:class:`~apache_beam.io._AvroSource`. """ import uuid @@ -51,7 +52,8 @@ __all__ = ['FileBasedSource'] class FileBasedSource(iobase.BoundedSource): - """A ``BoundedSource`` for reading a file glob of a given type.""" + """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of + a given type.""" MIN_NUMBER_OF_FILES_TO_STAT = 100 MIN_FRACTION_OF_FILES_TO_STAT = 0.01 @@ -62,31 +64,40 @@ class FileBasedSource(iobase.BoundedSource): compression_type=CompressionTypes.AUTO, splittable=True, validate=True): - """Initializes ``FileBasedSource``. + """Initializes :class:`FileBasedSource`. Args: - file_pattern: the file glob to read a string or a ValueProvider - (placeholder to inject a runtime value). - min_bundle_size: minimum size of bundles that should be generated when - performing initial splitting on this source. - compression_type: compression type to use - splittable: whether FileBasedSource should try to logically split a single - file into data ranges so that different parts of the same file - can be read in parallel. If set to False, FileBasedSource will - prevent both initial and dynamic splitting of sources for - single files. File patterns that represent multiple files may - still get split into sources for individual files. Even if set - to True by the user, FileBasedSource may choose to not split - the file, for example, for compressed files where currently - it is not possible to efficiently read a data range without - decompressing the whole file. - validate: Boolean flag to verify that the files exist during the pipeline - creation time. + file_pattern (str): the file glob to read a string or a + :class:`~apache_beam.options.value_provider.ValueProvider` + (placeholder to inject a runtime value). + min_bundle_size (str): minimum size of bundles that should be generated + when performing initial splitting on this source. 
+ compression_type (str): Used to handle compressed output files. + Typical value is :attr:`CompressionTypes.AUTO + <apache_beam.io.filesystem.CompressionTypes.AUTO>`, + in which case the final file path's extension will be used to detect + the compression. + splittable (bool): whether :class:`FileBasedSource` should try to + logically split a single file into data ranges so that different parts + of the same file can be read in parallel. If set to :data:`False`, + :class:`FileBasedSource` will prevent both initial and dynamic splitting + of sources for single files. File patterns that represent multiple files + may still get split into sources for individual files. Even if set to + :data:`True` by the user, :class:`FileBasedSource` may choose to not + split the file, for example, for compressed files where currently it is + not possible to efficiently read a data range without decompressing the + whole file. + validate (bool): Boolean flag to verify that the files exist during the + pipeline creation time. + Raises: - TypeError: when compression_type is not valid or if file_pattern is not a - string or a ValueProvider. - ValueError: when compression and splittable files are specified. - IOError: when the file pattern specified yields an empty result. + ~exceptions.TypeError: when **compression_type** is not valid or if + **file_pattern** is not a :class:`str` or a + :class:`~apache_beam.options.value_provider.ValueProvider`. + ~exceptions.ValueError: when compression and splittable files are + specified. + ~exceptions.IOError: when the file pattern specified yields an empty + result. """ if not isinstance(file_pattern, (basestring, ValueProvider)): http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index ef3040c..5804d00 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -299,23 +299,28 @@ class CompressedFile(object): """Set the file's current offset. Seeking behavior: - * seeking from the end (SEEK_END) the whole file is decompressed once to - determine it's size. Therefore it is preferred to use - SEEK_SET or SEEK_CUR to avoid the processing overhead - * seeking backwards from the current position rewinds the file to 0 + + * seeking from the end :data:`os.SEEK_END` the whole file is decompressed + once to determine it's size. Therefore it is preferred to use + :data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing + overhead + * seeking backwards from the current position rewinds the file to ``0`` and decompresses the chunks to the requested offset * seeking is only supported in files opened for reading - * if the new offset is out of bound, it is adjusted to either 0 or EOF. + * if the new offset is out of bound, it is adjusted to either ``0`` or + ``EOF``. Args: - offset: seek offset in the uncompressed content represented as number - whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), - os.SEEK_CUR (seek relative to the current position), and os.SEEK_END - (seek relative to the end, offset should be negative). + offset (int): seek offset in the uncompressed content represented as + number + whence (int): seek mode. 
Supported modes are :data:`os.SEEK_SET` + (absolute seek), :data:`os.SEEK_CUR` (seek relative to the current + position), and :data:`os.SEEK_END` (seek relative to the end, offset + should be negative). Raises: - IOError: When this buffer is closed. - ValueError: When whence is invalid or the file is not seekable + ~exceptions.IOError: When this buffer is closed. + ~exceptions.ValueError: When whence is invalid or the file is not seekable """ if whence == os.SEEK_SET: absolute_offset = offset http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index db6715a..33d67bf 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -330,45 +330,49 @@ class BigQuerySource(dataflow_io.NativeSource): def __init__(self, table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True): - """Initialize a BigQuerySource. + """Initialize a :class:`BigQuerySource`. Args: - table: The ID of a BigQuery table. If specified all data of the table - will be used as input of the current source. The ID must contain only - letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset - and query arguments are None then the table argument must contain the - entire table reference specified as: 'DATASET.TABLE' or - 'PROJECT:DATASET.TABLE'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument or a query is - specified. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table argument or a query is - specified. - query: A query to be used instead of arguments table, dataset, and + table (str): The ID of a BigQuery table. If specified all data of the + table will be used as input of the current source. The ID must contain + only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores + ``_``. If dataset and query arguments are :data:`None` then the table + argument must contain the entire table reference specified as: + ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``. + dataset (str): The ID of the dataset containing this table or + :data:`None` if the table reference is specified entirely by the table + argument or a query is specified. + project (str): The ID of the project containing this table or + :data:`None` if the table reference is specified entirely by the table + argument or a query is specified. + query (str): A query to be used instead of arguments table, dataset, and project. - validate: If true, various checks will be done when source gets - initialized (e.g., is table present?). This should be True for most - scenarios in order to catch errors as early as possible (pipeline - construction instead of pipeline execution). It should be False if the - table is created during pipeline execution by a previous step. - coder: The coder for the table rows if serialized to disk. If None, then - the default coder is RowAsDictJsonCoder, which will interpret every line - in a file as a JSON serialized dictionary. This argument needs a value - only in special cases when returning table rows as dictionaries is not - desirable. - use_standard_sql: Specifies whether to use BigQuery's standard - SQL dialect for this query. 
The default value is False. If set to True, - the query will use BigQuery's updated SQL dialect with improved - standards compliance. This parameter is ignored for table inputs. - flatten_results: Flattens all nested and repeated fields in the - query results. The default value is true. + validate (bool): If :data:`True`, various checks will be done when source + gets initialized (e.g., is table present?). This should be + :data:`True` for most scenarios in order to catch errors as early as + possible (pipeline construction instead of pipeline execution). It + should be :data:`False` if the table is created during pipeline + execution by a previous step. + coder (~apache_beam.coders.coders.Coder): The coder for the table + rows if serialized to disk. If :data:`None`, then the default coder is + :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`, + which will interpret every line in a file as a JSON serialized + dictionary. This argument needs a value only in special cases when + returning table rows as dictionaries is not desirable. + use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL + dialect for this query. The default value is :data:`False`. + If set to :data:`True`, the query will use BigQuery's updated SQL + dialect with improved standards compliance. + This parameter is ignored for table inputs. + flatten_results (bool): Flattens all nested and repeated fields in the + query results. The default value is :data:`True`. Raises: - ValueError: if any of the following is true - (1) the table reference as a string does not match the expected format - (2) neither a table nor a query is specified - (3) both a table and a query is specified. + ~exceptions.ValueError: if any of the following is true: + + 1) the table reference as a string does not match the expected format + 2) neither a table nor a query is specified + 3) both a table and a query is specified. """ # Import here to avoid adding the dependency for local running scenarios. @@ -439,46 +443,62 @@ class BigQuerySink(dataflow_io.NativeSink): """Initialize a BigQuerySink. Args: - table: The ID of the table. The ID must contain only letters - (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is - None then the table argument must contain the entire table reference - specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table argument. - schema: The schema to be used if the BigQuery table to write has to be - created. This can be either specified as a 'bigquery.TableSchema' object - or a single string of the form 'field1:type1,field2:type2,field3:type3' - that defines a comma separated list of fields. Here 'type' should - specify the BigQuery type of the field. Single string based schemas do - not support nested fields, repeated fields, or specifying a BigQuery - mode for fields (mode will always be set to 'NULLABLE'). - create_disposition: A string describing what happens if the table does not - exist. Possible values are: - - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist. - - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist. - write_disposition: A string describing what happens if the table has - already some data. Possible values are: - - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows. 
- - BigQueryDisposition.WRITE_APPEND: add to existing rows. - - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty. - validate: If true, various checks will be done when sink gets - initialized (e.g., is table present given the disposition arguments?). - This should be True for most scenarios in order to catch errors as early - as possible (pipeline construction instead of pipeline execution). It - should be False if the table is created during pipeline execution by a - previous step. - coder: The coder for the table rows if serialized to disk. If None, then - the default coder is RowAsDictJsonCoder, which will interpret every - element written to the sink as a dictionary that will be JSON serialized - as a line in a file. This argument needs a value only in special cases - when writing table rows as dictionaries is not desirable. + table (str): The ID of the table. The ID must contain only letters + ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If + **dataset** argument is :data:`None` then the table argument must + contain the entire table reference specified as: ``'DATASET.TABLE'`` or + ``'PROJECT:DATASET.TABLE'``. + dataset (str): The ID of the dataset containing this table or + :data:`None` if the table reference is specified entirely by the table + argument. + project (str): The ID of the project containing this table or + :data:`None` if the table reference is specified entirely by the table + argument. + schema (str): The schema to be used if the BigQuery table to write has + to be created. This can be either specified as a + :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema` object or a single string of the form + ``'field1:type1,field2:type2,field3:type3'`` that defines a comma + separated list of fields. Here ``'type'`` should specify the BigQuery + type of the field. Single string based schemas do not support nested + fields, repeated fields, or specifying a BigQuery mode for fields (mode + will always be set to ``'NULLABLE'``). + create_disposition (BigQueryDisposition): A string describing what + happens if the table does not exist. Possible values are: + + * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not + exist. + * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not + exist. + + write_disposition (BigQueryDisposition): A string describing what + happens if the table has already some data. Possible values are: + + * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows. + * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows. + * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not + empty. + + validate (bool): If :data:`True`, various checks will be done when sink + gets initialized (e.g., is table present given the disposition + arguments?). This should be :data:`True` for most scenarios in order to + catch errors as early as possible (pipeline construction instead of + pipeline execution). It should be :data:`False` if the table is created + during pipeline execution by a previous step. + coder (~apache_beam.coders.coders.Coder): The coder for the + table rows if serialized to disk. If :data:`None`, then the default + coder is :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`, + which will interpret every element written to the sink as a dictionary + that will be JSON serialized as a line in a file. This argument needs a + value only in special cases when writing table rows as dictionaries is + not desirable. 
Raises: - TypeError: if the schema argument is not a string or a TableSchema object. - ValueError: if the table reference as a string does not match the expected - format. + ~exceptions.TypeError: if the schema argument is not a :class:`str` or a + :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema` object. + ~exceptions.ValueError: if the table reference as a string does not + match the expected format. """ # Import here to avoid adding the dependency for local running scenarios. @@ -1261,32 +1281,47 @@ class WriteToBigQuery(PTransform): """Initialize a WriteToBigQuery transform. Args: - table: The ID of the table. The ID must contain only letters - (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is - None then the table argument must contain the entire table reference - specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table argument. - schema: The schema to be used if the BigQuery table to write has to be - created. This can be either specified as a 'bigquery.TableSchema' object - or a single string of the form 'field1:type1,field2:type2,field3:type3' - that defines a comma separated list of fields. Here 'type' should - specify the BigQuery type of the field. Single string based schemas do - not support nested fields, repeated fields, or specifying a BigQuery - mode for fields (mode will always be set to 'NULLABLE'). - create_disposition: A string describing what happens if the table does not - exist. Possible values are: - - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist. - - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist. - write_disposition: A string describing what happens if the table has - already some data. Possible values are: - - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows. - - BigQueryDisposition.WRITE_APPEND: add to existing rows. - - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty. + table (str): The ID of the table. The ID must contain only letters + ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset + argument is :data:`None` then the table argument must contain the + entire table reference specified as: ``'DATASET.TABLE'`` or + ``'PROJECT:DATASET.TABLE'``. + dataset (str): The ID of the dataset containing this table or + :data:`None` if the table reference is specified entirely by the table + argument. + project (str): The ID of the project containing this table or + :data:`None` if the table reference is specified entirely by the table + argument. + schema (str): The schema to be used if the BigQuery table to write has to + be created. This can be either specified as a + :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema` + object or a single string of the form + ``'field1:type1,field2:type2,field3:type3'`` that defines a comma + separated list of fields. Here ``'type'`` should specify the BigQuery + type of the field. Single string based schemas do not support nested + fields, repeated fields, or specifying a BigQuery mode for fields + (mode will always be set to ``'NULLABLE'``). + create_disposition (BigQueryDisposition): A string describing what + happens if the table does not exist. 
Possible values are: + + * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not + exist. + * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not + exist. + + write_disposition (BigQueryDisposition): A string describing what happens + if the table has already some data. Possible values are: + + * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows. + * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows. + * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not + empty. + For streaming pipelines WriteTruncate can not be used. - batch_size: Number of rows to be written to BQ per streaming API insert. + + batch_size (int): Number of rows to be written to BQ per streaming API + insert. test_client: Override the default bigquery client used for testing. """ self.table_reference = _parse_table_reference(table, dataset, project) @@ -1300,14 +1335,20 @@ class WriteToBigQuery(PTransform): @staticmethod def get_table_schema_from_string(schema): - """Transform the string table schema into a bigquery.TableSchema instance. + """Transform the string table schema into a + :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema` instance. Args: - schema: The sting schema to be used if the BigQuery table to write has - to be created. + schema (str): The sting schema to be used if the BigQuery table to write + has to be created. + Returns: - table_schema: The schema to be used if the BigQuery table to write has - to be created but in the bigquery.TableSchema format. + ~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema: + The schema to be used if the BigQuery table to write has to be created + but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema` format. """ table_schema = bigquery.TableSchema() schema_list = [s.strip() for s in schema.split(',')] @@ -1349,12 +1390,14 @@ class WriteToBigQuery(PTransform): """Transform the table schema into a dictionary instance. Args: - schema: The schema to be used if the BigQuery table to write has to be - created. This can either be a dict or string or in the TableSchema - format. + schema (~apache_beam.io.gcp.internal.clients.bigquery.\ +bigquery_v2_messages.TableSchema): + The schema to be used if the BigQuery table to write has to be created. + This can either be a dict or string or in the TableSchema format. + Returns: - table_schema: The schema to be used if the BigQuery table to write has - to be created but in the dictionary format. + Dict[str, Any]: The schema to be used if the BigQuery table to write has + to be created but in the dictionary format. """ if isinstance(schema, dict): return schema http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 643fbc7..ae71a5f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -137,16 +137,16 @@ class GcsIO(object): """Open a GCS file path for reading or writing. Args: - filename: GCS file path in the form gs://<bucket>/<object>. - mode: 'r' for reading or 'w' for writing. - read_buffer_size: Buffer size to use during read operations. - mime_type: Mime type to set for write operations. + filename (str): GCS file path in the form ``gs://<bucket>/<object>``. 
+ mode (str): ``'r'`` for reading or ``'w'`` for writing. + read_buffer_size (int): Buffer size to use during read operations. + mime_type (str): Mime type to set for write operations. Returns: - file object. + GCS file object. Raises: - ValueError: Invalid open file mode. + ~exceptions.ValueError: Invalid open file mode. """ if mode == 'r' or mode == 'rb': return GcsBufferedReader(self.client, filename, mode=mode, http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/range_trackers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 4bd19f8..1339b91 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -317,17 +317,19 @@ class OrderedPositionRangeTracker(iobase.RangeTracker): class UnsplittableRangeTracker(iobase.RangeTracker): """A RangeTracker that always ignores split requests. - This can be used to make a given ``RangeTracker`` object unsplittable by - ignoring all calls to ``try_split()``. All other calls will be delegated to - the given ``RangeTracker``. + This can be used to make a given + :class:`~apache_beam.io.iobase.RangeTracker` object unsplittable by + ignoring all calls to :meth:`.try_split()`. All other calls will be delegated + to the given :class:`~apache_beam.io.iobase.RangeTracker`. """ def __init__(self, range_tracker): """Initializes UnsplittableRangeTracker. Args: - range_tracker: a ``RangeTracker`` to which all method calls expect calls - to ``try_split()`` will be delegated. + range_tracker (~apache_beam.io.iobase.RangeTracker): a + :class:`~apache_beam.io.iobase.RangeTracker` to which all method + calls expect calls to :meth:`.try_split()` will be delegated. """ assert isinstance(range_tracker, iobase.RangeTracker) self._range_tracker = range_tracker http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/source_test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py index a144a8a..bea9708 100644 --- a/sdks/python/apache_beam/io/source_test_utils.py +++ b/sdks/python/apache_beam/io/source_test_utils.py @@ -80,12 +80,13 @@ def read_from_source(source, start_position=None, stop_position=None): Only reads elements within the given position range. Args: - source: ``iobase.BoundedSource`` implementation. - start_position: start position for reading. - stop_position: stop position for reading. + source (~apache_beam.io.iobase.BoundedSource): + :class:`~apache_beam.io.iobase.BoundedSource` implementation. + start_position (int): start position for reading. + stop_position (int): stop position for reading. Returns: - the set of values read from the sources. + List[str]: the set of values read from the sources. """ values = [] range_tracker = source.get_range_tracker(start_position, stop_position) @@ -108,21 +109,25 @@ def _ThreadPool(threads): def assert_sources_equal_reference_source(reference_source_info, sources_info): """Tests if a reference source is equal to a given set of sources. 
- Given a reference source (a ``BoundedSource`` and a position range) and a - list of sources, assert that the union of the records - read from the list of sources is equal to the records read from the + Given a reference source (a :class:`~apache_beam.io.iobase.BoundedSource` + and a position range) and a list of sources, assert that the union of the + records read from the list of sources is equal to the records read from the reference source. Args: - reference_source_info: a three-tuple that gives the reference - ``iobase.BoundedSource``, position to start reading - at, and position to stop reading at. - sources_info: a set of sources. Each source is a three-tuple that is of - the same format described above. + reference_source_info\ + (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]): + a three-tuple that gives the reference + :class:`~apache_beam.io.iobase.BoundedSource`, position to start + reading at, and position to stop reading at. + sources_info\ + (Iterable[Tuple[~apache_beam.io.iobase.BoundedSource, int, int]]): + a set of sources. Each source is a three-tuple that is of the same + format described above. Raises: - ValueError: if the set of data produced by the reference source and the - given set of sources are not equivalent. + ~exceptions.ValueError: if the set of data produced by the reference source + and the given set of sources are not equivalent. """ @@ -172,18 +177,20 @@ def assert_sources_equal_reference_source(reference_source_info, sources_info): def assert_reentrant_reads_succeed(source_info): """Tests if a given source can be read in a reentrant manner. - Assume that given source produces the set of values {v1, v2, v3, ... vn}. For - i in range [1, n-1] this method performs a reentrant read after reading i - elements and verifies that both the original and reentrant read produce the - expected set of values. + Assume that given source produces the set of values ``{v1, v2, v3, ... vn}``. + For ``i`` in range ``[1, n-1]`` this method performs a reentrant read after + reading ``i`` elements and verifies that both the original and reentrant read + produce the expected set of values. Args: - source_info: a three-tuple that gives the reference - ``iobase.BoundedSource``, position to start reading at, and a - position to stop reading at. + source_info (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]): + a three-tuple that gives the reference + :class:`~apache_beam.io.iobase.BoundedSource`, position to start reading + at, and a position to stop reading at. + Raises: - ValueError: if source is too trivial or reentrant read result in an - incorrect read. + ~exceptions.ValueError: if source is too trivial or reentrant read result + in an incorrect read. """ source, start_position, stop_position = source_info @@ -228,21 +235,25 @@ def assert_split_at_fraction_behavior(source, num_items_to_read_before_split, split_fraction, expected_outcome): """Verifies the behaviour of splitting a source at a given fraction. - Asserts that splitting a ``BoundedSource`` either fails after reading - ``num_items_to_read_before_split`` items, or succeeds in a way that is - consistent according to ``assertSplitAtFractionSucceedsAndConsistent()``. + Asserts that splitting a :class:`~apache_beam.io.iobase.BoundedSource` either + fails after reading **num_items_to_read_before_split** items, or succeeds in + a way that is consistent according to + :func:`assert_split_at_fraction_succeeds_and_consistent()`. Args: - source: the source to perform dynamic splitting on. 
- num_items_to_read_before_split: number of items to read before splitting. - split_fraction: fraction to split at. - expected_outcome: a value from 'ExpectedSplitOutcome'. + source (~apache_beam.io.iobase.BoundedSource): the source to perform + dynamic splitting on. + num_items_to_read_before_split (int): number of items to read before + splitting. + split_fraction (float): fraction to split at. + expected_outcome (int): a value from + :class:`~apache_beam.io.source_test_utils.ExpectedSplitOutcome`. Returns: - a tuple that gives the number of items produced by reading the two ranges - produced after dynamic splitting. If splitting did not occur, the first - value of the tuple will represent the full set of records read by the - source while the second value of the tuple will be '-1'. + Tuple[int, int]: a tuple that gives the number of items produced by reading + the two ranges produced after dynamic splitting. If splitting did not + occur, the first value of the tuple will represent the full set of records + read by the source while the second value of the tuple will be ``-1``. """ assert isinstance(source, iobase.BoundedSource) expected_items = read_from_source(source, None, None) @@ -503,12 +514,13 @@ def assert_split_at_fraction_exhaustive( Verifies multi threaded splitting as well. Args: - source: the source to perform dynamic splitting on. - perform_multi_threaded_test: if true performs a multi-threaded test - otherwise this test is skipped. + source (~apache_beam.io.iobase.BoundedSource): the source to perform + dynamic splitting on. + perform_multi_threaded_test (bool): if :data:`True` performs a + multi-threaded test, otherwise this test is skipped. Raises: - ValueError: if the exhaustive splitting test fails. + ~exceptions.ValueError: if the exhaustive splitting test fails. """ expected_items = read_from_source(source, start_position, stop_position) http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 9c6532e..9708df7 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -417,13 +417,15 @@ class ReadAllFromText(PTransform): class ReadFromText(PTransform): - """A ``PTransform`` for reading text files. + r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text + files. Parses a text file as newline-delimited elements, by default assuming - UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'. + ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n``. - This implementation only supports reading text encoded using UTF-8 or ASCII. - This does not support other encodings such as UTF-16 or UTF-32. + This implementation only supports reading text encoded using ``UTF-8`` or + ``ASCII``. + This does not support other encodings such as ``UTF-16`` or ``UTF-32``. """ def __init__( self, @@ -435,26 +437,28 @@ class ReadFromText(PTransform): validate=True, skip_header_lines=0, **kwargs): - """Initialize the ``ReadFromText`` transform. + """Initialize the :class:`ReadFromText` transform. Args: - file_pattern: The file path to read from as a local file path or a GCS - ``gs://`` path. The path can contain glob characters - ``(*, ?, and [...] sets)``. - min_bundle_size: Minimum size of bundles that should be generated when - splitting this source into bundles. 
See ``FileBasedSource`` for more + file_pattern (str): The file path to read from as a local file path or a + GCS ``gs://`` path. The path can contain glob characters + (``*``, ``?``, and ``[...]`` sets). + min_bundle_size (int): Minimum size of bundles that should be generated + when splitting this source into bundles. See + :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more details. - compression_type: Used to handle compressed input files. Typical value - is ``CompressionTypes.AUTO``, in which case the underlying file_path's - extension will be used to detect the compression. - strip_trailing_newlines: Indicates whether this source should remove - the newline char in each line it reads before decoding that line. - validate: flag to verify that the files exist during the pipeline + compression_type (str): Used to handle compressed input files. + Typical value is :attr:`CompressionTypes.AUTO + <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the + underlying file_path's extension will be used to detect the compression. + strip_trailing_newlines (bool): Indicates whether this source should + remove the newline char in each line it reads before decoding that line. + validate (bool): flag to verify that the files exist during the pipeline creation time. - skip_header_lines: Number of header lines to skip. Same number is skipped - from each source file. Must be 0 or higher. Large number of skipped - lines might impact performance. - coder: Coder used to decode each line. + skip_header_lines (int): Number of header lines to skip. Same number is + skipped from each source file. Must be 0 or higher. Large number of + skipped lines might impact performance. + coder (~apache_beam.coders.coders.Coder): Coder used to decode each line. """ super(ReadFromText, self).__init__(**kwargs) @@ -468,49 +472,54 @@ class ReadFromText(PTransform): class WriteToText(PTransform): - """A PTransform for writing to text files.""" + """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to + text files.""" - def __init__(self, - file_path_prefix, - file_name_suffix='', - append_trailing_newlines=True, - num_shards=0, - shard_name_template=None, - coder=coders.ToStringCoder(), - compression_type=CompressionTypes.AUTO, - header=None): - """Initialize a WriteToText PTransform. + def __init__( + self, + file_path_prefix, + file_name_suffix='', + append_trailing_newlines=True, + num_shards=0, + shard_name_template=None, + coder=coders.ToStringCoder(), + compression_type=CompressionTypes.AUTO, + header=None): + r"""Initialize a :class:`WriteToText` transform. Args: - file_path_prefix: The file path to write to. The files written will begin - with this prefix, followed by a shard identifier (see num_shards), and - end in a common extension, if given by file_name_suffix. In most cases, - only this argument is specified and num_shards, shard_name_template, and - file_name_suffix use default values. - file_name_suffix: Suffix for the files written. - append_trailing_newlines: indicate whether this sink should write an - additional newline char after writing each element. - num_shards: The number of files (shards) used for output. If not set, the - service will decide on the optimal number of shards. + file_path_prefix (str): The file path to write to. The files written will + begin with this prefix, followed by a shard identifier (see + **num_shards**), and end in a common extension, if given by + **file_name_suffix**. 
In most cases, only this argument is specified and + **num_shards**, **shard_name_template**, and **file_name_suffix** use + default values. + file_name_suffix (str): Suffix for the files written. + append_trailing_newlines (bool): indicate whether this sink should write + an additional newline char after writing each element. + num_shards (int): The number of files (shards) used for output. + If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files. - shard_name_template: A template string containing placeholders for - the shard number and shard count. Currently only '' and - '-SSSSS-of-NNNNN' are patterns accepted by the service. + shard_name_template (str): A template string containing placeholders for + the shard number and shard count. Currently only ``''`` and + ``'-SSSSS-of-NNNNN'`` are patterns accepted by the service. When constructing a filename for a particular shard number, the - upper-case letters 'S' and 'N' are replaced with the 0-padded shard - number and shard count respectively. This argument can be '' in which - case it behaves as if num_shards was set to 1 and only one file will be - generated. The default pattern used is '-SSSSS-of-NNNNN'. - coder: Coder used to encode each line. - compression_type: Used to handle compressed output files. Typical value - is CompressionTypes.AUTO, in which case the final file path's - extension (as determined by file_path_prefix, file_name_suffix, - num_shards and shard_name_template) will be used to detect the - compression. - header: String to write at beginning of file as a header. If not None and - append_trailing_newlines is set, '\n' will be added. + upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded + shard number and shard count respectively. This argument can be ``''`` + in which case it behaves as if num_shards was set to 1 and only one file + will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'``. + coder (~apache_beam.coders.coders.Coder): Coder used to encode each line. + compression_type (str): Used to handle compressed output files. + Typical value is :class:`CompressionTypes.AUTO + <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the + final file path's extension (as determined by **file_path_prefix**, + **file_name_suffix**, **num_shards** and **shard_name_template**) will + be used to detect the compression. + header (str): String to write at beginning of file as a header. + If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will + be added. """ self._sink = _TextSink(file_path_prefix, file_name_suffix, http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index e7c2322..1ade6c0 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -15,17 +15,18 @@ # limitations under the License. # -"""Pipeline, the top-level Dataflow object. +"""Pipeline, the top-level Beam object. A pipeline holds a DAG of data transforms. 
Conceptually the nodes of the DAG -are transforms (PTransform objects) and the edges are values (mostly PCollection +are transforms (:class:`~apache_beam.transforms.ptransform.PTransform` objects) +and the edges are values (mostly :class:`~apache_beam.pvalue.PCollection` objects). The transforms take as inputs one or more PValues and output one or -more PValues. +more :class:`~apache_beam.pvalue.PValue` s. The pipeline offers functionality to traverse the graph. The actual operation to be executed for each node visited is specified through a runner object. -Typical usage: +Typical usage:: # Create a pipeline object using a local runner for execution. with beam.Pipeline('DirectRunner') as p: @@ -73,32 +74,40 @@ __all__ = ['Pipeline'] class Pipeline(object): - """A pipeline object that manages a DAG of PValues and their PTransforms. + """A pipeline object that manages a DAG of + :class:`~apache_beam.pvalue.PValue` s and their + :class:`~apache_beam.transforms.ptransform.PTransform` s. - Conceptually the PValues are the DAG's nodes and the PTransforms computing - the PValues are the edges. + Conceptually the :class:`~apache_beam.pvalue.PValue` s are the DAG's nodes and + the :class:`~apache_beam.transforms.ptransform.PTransform` s computing + the :class:`~apache_beam.pvalue.PValue` s are the edges. All the transforms applied to the pipeline must have distinct full labels. If same transform instance needs to be applied then the right shift operator - should be used to designate new names (e.g. `input | "label" >> my_tranform`). + should be used to designate new names + (e.g. ``input | "label" >> my_tranform``). """ def __init__(self, runner=None, options=None, argv=None): """Initialize a pipeline object. Args: - runner: An object of type 'PipelineRunner' that will be used to execute - the pipeline. For registered runners, the runner name can be specified, - otherwise a runner object must be supplied. - options: A configured 'PipelineOptions' object containing arguments - that should be used for running the Dataflow job. - argv: a list of arguments (such as sys.argv) to be used for building a - 'PipelineOptions' object. This will only be used if argument 'options' - is None. + runner (~apache_beam.runners.runner.PipelineRunner): An object of + type :class:`~apache_beam.runners.runner.PipelineRunner` that will be + used to execute the pipeline. For registered runners, the runner name + can be specified, otherwise a runner object must be supplied. + options (~apache_beam.options.pipeline_options.PipelineOptions): + A configured + :class:`~apache_beam.options.pipeline_options.PipelineOptions` object + containing arguments that should be used for running the Beam job. + argv (List[str]): a list of arguments (such as :data:`sys.argv`) + to be used for building a + :class:`~apache_beam.options.pipeline_options.PipelineOptions` object. + This will only be used if argument **options** is :data:`None`. Raises: - ValueError: if either the runner or options argument is not of the - expected type. + ~exceptions.ValueError: if either the runner or options argument is not + of the expected type. """ if options is not None: if isinstance(options, PipelineOptions): @@ -292,13 +301,15 @@ class Pipeline(object): def replace_all(self, replacements): """ Dynamically replaces PTransforms in the currently populated hierarchy. - Currently this only works for replacements where input and output types - are exactly the same. 
- TODO: Update this to also work for transform overrides where input and - output types are different. + Currently this only works for replacements where input and output types + are exactly the same. + + TODO: Update this to also work for transform overrides where input and + output types are different. Args: - replacements a list of PTransformOverride objects. + replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of + :class:`~apache_beam.pipeline.PTransformOverride` objects. """ for override in replacements: assert isinstance(override, PTransformOverride) @@ -341,13 +352,16 @@ class Pipeline(object): Runner-internal implementation detail; no backwards-compatibility guarantees Args: - visitor: PipelineVisitor object whose callbacks will be called for each - node visited. See PipelineVisitor comments. + visitor (~apache_beam.pipeline.PipelineVisitor): + :class:`~apache_beam.pipeline.PipelineVisitor` object whose callbacks + will be called for each node visited. See + :class:`~apache_beam.pipeline.PipelineVisitor` comments. Raises: - TypeError: if node is specified and is not a PValue. - pipeline.PipelineError: if node is specified and does not belong to this - pipeline instance. + ~exceptions.TypeError: if node is specified and is not a + :class:`~apache_beam.pvalue.PValue`. + ~apache_beam.error.PipelineError: if node is specified and does not + belong to this pipeline instance. """ visited = set() @@ -357,15 +371,20 @@ class Pipeline(object): """Applies a custom transform using the pvalueish specified. Args: - transform: the PTranform to apply. - pvalueish: the input for the PTransform (typically a PCollection). - label: label of the PTransform. + transform (~apache_beam.transforms.ptransform.PTransform): the + :class:`~apache_beam.transforms.ptransform.PTransform` to apply. + pvalueish (~apache_beam.pvalue.PCollection): the input for the + :class:`~apache_beam.transforms.ptransform.PTransform` (typically a + :class:`~apache_beam.pvalue.PCollection`). + label (str): label of the + :class:`~apache_beam.transforms.ptransform.PTransform`. Raises: - TypeError: if the transform object extracted from the argument list is - not a PTransform. - RuntimeError: if the transform object was already applied to this pipeline - and needs to be cloned in order to apply again. + ~exceptions.TypeError: if the transform object extracted from the + argument list is not a + :class:`~apache_beam.transforms.ptransform.PTransform`. + ~exceptions.RuntimeError: if the transform object was already applied to + this pipeline and needs to be cloned in order to apply again. """ if isinstance(transform, ptransform._NamedPTransform): return self.apply(transform.transform, pvalueish, http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 7ce9a03..a3c6b34 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -283,10 +283,10 @@ class PValueCache(object): class PipelineState(object): - """State of the Pipeline, as returned by PipelineResult.state. + """State of the Pipeline, as returned by :attr:`PipelineResult.state`. This is meant to be the union of all the states any runner can put a - pipeline in. Currently, it represents the values of the dataflow + pipeline in. Currently, it represents the values of the dataflow API JobState enum. 
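For illustration, a minimal sketch of the labelled-apply rule described in the pipeline.py hunks above (DirectRunner and placeholder data assumed); reusing a transform instance requires a distinct label per application via the right shift operator::

    import apache_beam as beam

    square = beam.Map(lambda x: x * x)

    with beam.Pipeline('DirectRunner') as p:
        nums = p | 'Create' >> beam.Create([1, 2, 3])
        once = nums | 'SquareOnce' >> square
        again = nums | 'SquareAgain' >> square   # same instance, new label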
""" UNKNOWN = 'UNKNOWN' # not specified @@ -301,7 +301,7 @@ class PipelineState(object): class PipelineResult(object): - """A PipelineResult provides access to info about a pipeline.""" + """A :class:`PipelineResult` provides access to info about a pipeline.""" def __init__(self, state): self._state = state @@ -315,15 +315,18 @@ class PipelineResult(object): """Waits until the pipeline finishes and returns the final status. Args: - duration: The time to wait (in milliseconds) for job to finish. If it is - set to None, it will wait indefinitely until the job is finished. + duration (int): The time to wait (in milliseconds) for job to finish. + If it is set to :data:`None`, it will wait indefinitely until the job + is finished. Raises: - IOError: If there is a persistent problem getting job information. - NotImplementedError: If the runner does not support this operation. + ~exceptions.IOError: If there is a persistent problem getting job + information. + ~exceptions.NotImplementedError: If the runner does not support this + operation. Returns: - The final state of the pipeline, or None on timeout. + The final state of the pipeline, or :data:`None` on timeout. """ raise NotImplementedError @@ -331,8 +334,10 @@ class PipelineResult(object): """Cancels the pipeline execution. Raises: - IOError: If there is a persistent problem getting job information. - NotImplementedError: If the runner does not support this operation. + ~exceptions.IOError: If there is a persistent problem getting job + information. + ~exceptions.NotImplementedError: If the runner does not support this + operation. Returns: The final state of the pipeline. @@ -340,10 +345,12 @@ class PipelineResult(object): raise NotImplementedError def metrics(self): - """Returns MetricsResult object to query metrics from the runner. + """Returns :class:`~apache_beam.metrics.metric.MetricResults` object to + query metrics from the runner. Raises: - NotImplementedError: If the runner does not support this operation. + ~exceptions.NotImplementedError: If the runner does not support this + operation. """ raise NotImplementedError http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/testing/test_pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index 13b1639..8380242 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -33,23 +33,23 @@ __all__ = [ class TestPipeline(Pipeline): - """TestPipeline class is used inside of Beam tests that can be configured to - run against pipeline runner. + """:class:`TestPipeline` class is used inside of Beam tests that can be + configured to run against pipeline runner. It has a functionality to parse arguments from command line and build pipeline options for tests who runs against a pipeline runner and utilizes resources of the pipeline runner. Those test functions are recommended to be tagged by - @attr("ValidatesRunner") annotation. + ``@attr("ValidatesRunner")`` annotation. In order to configure the test with customized pipeline options from command - line, system argument 'test-pipeline-options' can be used to obtains a list - of pipeline options. If no options specified, default value will be used. + line, system argument ``--test-pipeline-options`` can be used to obtains a + list of pipeline options. If no options specified, default value will be used. 
For example, use following command line to execute all ValidatesRunner tests:: - python setup.py nosetests -a ValidatesRunner \ - --test-pipeline-options="--runner=DirectRunner \ - --job_name=myJobName \ + python setup.py nosetests -a ValidatesRunner \\ + --test-pipeline-options="--runner=DirectRunner \\ + --job_name=myJobName \\ --num_workers=1" For example, use assert_that for test validation:: @@ -69,21 +69,27 @@ class TestPipeline(Pipeline): """Initialize a pipeline object for test. Args: - runner: An object of type 'PipelineRunner' that will be used to execute - the pipeline. For registered runners, the runner name can be specified, - otherwise a runner object must be supplied. - options: A configured 'PipelineOptions' object containing arguments - that should be used for running the pipeline job. - argv: A list of arguments (such as sys.argv) to be used for building a - 'PipelineOptions' object. This will only be used if argument 'options' - is None. - is_integration_test: True if the test is an integration test, False - otherwise. - blocking: Run method will wait until pipeline execution is completed. + runner (~apache_beam.runners.runner.PipelineRunner): An object of type + :class:`~apache_beam.runners.runner.PipelineRunner` that will be used + to execute the pipeline. For registered runners, the runner name can be + specified, otherwise a runner object must be supplied. + options (~apache_beam.options.pipeline_options.PipelineOptions): + A configured + :class:`~apache_beam.options.pipeline_options.PipelineOptions` + object containing arguments that should be used for running the + pipeline job. + argv (List[str]): A list of arguments (such as :data:`sys.argv`) to be + used for building a + :class:`~apache_beam.options.pipeline_options.PipelineOptions` object. + This will only be used if argument **options** is :data:`None`. + is_integration_test (bool): :data:`True` if the test is an integration + test, :data:`False` otherwise. + blocking (bool): Run method will wait until pipeline execution is + completed. Raises: - ValueError: if either the runner or options argument is not of the - expected type. + ~exceptions.ValueError: if either the runner or options argument is not + of the expected type. """ self.is_integration_test = is_integration_test self.options_list = self._parse_test_option_args(argv) http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9018a49..d6f56d2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -601,31 +601,35 @@ class CallableWrapperPartitionFn(PartitionFn): class ParDo(PTransformWithSideInputs): - """A ParDo transform. + """A :class:`ParDo` transform. - Processes an input PCollection by applying a DoFn to each element and - returning the accumulated results into an output PCollection. The type of the - elements is not fixed as long as the DoFn can deal with it. In reality - the type is restrained to some extent because the elements sometimes must be - persisted to external storage. See the expand() method comments for a detailed - description of all possible arguments. + Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a + :class:`DoFn` to each element and returning the accumulated results into an + output :class:`~apache_beam.pvalue.PCollection`. 
The type of the elements is + not fixed as long as the :class:`DoFn` can deal with it. In reality the type + is restrained to some extent because the elements sometimes must be persisted + to external storage. See the :meth:`.expand()` method comments for a + detailed description of all possible arguments. - Note that the DoFn must return an iterable for each element of the input - PCollection. An easy way to do this is to use the yield keyword in the - process method. + Note that the :class:`DoFn` must return an iterable for each element of the + input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to + use the ``yield`` keyword in the process method. Args: - pcoll: a PCollection to be processed. - fn: a DoFn object to be applied to each element of pcoll argument. - *args: positional arguments passed to the dofn object. - **kwargs: keyword arguments passed to the dofn object. + pcoll (~apache_beam.pvalue.PCollection): + a :class:`~apache_beam.pvalue.PCollection` to be processed. + fn (DoFn): a :class:`DoFn` object to be applied to each element + of **pcoll** argument. + *args: positional arguments passed to the :class:`DoFn` object. + **kwargs: keyword arguments passed to the :class:`DoFn` object. Note that the positional and keyword arguments will be processed in order - to detect PCollections that will be computed as side inputs to the - transform. During pipeline execution whenever the DoFn object gets executed - (its apply() method gets called) the PCollection arguments will be replaced - by values from the PCollection in the exact positions where they appear in - the argument lists. + to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as + side inputs to the transform. During pipeline execution whenever the + :class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets + called) the :class:`~apache_beam.pvalue.PCollection` arguments will be + replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the + exact positions where they appear in the argument lists. """ def __init__(self, fn, *args, **kwargs): @@ -665,27 +669,34 @@ class ParDo(PTransformWithSideInputs): return pvalue.PCollection(pcoll.pipeline) def with_outputs(self, *tags, **main_kw): - """Returns a tagged tuple allowing access to the outputs of a ParDo. + """Returns a tagged tuple allowing access to the outputs of a + :class:`ParDo`. The resulting object supports access to the - PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over - the available tags (e.g., for tag in o: ...). + :class:`~apache_beam.pvalue.PCollection` associated with a tag + (e.g. ``o.tag``, ``o[tag]``) and iterating over the available tags + (e.g. ``for tag in o: ...``). Args: *tags: if non-empty, list of valid tags. If a list of valid tags is given, it will be an error to use an undeclared tag later in the pipeline. - **main_kw: dictionary empty or with one key 'main' defining the tag to be - used for the main output (which will not have a tag associated with it). + **main_kw: dictionary empty or with one key ``'main'`` defining the tag to + be used for the main output (which will not have a tag associated with + it). Returns: - An object of type DoOutputsTuple that bundles together all the outputs - of a ParDo transform and allows accessing the individual - PCollections for each output using an object.tag syntax. 
+ ~apache_beam.pvalue.DoOutputsTuple: An object of type + :class:`~apache_beam.pvalue.DoOutputsTuple` that bundles together all + the outputs of a :class:`ParDo` transform and allows accessing the + individual :class:`~apache_beam.pvalue.PCollection` s for each output + using an ``object.tag`` syntax. Raises: - TypeError: if the self object is not a PCollection that is the result of - a ParDo transform. - ValueError: if main_kw contains any key other than 'main'. + ~exceptions.TypeError: if the **self** object is not a + :class:`~apache_beam.pvalue.PCollection` that is the result of a + :class:`ParDo` transform. + ~exceptions.ValueError: if **main_kw** contains any key other than + ``'main'``. """ main_tag = main_kw.pop('main', None) if main_kw: @@ -739,24 +750,27 @@ class _MultiParDo(PTransform): def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name - """FlatMap is like ParDo except it takes a callable to specify the - transformation. + """:func:`FlatMap` is like :class:`ParDo` except it takes a callable to + specify the transformation. The callable must return an iterable for each element of the input - PCollection. The elements of these iterables will be flattened into - the output PCollection. + :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will + be flattened into the output :class:`~apache_beam.pvalue.PCollection`. Args: - fn: a callable object. + fn (callable): a callable object. *args: positional arguments passed to the transform callable. **kwargs: keyword arguments passed to the transform callable. Returns: - A PCollection containing the Map outputs. + ~apache_beam.pvalue.PCollection: + A :class:`~apache_beam.pvalue.PCollection` containing the + :func:`FlatMap` outputs. Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for ParDo. + ~exceptions.TypeError: If the **fn** passed as argument is not a callable. + Typical error is to pass a :class:`DoFn` instance which is supported only + for :class:`ParDo`. """ label = 'FlatMap(%s)' % ptransform.label_from_callable(fn) if not callable(fn): @@ -770,19 +784,23 @@ def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name def Map(fn, *args, **kwargs): # pylint: disable=invalid-name - """Map is like FlatMap except its callable returns only a single element. + """:func:`Map` is like :func:`FlatMap` except its callable returns only a + single element. Args: - fn: a callable object. + fn (callable): a callable object. *args: positional arguments passed to the transform callable. **kwargs: keyword arguments passed to the transform callable. Returns: - A PCollection containing the Map outputs. + ~apache_beam.pvalue.PCollection: + A :class:`~apache_beam.pvalue.PCollection` containing the + :func:`Map` outputs. Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for ParDo. + ~exceptions.TypeError: If the **fn** passed as argument is not a callable. + Typical error is to pass a :class:`DoFn` instance which is supported only + for :class:`ParDo`. """ if not callable(fn): raise TypeError( @@ -815,19 +833,23 @@ def Map(fn, *args, **kwargs): # pylint: disable=invalid-name def Filter(fn, *args, **kwargs): # pylint: disable=invalid-name - """Filter is a FlatMap with its callable filtering out elements. + """:func:`Filter` is a :func:`FlatMap` with its callable filtering out + elements. Args: - fn: a callable object. 
+ fn (callable): a callable object. *args: positional arguments passed to the transform callable. **kwargs: keyword arguments passed to the transform callable. Returns: - A PCollection containing the Filter outputs. + ~apache_beam.pvalue.PCollection: + A :class:`~apache_beam.pvalue.PCollection` containing the + :func:`Filter` outputs. Raises: - TypeError: If the fn passed as argument is not a callable. Typical error - is to pass a DoFn instance which is supported only for FlatMap. + ~exceptions.TypeError: If the **fn** passed as argument is not a callable. + Typical error is to pass a :class:`DoFn` instance which is supported only + for :class:`ParDo`. """ if not callable(fn): raise TypeError( @@ -867,35 +889,42 @@ def _combine_payload(combine_fn, context): class CombineGlobally(PTransform): - """A CombineGlobally transform. + """A :class:`CombineGlobally` transform. - Reduces a PCollection to a single value by progressively applying a CombineFn - to portions of the PCollection (and to intermediate values created thereby). - See documentation in CombineFn for details on the specifics on how CombineFns - are applied. + Reduces a :class:`~apache_beam.pvalue.PCollection` to a single value by + progressively applying a :class:`CombineFn` to portions of the + :class:`~apache_beam.pvalue.PCollection` (and to intermediate values created + thereby). See documentation in :class:`CombineFn` for details on the specifics + on how :class:`CombineFn` s are applied. Args: - pcoll: a PCollection to be reduced into a single value. - fn: a CombineFn object that will be called to progressively reduce the - PCollection into single values, or a callable suitable for wrapping - by CallableWrapperCombineFn. - *args: positional arguments passed to the CombineFn object. - **kwargs: keyword arguments passed to the CombineFn object. + pcoll (~apache_beam.pvalue.PCollection): + a :class:`~apache_beam.pvalue.PCollection` to be reduced into a single + value. + fn (callable): a :class:`CombineFn` object that will be called to + progressively reduce the :class:`~apache_beam.pvalue.PCollection` into + single values, or a callable suitable for wrapping by + :class:`~apache_beam.transforms.core.CallableWrapperCombineFn`. + *args: positional arguments passed to the :class:`CombineFn` object. + **kwargs: keyword arguments passed to the :class:`CombineFn` object. Raises: - TypeError: If the output type of the input PCollection is not compatible - with Iterable[A]. + ~exceptions.TypeError: If the output type of the input + :class:`~apache_beam.pvalue.PCollection` is not compatible + with ``Iterable[A]``. Returns: - A single-element PCollection containing the main output of the Combine - transform. + ~apache_beam.pvalue.PCollection: A single-element + :class:`~apache_beam.pvalue.PCollection` containing the main output of + the :class:`CombineGlobally` transform. Note that the positional and keyword arguments will be processed in order - to detect PObjects that will be computed as side inputs to the transform. - During pipeline execution whenever the CombineFn object gets executed (i.e., - any of the CombineFn methods get called), the PObject arguments will be - replaced by their actual value in the exact position where they appear in - the argument lists. + to detect :class:`~apache_beam.pvalue.PValue` s that will be computed as side + inputs to the transform. + During pipeline execution whenever the :class:`CombineFn` object gets executed + (i.e. 
any of the :class:`CombineFn` methods get called), the
+  :class:`~apache_beam.pvalue.PValue` arguments will be replaced by their
+  actual value in the exact position where they appear in the argument lists.
   """

   has_defaults = True
   as_view = False
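For illustration, a minimal sketch combining the transforms documented above (a ParDo with a yielding DoFn, Filter, Map, and CombineGlobally); data and labels are placeholders, DirectRunner assumed::

    import apache_beam as beam

    class SplitWords(beam.DoFn):
        def process(self, element):
            # A DoFn must return an iterable per element; yield is the easy way.
            for word in element.split():
                yield word

    with beam.Pipeline('DirectRunner') as p:
        _ = (p
             | beam.Create(['the quick brown fox', 'jumps over the lazy dog'])
             | 'Split' >> beam.ParDo(SplitWords())
             | 'LongWords' >> beam.Filter(lambda w: len(w) > 3)
             | 'Lengths' >> beam.Map(len)
             | 'Total' >> beam.CombineGlobally(sum))

    # beam.FlatMap(lambda line: line.split()) is the callable shorthand for the
    # same splitting step.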