Move all files to apache_beam folder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b14dfadd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b14dfadd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b14dfadd

Branch: refs/heads/python-sdk
Commit: b14dfadd1f414063eb0710eae8237eb2fa9c8a2f
Parents: e507928
Author: Silviu Calinoiu <silv...@google.com>
Authored: Tue Jun 14 08:49:04 2016 -0700
Committer: Silviu Calinoiu <silv...@google.com>
Committed: Tue Jun 14 12:07:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/__init__.py | 78 +
 sdks/python/apache_beam/coders/__init__.py | 16 +
 sdks/python/apache_beam/coders/coder_impl.pxd | 109 +
 sdks/python/apache_beam/coders/coder_impl.py | 316 ++
 sdks/python/apache_beam/coders/coders.py | 511 +++
 sdks/python/apache_beam/coders/coders_test.py | 60 +
 .../apache_beam/coders/coders_test_common.py | 180 ++
 .../apache_beam/coders/fast_coders_test.py | 34 +
 sdks/python/apache_beam/coders/observable.py | 33 +
 .../apache_beam/coders/observable_test.py | 54 +
 .../apache_beam/coders/slow_coders_test.py | 36 +
 sdks/python/apache_beam/coders/slow_stream.py | 136 +
 sdks/python/apache_beam/coders/stream.pxd | 58 +
 sdks/python/apache_beam/coders/stream.pyx | 201 ++
 sdks/python/apache_beam/coders/stream_test.py | 168 +
 sdks/python/apache_beam/coders/typecoders.py | 154 +
 .../apache_beam/coders/typecoders_test.py | 114 +
 sdks/python/apache_beam/dataflow_test.py | 405 +++
 sdks/python/apache_beam/error.py | 39 +
 sdks/python/apache_beam/examples/__init__.py | 0
 .../examples/complete/autocomplete.py | 79 +
 .../examples/complete/autocomplete_test.py | 78 +
 .../examples/complete/estimate_pi.py | 109 +
 .../examples/complete/estimate_pi_test.py | 46 +
 .../complete/juliaset/juliaset/__init__.py | 0
 .../complete/juliaset/juliaset/juliaset.py | 119 +
 .../complete/juliaset/juliaset/juliaset_test.py | 83 +
 .../examples/complete/juliaset/juliaset_main.py | 55 +
 .../examples/complete/juliaset/setup.py | 115 +
 .../apache_beam/examples/complete/tfidf.py | 196 ++
 .../apache_beam/examples/complete/tfidf_test.py | 88 +
 .../examples/complete/top_wikipedia_sessions.py | 170 +
 .../complete/top_wikipedia_sessions_test.py | 58 +
 .../examples/cookbook/bigquery_schema.py | 127 +
 .../examples/cookbook/bigquery_side_input.py | 114 +
 .../cookbook/bigquery_side_input_test.py | 59 +
 .../examples/cookbook/bigquery_tornadoes.py | 96 +
 .../cookbook/bigquery_tornadoes_test.py | 41 +
 .../apache_beam/examples/cookbook/bigshuffle.py | 84 +
 .../examples/cookbook/bigshuffle_test.py | 61 +
 .../apache_beam/examples/cookbook/coders.py | 92 +
 .../examples/cookbook/coders_test.py | 56 +
 .../examples/cookbook/combiners_test.py | 73 +
 .../examples/cookbook/custom_ptransform.py | 132 +
 .../examples/cookbook/custom_ptransform_test.py | 64 +
 .../apache_beam/examples/cookbook/filters.py | 104 +
 .../examples/cookbook/filters_test.py | 65 +
 .../examples/cookbook/group_with_coder.py | 111 +
 .../examples/cookbook/group_with_coder_test.py | 87 +
 .../examples/cookbook/mergecontacts.py | 126 +
 .../examples/cookbook/mergecontacts_test.py | 121 +
 .../examples/cookbook/multiple_output_pardo.py | 171 +
 .../cookbook/multiple_output_pardo_test.py | 69 +
 .../apache_beam/examples/snippets/snippets.py | 872 +++++
 .../examples/snippets/snippets_test.py | 560 ++++
 .../apache_beam/examples/streaming_wordcap.py | 61 +
 .../apache_beam/examples/streaming_wordcount.py | 71 +
 sdks/python/apache_beam/examples/wordcount.py | 99 +
 .../apache_beam/examples/wordcount_debugging.py | 154 +
 .../examples/wordcount_debugging_test.py | 56 +
 .../apache_beam/examples/wordcount_minimal.py | 111 +
 .../examples/wordcount_minimal_test.py | 56 +
 .../apache_beam/examples/wordcount_test.py | 55 +
 sdks/python/apache_beam/internal/__init__.py | 0
 sdks/python/apache_beam/internal/apiclient.py | 935 ++++++
 .../apache_beam/internal/apiclient_test.py | 110 +
 sdks/python/apache_beam/internal/auth.py | 161 +
 .../apache_beam/internal/clients/__init__.py | 0
 .../internal/clients/bigquery/__init__.py | 10 +
 .../clients/bigquery/bigquery_v2_client.py | 642 ++++
 .../clients/bigquery/bigquery_v2_messages.py | 1893 +++++++++++
 .../internal/clients/dataflow/__init__.py | 10 +
 .../clients/dataflow/dataflow_v1b3_client.py | 316 ++
 .../clients/dataflow/dataflow_v1b3_messages.py | 3056 ++++++++++++++++++
 .../internal/clients/storage/__init__.py | 10 +
 .../clients/storage/storage_v1_client.py | 1021 ++++++
 .../clients/storage/storage_v1_messages.py | 1903 +++++++++++
 sdks/python/apache_beam/internal/json_value.py | 127 +
 .../apache_beam/internal/json_value_test.py | 63 +
 sdks/python/apache_beam/internal/module_test.py | 59 +
 sdks/python/apache_beam/internal/pickler.py | 205 ++
 .../python/apache_beam/internal/pickler_test.py | 78 +
 sdks/python/apache_beam/internal/util.py | 90 +
 sdks/python/apache_beam/internal/util_test.py | 58 +
 .../python/apache_beam/internal/windmill_pb2.py | 2275 +++++++++++++
 .../internal/windmill_service_pb2.py | 161 +
 sdks/python/apache_beam/io/__init__.py | 25 +
 sdks/python/apache_beam/io/bigquery.py | 826 +++++
 sdks/python/apache_beam/io/bigquery_test.py | 450 +++
 sdks/python/apache_beam/io/fileio.py | 747 +++++
 sdks/python/apache_beam/io/fileio_test.py | 522 +++
 sdks/python/apache_beam/io/gcsio.py | 602 ++++
 sdks/python/apache_beam/io/gcsio_test.py | 503 +++
 sdks/python/apache_beam/io/iobase.py | 1073 ++++++
 sdks/python/apache_beam/io/pubsub.py | 73 +
 sdks/python/apache_beam/io/range_trackers.py | 270 ++
 .../apache_beam/io/range_trackers_test.py | 318 ++
 sdks/python/apache_beam/io/sources_test.py | 65 +
 sdks/python/apache_beam/pipeline.py | 435 +++
 sdks/python/apache_beam/pipeline_test.py | 345 ++
 sdks/python/apache_beam/pvalue.py | 459 +++
 sdks/python/apache_beam/pvalue_test.py | 63 +
 sdks/python/apache_beam/python_sdk_releases.py | 53 +
 sdks/python/apache_beam/runners/__init__.py | 24 +
 sdks/python/apache_beam/runners/common.pxd | 28 +
 sdks/python/apache_beam/runners/common.py | 181 ++
 .../apache_beam/runners/dataflow_runner.py | 639 ++++
 .../python/apache_beam/runners/direct_runner.py | 326 ++
 sdks/python/apache_beam/runners/runner.py | 305 ++
 sdks/python/apache_beam/runners/runner_test.py | 66 +
 sdks/python/apache_beam/transforms/__init__.py | 23 +
 .../python/apache_beam/transforms/aggregator.py | 105 +
 .../apache_beam/transforms/aggregator_test.py | 73 +
 sdks/python/apache_beam/transforms/combiners.py | 523 +++
 .../apache_beam/transforms/combiners_test.py | 225 ++
 sdks/python/apache_beam/transforms/core.py | 1292 ++++++++
 .../apache_beam/transforms/cy_combiners.pxd | 89 +
 .../apache_beam/transforms/cy_combiners.py | 250 ++
 .../python/apache_beam/transforms/ptransform.py | 703 ++++
 .../apache_beam/transforms/ptransform_test.py | 1814 +++++++++++
 .../python/apache_beam/transforms/sideinputs.py | 145 +
 sdks/python/apache_beam/transforms/timeutil.py | 310 ++
 .../apache_beam/transforms/timeutil_test.py | 165 +
 sdks/python/apache_beam/transforms/trigger.py | 958 ++++++
 .../apache_beam/transforms/trigger_test.py | 566 ++++
 .../transforms/trigger_transcripts.yaml | 207 ++
 sdks/python/apache_beam/transforms/util.py | 227 ++
 sdks/python/apache_beam/transforms/window.py | 383 +++
 .../apache_beam/transforms/window_test.py | 201 ++
 .../transforms/write_ptransform_test.py | 124 +
 sdks/python/apache_beam/typehints/__init__.py | 19 +
 sdks/python/apache_beam/typehints/decorators.py | 530 +++
 sdks/python/apache_beam/typehints/opcodes.py | 331 ++
 .../apache_beam/typehints/trivial_inference.py | 415 +++
 .../typehints/trivial_inference_test.py | 148 +
 sdks/python/apache_beam/typehints/typecheck.py | 161 +
 .../typehints/typed_pipeline_test.py | 248 ++
 sdks/python/apache_beam/typehints/typehints.py | 1054 ++++++
 .../apache_beam/typehints/typehints_test.py | 1053 ++++++
 sdks/python/apache_beam/utils/__init__.py | 19 +
 sdks/python/apache_beam/utils/counters.pxd | 27 +
 sdks/python/apache_beam/utils/counters.py | 180 +
 sdks/python/apache_beam/utils/dependency.py | 439 +++
 .../python/apache_beam/utils/dependency_test.py | 394 +++
 sdks/python/apache_beam/utils/names.py | 75 +
 sdks/python/apache_beam/utils/options.py | 486 +++
 sdks/python/apache_beam/utils/path.py | 44 +
 sdks/python/apache_beam/utils/path_test.py | 67 +
 .../apache_beam/utils/pipeline_options_test.py | 104 +
 .../utils/pipeline_options_validator.py | 166 +
 .../utils/pipeline_options_validator_test.py | 234 ++
 sdks/python/apache_beam/utils/processes.py | 49 +
 sdks/python/apache_beam/utils/processes_test.py | 103 +
 sdks/python/apache_beam/utils/profiler.py | 66 +
 sdks/python/apache_beam/utils/retry.py | 194 ++
 sdks/python/apache_beam/utils/retry_test.py | 165 +
 sdks/python/apache_beam/version.py | 17 +
 sdks/python/google/cloud/dataflow/__init__.py | 78 -
 .../google/cloud/dataflow/coders/__init__.py | 16 -
 .../google/cloud/dataflow/coders/coder_impl.pxd | 109 -
 .../google/cloud/dataflow/coders/coder_impl.py | 316 --
 .../google/cloud/dataflow/coders/coders.py | 511 ---
 .../google/cloud/dataflow/coders/coders_test.py | 60 -
 .../cloud/dataflow/coders/coders_test_common.py | 180 --
 .../cloud/dataflow/coders/fast_coders_test.py | 34 -
 .../google/cloud/dataflow/coders/observable.py | 33 -
 .../cloud/dataflow/coders/observable_test.py | 54 -
 .../cloud/dataflow/coders/slow_coders_test.py | 36 -
 .../google/cloud/dataflow/coders/slow_stream.py | 136 -
 .../google/cloud/dataflow/coders/stream.pxd | 58 -
 .../google/cloud/dataflow/coders/stream.pyx | 201 --
 .../google/cloud/dataflow/coders/stream_test.py | 168 -
 .../google/cloud/dataflow/coders/typecoders.py | 154 -
 .../cloud/dataflow/coders/typecoders_test.py | 114 -
 .../google/cloud/dataflow/dataflow_test.py | 405 ---
 sdks/python/google/cloud/dataflow/error.py | 39 -
 .../google/cloud/dataflow/examples/__init__.py | 0
 .../dataflow/examples/complete/autocomplete.py | 79 -
 .../examples/complete/autocomplete_test.py | 78 -
 .../dataflow/examples/complete/estimate_pi.py | 109 -
 .../examples/complete/estimate_pi_test.py | 46 -
 .../complete/juliaset/juliaset/__init__.py | 0
 .../complete/juliaset/juliaset/juliaset.py | 119 -
 .../complete/juliaset/juliaset/juliaset_test.py | 83 -
 .../examples/complete/juliaset/juliaset_main.py | 55 -
 .../examples/complete/juliaset/setup.py | 115 -
 .../cloud/dataflow/examples/complete/tfidf.py | 196 --
 .../dataflow/examples/complete/tfidf_test.py | 88 -
 .../examples/complete/top_wikipedia_sessions.py | 170 -
 .../complete/top_wikipedia_sessions_test.py | 58 -
 .../examples/cookbook/bigquery_schema.py | 127 -
 .../examples/cookbook/bigquery_side_input.py | 114 -
 .../cookbook/bigquery_side_input_test.py | 59 -
 .../examples/cookbook/bigquery_tornadoes.py | 96 -
 .../cookbook/bigquery_tornadoes_test.py | 41 -
 .../dataflow/examples/cookbook/bigshuffle.py | 84 -
 .../examples/cookbook/bigshuffle_test.py | 61 -
 .../cloud/dataflow/examples/cookbook/coders.py | 92 -
 .../dataflow/examples/cookbook/coders_test.py | 56 -
 .../examples/cookbook/combiners_test.py | 73 -
 .../examples/cookbook/custom_ptransform.py | 132 -
 .../examples/cookbook/custom_ptransform_test.py | 64 -
 .../cloud/dataflow/examples/cookbook/filters.py | 104 -
 .../dataflow/examples/cookbook/filters_test.py | 65 -
 .../examples/cookbook/group_with_coder.py | 111 -
 .../examples/cookbook/group_with_coder_test.py | 87 -
 .../dataflow/examples/cookbook/mergecontacts.py | 126 -
 .../examples/cookbook/mergecontacts_test.py | 121 -
 .../examples/cookbook/multiple_output_pardo.py | 171 -
 .../cookbook/multiple_output_pardo_test.py | 69 -
 .../dataflow/examples/snippets/snippets.py | 872 -----
 .../dataflow/examples/snippets/snippets_test.py | 560 ----
 .../dataflow/examples/streaming_wordcap.py | 61 -
 .../dataflow/examples/streaming_wordcount.py | 71 -
 .../google/cloud/dataflow/examples/wordcount.py | 99 -
 .../dataflow/examples/wordcount_debugging.py | 154 -
 .../examples/wordcount_debugging_test.py | 56 -
 .../dataflow/examples/wordcount_minimal.py | 111 -
 .../dataflow/examples/wordcount_minimal_test.py | 56 -
 .../cloud/dataflow/examples/wordcount_test.py | 55 -
 .../google/cloud/dataflow/internal/__init__.py | 0
 .../google/cloud/dataflow/internal/apiclient.py | 935 ------
 .../cloud/dataflow/internal/apiclient_test.py | 110 -
 .../google/cloud/dataflow/internal/auth.py | 161 -
 .../cloud/dataflow/internal/clients/__init__.py | 0
 .../internal/clients/bigquery/__init__.py | 10 -
 .../clients/bigquery/bigquery_v2_client.py | 642 ----
 .../clients/bigquery/bigquery_v2_messages.py | 1893 -----------
 .../internal/clients/dataflow/__init__.py | 10 -
 .../clients/dataflow/dataflow_v1b3_client.py | 316 --
 .../clients/dataflow/dataflow_v1b3_messages.py | 3056 ------------------
 .../internal/clients/storage/__init__.py | 10 -
 .../clients/storage/storage_v1_client.py | 1021 ------
 .../clients/storage/storage_v1_messages.py | 1903 -----------
 .../cloud/dataflow/internal/json_value.py | 127 -
 .../cloud/dataflow/internal/json_value_test.py | 63 -
 .../cloud/dataflow/internal/module_test.py | 59 -
 .../google/cloud/dataflow/internal/pickler.py | 205 --
 .../cloud/dataflow/internal/pickler_test.py | 78 -
 .../google/cloud/dataflow/internal/util.py | 90 -
 .../google/cloud/dataflow/internal/util_test.py | 58 -
 .../cloud/dataflow/internal/windmill_pb2.py | 2275 -------------
 .../dataflow/internal/windmill_service_pb2.py | 161 -
 .../python/google/cloud/dataflow/io/__init__.py | 25 -
 .../python/google/cloud/dataflow/io/bigquery.py | 826 -----
 .../google/cloud/dataflow/io/bigquery_test.py | 450 ---
 sdks/python/google/cloud/dataflow/io/fileio.py | 747 -----
 .../google/cloud/dataflow/io/fileio_test.py | 522 ---
 sdks/python/google/cloud/dataflow/io/gcsio.py | 602 ----
 .../google/cloud/dataflow/io/gcsio_test.py | 503 ---
 sdks/python/google/cloud/dataflow/io/iobase.py | 1073 ------
 sdks/python/google/cloud/dataflow/io/pubsub.py | 73 -
 .../google/cloud/dataflow/io/range_trackers.py | 270 --
 .../cloud/dataflow/io/range_trackers_test.py | 318 --
 .../google/cloud/dataflow/io/sources_test.py | 65 -
 sdks/python/google/cloud/dataflow/pipeline.py | 435 ---
 .../google/cloud/dataflow/pipeline_test.py | 345 --
 sdks/python/google/cloud/dataflow/pvalue.py | 459 ---
 .../python/google/cloud/dataflow/pvalue_test.py | 63 -
 .../cloud/dataflow/python_sdk_releases.py | 53 -
 .../google/cloud/dataflow/runners/__init__.py | 24 -
 .../google/cloud/dataflow/runners/common.pxd | 28 -
 .../google/cloud/dataflow/runners/common.py | 181 --
 .../cloud/dataflow/runners/dataflow_runner.py | 639 ----
 .../cloud/dataflow/runners/direct_runner.py | 326 --
 .../google/cloud/dataflow/runners/runner.py | 305 --
 .../cloud/dataflow/runners/runner_test.py | 66 -
 .../cloud/dataflow/transforms/__init__.py | 23 -
 .../cloud/dataflow/transforms/aggregator.py | 105 -
 .../dataflow/transforms/aggregator_test.py | 73 -
 .../cloud/dataflow/transforms/combiners.py | 523 ---
 .../cloud/dataflow/transforms/combiners_test.py | 225 --
 .../google/cloud/dataflow/transforms/core.py | 1292 --------
 .../cloud/dataflow/transforms/cy_combiners.pxd | 89 -
 .../cloud/dataflow/transforms/cy_combiners.py | 250 --
 .../cloud/dataflow/transforms/ptransform.py | 703 ----
 .../dataflow/transforms/ptransform_test.py | 1814 -----------
 .../cloud/dataflow/transforms/sideinputs.py | 145 -
 .../cloud/dataflow/transforms/timeutil.py | 310 --
 .../cloud/dataflow/transforms/timeutil_test.py | 165 -
 .../google/cloud/dataflow/transforms/trigger.py | 958 ------
 .../cloud/dataflow/transforms/trigger_test.py | 566 ----
 .../transforms/trigger_transcripts.yaml | 207 --
 .../google/cloud/dataflow/transforms/util.py | 227 --
 .../google/cloud/dataflow/transforms/window.py | 383 ---
 .../cloud/dataflow/transforms/window_test.py | 201 --
 .../transforms/write_ptransform_test.py | 124 -
 .../google/cloud/dataflow/typehints/__init__.py | 19 -
 .../cloud/dataflow/typehints/decorators.py | 530 ---
 .../google/cloud/dataflow/typehints/opcodes.py | 331 --
 .../dataflow/typehints/trivial_inference.py | 415 ---
 .../typehints/trivial_inference_test.py | 148 -
 .../cloud/dataflow/typehints/typecheck.py | 161 -
 .../dataflow/typehints/typed_pipeline_test.py | 248 --
 .../cloud/dataflow/typehints/typehints.py | 1054 ------
 .../cloud/dataflow/typehints/typehints_test.py | 1053 ------
 .../google/cloud/dataflow/utils/__init__.py | 19 -
 .../google/cloud/dataflow/utils/counters.pxd | 27 -
 .../google/cloud/dataflow/utils/counters.py | 180 --
 .../google/cloud/dataflow/utils/dependency.py | 439 ---
 .../cloud/dataflow/utils/dependency_test.py | 394 ---
 .../python/google/cloud/dataflow/utils/names.py | 75 -
 .../google/cloud/dataflow/utils/options.py | 486 ---
 sdks/python/google/cloud/dataflow/utils/path.py | 44 -
 .../google/cloud/dataflow/utils/path_test.py | 67 -
 .../dataflow/utils/pipeline_options_test.py | 104 -
 .../utils/pipeline_options_validator.py | 166 -
 .../utils/pipeline_options_validator_test.py | 234 --
 .../google/cloud/dataflow/utils/processes.py | 49 -
 .../cloud/dataflow/utils/processes_test.py | 103 -
 .../google/cloud/dataflow/utils/profiler.py | 66 -
 .../python/google/cloud/dataflow/utils/retry.py | 194 --
 .../google/cloud/dataflow/utils/retry_test.py | 165 -
 sdks/python/google/cloud/dataflow/version.py | 17 -
 314 files changed, 44598 insertions(+), 44598 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
new file mode 100644
index 0000000..af28d3a
--- /dev/null
+++ b/sdks/python/apache_beam/__init__.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Cloud Dataflow SDK for Python.
+
+Google Cloud Dataflow <http://cloud.google.com/dataflow/>
+provides a simple, powerful programming model for building both batch
+and streaming parallel data processing pipelines.
+
+The Dataflow SDK for Python provides access to Dataflow capabilities
+from the Python programming language.
+
+Status
+------
+The SDK is still early in its development, and significant changes
+should be expected before the first stable version.
+
+Overview
+--------
+The key concepts in this programming model are
+
+* PCollection: represents a collection of data, which could be
+  bounded or unbounded in size.
+* PTransform: represents a computation that transforms input
+  PCollections into output PCollections.
+* Pipeline: manages a directed acyclic graph of PTransforms and
+  PCollections that is ready for execution.
+* Runner: specifies where and how the Pipeline should execute.
+* Reading and Writing Data: your pipeline can read from an external
+  source and write to an external data sink.
+
+Typical usage
+-------------
+At the top of your source file::
+
+  import google.cloud.dataflow as df
+
+After this import statement
+
+* transform classes are available as df.FlatMap, df.GroupByKey, etc.
+* Pipeline class is available as df.Pipeline
+* text source/sink classes are available as df.io.TextFileSource,
+  df.io.TextFileSink
+
+Examples
+--------
+The examples subdirectory has some examples.
+
+"""
+
+
+import sys
+
+
+if sys.version_info.major != 2:
+  raise RuntimeError(
+      'Dataflow SDK for Python is supported only on Python 2.7. '
+      'It is not supported on Python [%s].' % sys.version)
+
+
+import google.cloud.dataflow.internal.pickler
+
+from google.cloud.dataflow import coders
+from google.cloud.dataflow import io
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.transforms import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
new file mode 100644
index 0000000..610a6ef
--- /dev/null
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -0,0 +1,16 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
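The typical-usage notes in the __init__.py docstring above stop short of a complete
pipeline. For orientation, a minimal word-count-style pipeline against this revision
would look roughly as follows. This is a sketch only: it assumes the label-first
transform constructors and the local 'DirectPipelineRunner' that this SDK ships with,
and it uses the pre-rename google.cloud.dataflow import path that the code above
still uses::

  import google.cloud.dataflow as df

  # Build and run a tiny local pipeline: count occurrences of each word.
  p = df.Pipeline('DirectPipelineRunner')
  (p
   | df.Create('init', ['a', 'b', 'a'])
   | df.Map('pair_with_one', lambda word: (word, 1))
   | df.GroupByKey('group')
   | df.Map('count', lambda (word, ones): (word, sum(ones))))
  p.run()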
+ +from google.cloud.dataflow.coders.coders import * +from google.cloud.dataflow.coders.typecoders import registry http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd new file mode 100644 index 0000000..663d37d --- /dev/null +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -0,0 +1,109 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# cython: profile=True + +cimport cython + +cimport cpython.ref +cimport cpython.tuple +cimport libc.stdint +cimport libc.stdlib +cimport libc.string + +from .stream cimport InputStream, OutputStream + + +cdef object loads, dumps, create_InputStream, create_OutputStream +cdef type WindowedValue + + +cdef class CoderImpl(object): + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + cpdef decode_from_stream(self, InputStream stream, bint nested) + cpdef bytes encode(self, value) + cpdef decode(self, bytes encoded) + + +cdef class SimpleCoderImpl(CoderImpl): + pass + + +cdef class StreamCoderImpl(CoderImpl): + pass + + +cdef class CallbackCoderImpl(CoderImpl): + cdef object _encoder + cdef object _decoder + + +cdef class DeterministicPickleCoderImpl(CoderImpl): + cdef CoderImpl _pickle_coder + cdef object _step_label + cdef bint _check_safe(self, value) except -1 + + +cdef class BytesCoderImpl(CoderImpl): + pass + + +cdef class FloatCoderImpl(StreamCoderImpl): + pass + + +cdef class TimestampCoderImpl(StreamCoderImpl): + cdef object timestamp_class + + +cdef list small_ints +cdef class VarIntCoderImpl(StreamCoderImpl): + @cython.locals(ivalue=libc.stdint.int64_t) + cpdef bytes encode(self, value) + + +cdef class SingletonCoderImpl(CoderImpl): + cdef object _value + + +cdef class AbstractComponentCoderImpl(StreamCoderImpl): + cdef tuple _coder_impls + + cpdef _extract_components(self, value) + cpdef _construct_from_components(self, components) + + @cython.locals(c=CoderImpl) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + @cython.locals(c=CoderImpl) + cpdef decode_from_stream(self, InputStream stream, bint nested) + + +cdef class TupleCoderImpl(AbstractComponentCoderImpl): + pass + + +cdef class SequenceCoderImpl(StreamCoderImpl): + cdef CoderImpl _elem_coder + cpdef _construct_from_sequence(self, values) + + +cdef class TupleSequenceCoderImpl(SequenceCoderImpl): + pass + + +cdef class WindowedValueCoderImpl(StreamCoderImpl): + """A coder for windowed values.""" + cdef CoderImpl _value_coder + cdef CoderImpl _timestamp_coder + cdef CoderImpl _windows_coder http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
new file mode 100644
index 0000000..0ce4354
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -0,0 +1,316 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Coder implementations.
+
+The actual encode/decode implementations are split off from coders to
+allow conditional (compiled/pure) implementations, which can be used to
+encode many elements with minimal overhead.
+
+This module may be optionally compiled with Cython, using the corresponding
+coder_impl.pxd file for type hints.
+"""
+
+import collections
+from cPickle import loads, dumps
+
+
+# pylint: disable=g-import-not-at-top
+try:
+  # Don't depend on the full dataflow sdk to test coders.
+  from google.cloud.dataflow.transforms.window import WindowedValue
+except ImportError:
+  WindowedValue = collections.namedtuple(
+      'WindowedValue', ('value', 'timestamp', 'windows'))
+
+try:
+  from stream import InputStream as create_InputStream
+  from stream import OutputStream as create_OutputStream
+except ImportError:
+  from slow_stream import InputStream as create_InputStream
+  from slow_stream import OutputStream as create_OutputStream
+# pylint: enable=g-import-not-at-top
+
+
+class CoderImpl(object):
+
+  def encode_to_stream(self, value, stream, nested):
+    """Writes object to potentially-nested encoding in stream."""
+    raise NotImplementedError
+
+  def decode_from_stream(self, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    raise NotImplementedError
+
+  def encode(self, value):
+    """Encodes an object to an unnested string."""
+    raise NotImplementedError
+
+  def decode(self, encoded):
+    """Decodes an object from an unnested string."""
+    raise NotImplementedError
+
+
+class SimpleCoderImpl(CoderImpl):
+  """Subclass of CoderImpl implementing stream methods using encode/decode."""
+
+  def encode_to_stream(self, value, stream, nested):
+    """Writes object to potentially-nested encoding in stream."""
+    stream.write(self.encode(value), nested)
+
+  def decode_from_stream(self, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    return self.decode(stream.read_all(nested))
+
+
+class StreamCoderImpl(CoderImpl):
+  """Subclass of CoderImpl implementing encode/decode using stream methods."""
+
+  def encode(self, value):
+    out = create_OutputStream()
+    self.encode_to_stream(value, out, False)
+    return out.get()
+
+  def decode(self, encoded):
+    return self.decode_from_stream(create_InputStream(encoded), False)
+
+
+class CallbackCoderImpl(CoderImpl):
+  """A CoderImpl that calls back to the _impl methods on the Coder itself.
+
+  This is the default implementation used if Coder._get_impl()
+  is not overwritten.
+ """ + + def __init__(self, encoder, decoder): + self._encoder = encoder + self._decoder = decoder + + def encode_to_stream(self, value, stream, nested): + return stream.write(self._encoder(value), nested) + + def decode_from_stream(self, stream, nested): + return self._decoder(stream.read_all(nested)) + + def encode(self, value): + return self._encoder(value) + + def decode(self, encoded): + return self._decoder(encoded) + + +class DeterministicPickleCoderImpl(CoderImpl): + + def __init__(self, pickle_coder, step_label): + self._pickle_coder = pickle_coder + self._step_label = step_label + + def _check_safe(self, value): + if isinstance(value, (str, unicode, long, int, float)): + pass + elif value is None: + pass + elif isinstance(value, (tuple, list)): + for x in value: + self._check_safe(x) + else: + raise TypeError( + "Unable to deterministically code '%s' of type '%s', " + "please provide a type hint for the input of '%s'" % ( + value, type(value), self._step_label)) + + def encode_to_stream(self, value, stream, nested): + self._check_safe(value) + return self._pickle_coder.encode_to_stream(value, stream, nested) + + def decode_from_stream(self, stream, nested): + return self._pickle_coder.decode_from_stream(stream, nested) + + def encode(self, value): + self._check_safe(value) + return self._pickle_coder.encode(value) + + def decode(self, encoded): + return self._pickle_coder.decode(encoded) + + +class BytesCoderImpl(CoderImpl): + """A coder for bytes/str objects.""" + + def encode_to_stream(self, value, out, nested): + out.write(value, nested) + + def decode_from_stream(self, in_stream, nested): + return in_stream.read_all(nested) + + def encode(self, value): + assert isinstance(value, bytes), (value, type(value)) + return value + + def decode(self, encoded): + return encoded + + +class FloatCoderImpl(StreamCoderImpl): + + def encode_to_stream(self, value, out, nested): + out.write_bigendian_double(value) + + def decode_from_stream(self, in_stream, nested): + return in_stream.read_bigendian_double() + + +class TimestampCoderImpl(StreamCoderImpl): + + def __init__(self, timestamp_class): + self.timestamp_class = timestamp_class + + def encode_to_stream(self, value, out, nested): + out.write_bigendian_int64(value.micros) + + def decode_from_stream(self, in_stream, nested): + return self.timestamp_class(micros=in_stream.read_bigendian_int64()) + + +small_ints = [chr(_) for _ in range(128)] + + +class VarIntCoderImpl(StreamCoderImpl): + """A coder for long/int objects.""" + + def encode_to_stream(self, value, out, nested): + out.write_var_int64(value) + + def decode_from_stream(self, in_stream, nested): + return in_stream.read_var_int64() + + def encode(self, value): + ivalue = value # type cast + if 0 <= ivalue < len(small_ints): + return small_ints[ivalue] + else: + return StreamCoderImpl.encode(self, value) + + def decode(self, encoded): + if len(encoded) == 1: + i = ord(encoded) + if 0 <= i < 128: + return i + return StreamCoderImpl.decode(self, encoded) + + +class SingletonCoderImpl(CoderImpl): + """A coder that always encodes exactly one value.""" + + def __init__(self, value): + self._value = value + + def encode_to_stream(self, value, stream, nested): + pass + + def decode_from_stream(self, stream, nested): + return self._value + + def encode(self, value): + b = '' # avoid byte vs str vs unicode error + return b + + def decode(self, encoded): + return self._value + + +class AbstractComponentCoderImpl(StreamCoderImpl): + + def __init__(self, coder_impls): + for c in coder_impls: 
+ assert isinstance(c, CoderImpl), c + self._coder_impls = tuple(coder_impls) + + def _extract_components(self, value): + raise NotImplementedError + + def _construct_from_components(self, components): + raise NotImplementedError + + def encode_to_stream(self, value, out, nested): + values = self._extract_components(value) + if len(self._coder_impls) != len(values): + raise ValueError( + 'Number of components does not match number of coders.') + for i in range(0, len(self._coder_impls)): + c = self._coder_impls[i] # type cast + c.encode_to_stream(values[i], out, True) + + def decode_from_stream(self, in_stream, nested): + return self._construct_from_components( + [c.decode_from_stream(in_stream, True) for c in self._coder_impls]) + + +class TupleCoderImpl(AbstractComponentCoderImpl): + """A coder for tuple objects.""" + + def _extract_components(self, value): + return value + + def _construct_from_components(self, components): + return tuple(components) + + +class SequenceCoderImpl(StreamCoderImpl): + """A coder for sequences of known length.""" + + def __init__(self, elem_coder): + self._elem_coder = elem_coder + + def _construct_from_sequence(self, values): + raise NotImplementedError + + def encode_to_stream(self, value, out, nested): + # Compatible with Java's IterableLikeCoder. + out.write_bigendian_int32(len(value)) + for elem in value: + self._elem_coder.encode_to_stream(elem, out, True) + + def decode_from_stream(self, in_stream, nested): + size = in_stream.read_bigendian_int32() + return self._construct_from_sequence( + [self._elem_coder.decode_from_stream(in_stream, True) + for _ in range(size)]) + + +class TupleSequenceCoderImpl(SequenceCoderImpl): + """A coder for homogeneous tuple objects.""" + + def _construct_from_sequence(self, components): + return tuple(components) + + +class WindowedValueCoderImpl(StreamCoderImpl): + """A coder for windowed values.""" + + def __init__(self, value_coder, timestamp_coder, window_coder): + self._value_coder = value_coder + self._timestamp_coder = timestamp_coder + self._windows_coder = TupleSequenceCoderImpl(window_coder) + + def encode_to_stream(self, value, out, nested): + self._value_coder.encode_to_stream(value.value, out, True) + self._timestamp_coder.encode_to_stream(value.timestamp, out, True) + self._windows_coder.encode_to_stream(value.windows, out, True) + + def decode_from_stream(self, in_stream, nested): + return WindowedValue( + self._value_coder.decode_from_stream(in_stream, True), + self._timestamp_coder.decode_from_stream(in_stream, True), + self._windows_coder.decode_from_stream(in_stream, True)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py new file mode 100644 index 0000000..16edff0 --- /dev/null +++ b/sdks/python/apache_beam/coders/coders.py @@ -0,0 +1,511 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Collection of useful coders."""
+
+import base64
+import collections
+import cPickle as pickle
+
+from google.cloud.dataflow.coders import coder_impl
+
+
+# pylint: disable=g-import-not-at-top
+# Avoid dependencies on the full SDK.
+try:
+  # Import dill from the pickler module to make sure our monkey-patching of dill
+  # occurs.
+  from google.cloud.dataflow.internal.pickler import dill
+  from google.cloud.dataflow.transforms.timeutil import Timestamp
+except ImportError:
+  # We fall back to using the stock dill library in tests that don't use the
+  # full Python SDK.
+  import dill
+  Timestamp = collections.namedtuple('Timestamp', 'micros')
+
+
+def serialize_coder(coder):
+  from google.cloud.dataflow.internal import pickler
+  return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
+
+
+def deserialize_coder(serialized):
+  from google.cloud.dataflow.internal import pickler
+  return pickler.loads(serialized.split('$', 1)[1])
+# pylint: enable=g-import-not-at-top
+
+
+class Coder(object):
+  """Base class for coders."""
+
+  def encode(self, value):
+    """Encodes the given object into a byte string."""
+    raise NotImplementedError('Encode not implemented: %s.' % self)
+
+  def decode(self, encoded):
+    """Decodes the given byte string into the corresponding object."""
+    raise NotImplementedError('Decode not implemented: %s.' % self)
+
+  def is_deterministic(self):
+    """Whether this coder is guaranteed to encode values deterministically.
+
+    A deterministic coder is required for key coders in GroupByKey operations
+    to produce consistent results.
+
+    For example, note that the default coder, the PickleCoder, is not
+    deterministic: the ordering of pickled entries in maps may vary across
+    executions since there is no defined order, and such a coder is not in
+    general suitable for usage as a key coder in GroupByKey operations, since
+    each instance of the same key may be encoded differently.
+
+    Returns:
+      Whether the coder is deterministic.
+    """
+    return False
+
+  # ===========================================================================
+  # Methods below are internal SDK details that don't need to be modified for
+  # user-defined coders.
+  # ===========================================================================
+
+  def _create_impl(self):
+    """Creates a CoderImpl to do the actual encoding and decoding.
+    """
+    return coder_impl.CallbackCoderImpl(self.encode, self.decode)
+
+  def get_impl(self):
+    if not hasattr(self, '_impl'):
+      self._impl = self._create_impl()
+      assert isinstance(self._impl, coder_impl.CoderImpl)
+    return self._impl
+
+  def __getstate__(self):
+    return self._dict_without_impl()
+
+  def _dict_without_impl(self):
+    if hasattr(self, '_impl'):
+      d = dict(self.__dict__)
+      del d['_impl']
+      return d
+    else:
+      return self.__dict__
+
+  @classmethod
+  def from_type_hint(cls, unused_typehint, unused_registry):
+    # If not overridden, just construct the coder without arguments.
+    return cls()
+
+  def is_kv_coder(self):
+    return False
+
+  def key_coder(self):
+    if self.is_kv_coder():
+      raise NotImplementedError('key_coder: %s' % self)
+    else:
+      raise ValueError('Not a KV coder: %s.' % self)
+
+  def value_coder(self):
+    if self.is_kv_coder():
+      raise NotImplementedError('value_coder: %s' % self)
+    else:
+      raise ValueError('Not a KV coder: %s.' % self)
+
+  def _get_component_coders(self):
+    """Returns the internal component coders of this coder."""
+    # This is an internal detail of the Coder API and does not need to be
+    # refined in user-defined Coders.
+    return []
+
+  def as_cloud_object(self):
+    """Returns Google Cloud Dataflow API description of this coder."""
+    # This is an internal detail of the Coder API and does not need to be
+    # refined in user-defined Coders.
+
+    value = {
+        # We pass coders in the form "<coder_name>$<pickled_data>" to make the
+        # job description JSON more readable. Data before the $ is ignored by
+        # the worker.
+        '@type': serialize_coder(self),
+        'component_encodings': list(
+            component.as_cloud_object()
+            for component in self._get_component_coders())
+    }
+    return value
+
+  def __repr__(self):
+    return self.__class__.__name__
+
+  def __eq__(self, other):
+    # pylint: disable=protected-access
+    return (self.__class__ == other.__class__
+            and self._dict_without_impl() == other._dict_without_impl())
+    # pylint: enable=protected-access
+
+
+class StrUtf8Coder(Coder):
+  """A coder used for reading and writing strings as UTF-8."""
+
+  def encode(self, value):
+    return value.encode('utf-8')
+
+  def decode(self, value):
+    return value.decode('utf-8')
+
+  def is_deterministic(self):
+    return True
+
+
+class ToStringCoder(Coder):
+  """A default string coder used if no sink coder is specified."""
+
+  def encode(self, value):
+    if isinstance(value, unicode):
+      return value.encode('utf-8')
+    elif isinstance(value, str):
+      return value
+    else:
+      return str(value)
+
+  def decode(self, _):
+    raise NotImplementedError('ToStringCoder cannot be used for decoding.')
+
+  def is_deterministic(self):
+    return True
+
+
+class FastCoder(Coder):
+  """Coder subclass used when a (faster) CoderImpl is supplied directly.
+
+  The Coder class defines _create_impl in terms of encode() and decode();
+  this class inverts that, defining encode() and decode() in terms of
+  _create_impl().
+  """
+
+  def encode(self, value):
+    """Encodes the given object into a byte string."""
+    return self.get_impl().encode(value)
+
+  def decode(self, encoded):
+    """Decodes the given byte string into the corresponding object."""
+    return self.get_impl().decode(encoded)
+
+  def _create_impl(self):
+    raise NotImplementedError
+
+
+class BytesCoder(FastCoder):
+  """Byte string coder."""
+
+  def _create_impl(self):
+    return coder_impl.BytesCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class VarIntCoder(FastCoder):
+  """Variable-length integer coder."""
+
+  def _create_impl(self):
+    return coder_impl.VarIntCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class FloatCoder(FastCoder):
+  """A coder used for floating-point values."""
+
+  def _create_impl(self):
+    return coder_impl.FloatCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class TimestampCoder(FastCoder):
+  """A coder used for timeutil.Timestamp values."""
+
+  def _create_impl(self):
+    return coder_impl.TimestampCoderImpl(Timestamp)
+
+  def is_deterministic(self):
+    return True
+
+
+class SingletonCoder(FastCoder):
+  """A coder that always encodes exactly one value."""
+
+  def __init__(self, value):
+    self._value = value
+
+  def _create_impl(self):
+    return coder_impl.SingletonCoderImpl(self._value)
+
+  def is_deterministic(self):
+    return True
+
+
+def maybe_dill_dumps(o):
+  """Pickle using cPickle or the Dill pickler as a fallback."""
+  # We need to use the dill pickler for objects of certain custom classes,
+  # including, for example, ones that contain lambdas.
+  try:
+    return pickle.dumps(o)
+  except Exception:  # pylint: disable=broad-except
+    return dill.dumps(o)
+
+
+def maybe_dill_loads(o):
+  """Unpickle using cPickle or the Dill pickler as a fallback."""
+  try:
+    return pickle.loads(o)
+  except Exception:  # pylint: disable=broad-except
+    return dill.loads(o)
+
+
+class _PickleCoderBase(FastCoder):
+  """Base class for pickling coders."""
+
+  def is_deterministic(self):
+    # Note that the default coder, the PickleCoder, is not deterministic (for
+    # example, the ordering of pickled entries in maps may vary across
+    # executions), and so is not in general suitable for usage as a key coder in
+    # GroupByKey operations.
+    return False
+
+  def as_cloud_object(self, is_pair_like=True):
+    value = super(_PickleCoderBase, self).as_cloud_object()
+    # We currently use this coder in places where we cannot infer the coder to
+    # use for the value type in a more granular way. In places where the
+    # service expects a pair, it checks for the "is_pair_like" key, in which
+    # case we would fail without the hack below.
+    if is_pair_like:
+      value['is_pair_like'] = True
+      value['component_encodings'] = [
+          self.as_cloud_object(is_pair_like=False),
+          self.as_cloud_object(is_pair_like=False)
+      ]
+
+    return value
+
+  # We allow .key_coder() and .value_coder() to be called on PickleCoder since
+  # we can't always infer the return values of lambdas in ParDo operations, the
+  # result of which may be used in a GroupByKey.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
+class PickleCoder(_PickleCoderBase):
+  """Coder using Python's pickle functionality."""
+
+  def _create_impl(self):
+    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
+
+
+class DillCoder(_PickleCoderBase):
+  """Coder using dill's pickle functionality."""
+
+  def _create_impl(self):
+    return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
+
+
+class DeterministicPickleCoder(FastCoder):
+  """Throws runtime errors when pickling non-deterministic values."""
+
+  def __init__(self, pickle_coder, step_label):
+    self._pickle_coder = pickle_coder
+    self._step_label = step_label
+
+  def _create_impl(self):
+    return coder_impl.DeterministicPickleCoderImpl(
+        self._pickle_coder.get_impl(), self._step_label)
+
+  def is_deterministic(self):
+    return True
+
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
+class Base64PickleCoder(Coder):
+  """Coder of objects by Python pickle, then base64 encoding."""
+  # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
+  # than via a special Coder.
+
+  def encode(self, value):
+    return base64.b64encode(pickle.dumps(value))
+
+  def decode(self, encoded):
+    return pickle.loads(base64.b64decode(encoded))
+
+  def is_deterministic(self):
+    # Note that the Base64PickleCoder is not deterministic. See the
+    # corresponding comments for PickleCoder above.
+    return False
+
+  # We allow .key_coder() and .value_coder() to be called on Base64PickleCoder
+  # since we can't always infer the return values of lambdas in ParDo
+  # operations, the result of which may be used in a GroupByKey.
+  #
+  # TODO(ccy): this is currently only used for KV values from Create transforms.
+  # Investigate a way to unify this with PickleCoder.
+ def is_kv_coder(self): + return True + + def key_coder(self): + return self + + def value_coder(self): + return self + + +class TupleCoder(FastCoder): + """Coder of tuple objects.""" + + def __init__(self, components): + self._coders = tuple(components) + + def _create_impl(self): + return coder_impl.TupleCoderImpl([c.get_impl() for c in self._coders]) + + def is_deterministic(self): + return all(c.is_deterministic() for c in self._coders) + + @staticmethod + def from_type_hint(typehint, registry): + return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types]) + + def as_cloud_object(self): + value = super(TupleCoder, self).as_cloud_object() + value['is_pair_like'] = True + return value + + def _get_component_coders(self): + return self.coders() + + def coders(self): + return self._coders + + def is_kv_coder(self): + return len(self._coders) == 2 + + def key_coder(self): + if len(self._coders) != 2: + raise ValueError('TupleCoder does not have exactly 2 components.') + return self._coders[0] + + def value_coder(self): + if len(self._coders) != 2: + raise ValueError('TupleCoder does not have exactly 2 components.') + return self._coders[1] + + def __repr__(self): + return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders) + + +class TupleSequenceCoder(FastCoder): + """Coder of homogeneous tuple objects.""" + + def __init__(self, elem_coder): + self._elem_coder = elem_coder + + def _create_impl(self): + return coder_impl.TupleSequenceCoderImpl(self._elem_coder.get_impl()) + + def is_deterministic(self): + return self._elem_coder.is_deterministic() + + @staticmethod + def from_type_hint(typehint, registry): + return TupleSequenceCoder(registry.get_coder(typehint.inner_type)) + + def _get_component_coders(self): + return (self._elem_coder,) + + def __repr__(self): + return 'TupleSequenceCoder[%r]' % self._elem_coder + + +class WindowCoder(PickleCoder): + """Coder for windows in windowed values.""" + + def _create_impl(self): + return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads) + + def is_deterministic(self): + # Note that WindowCoder as implemented is not deterministic because the + # implementation simply pickles windows. See the corresponding comments + # on PickleCoder for more details. 
+ return False + + def as_cloud_object(self): + return super(WindowCoder, self).as_cloud_object(is_pair_like=False) + + +class WindowedValueCoder(FastCoder): + """Coder for windowed values.""" + + def __init__(self, wrapped_value_coder, timestamp_coder=None, + window_coder=None): + if not timestamp_coder: + timestamp_coder = TimestampCoder() + if not window_coder: + window_coder = PickleCoder() + self.wrapped_value_coder = wrapped_value_coder + self.timestamp_coder = timestamp_coder + self.window_coder = window_coder + + def _create_impl(self): + return coder_impl.WindowedValueCoderImpl( + self.wrapped_value_coder.get_impl(), + self.timestamp_coder.get_impl(), + self.window_coder.get_impl()) + + def is_deterministic(self): + return all(c.is_deterministic() for c in [self.wrapped_value_coder, + self.timestamp_coder, + self.window_coder]) + + def as_cloud_object(self): + value = super(WindowedValueCoder, self).as_cloud_object() + value['is_wrapper'] = True + return value + + def _get_component_coders(self): + return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder] + + def is_kv_coder(self): + return self.wrapped_value_coder.is_kv_coder() + + def key_coder(self): + return self.wrapped_value_coder.key_coder() + + def value_coder(self): + return self.wrapped_value_coder.value_coder() + + def __repr__(self): + return 'WindowedValueCoder[%s]' % self.wrapped_value_coder http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py new file mode 100644 index 0000000..d11d310 --- /dev/null +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -0,0 +1,60 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
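The coders.py module above supports two extension styles: subclass Coder and let
the default _create_impl() wrap encode/decode in a CallbackCoderImpl, or subclass
FastCoder and supply a CoderImpl directly. A sketch of both styles follows; the
class names here are illustrative only and are not part of this commit::

  from google.cloud.dataflow import coders
  from google.cloud.dataflow.coders import coder_impl

  # Style 1: define encode/decode; the inherited _create_impl() wraps
  # them in a CallbackCoderImpl automatically.
  class CsvPairCoder(coders.Coder):

    def encode(self, value):
      return '%s,%s' % value  # value is a (str, str) pair

    def decode(self, encoded):
      return tuple(encoded.split(',', 1))

    def is_deterministic(self):
      return True

  # Style 2: supply the CoderImpl directly, as the built-in coders do.
  class RawBytesCoder(coders.FastCoder):

    def _create_impl(self):
      return coder_impl.BytesCoderImpl()

    def is_deterministic(self):
      return True

  # Round trip through the callback-based coder.
  assert CsvPairCoder().decode(CsvPairCoder().encode(('a', 'b'))) == ('a', 'b')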
+ + +import base64 +import logging +import unittest + +from google.cloud.dataflow import coders + + +class PickleCoderTest(unittest.TestCase): + + def test_basics(self): + v = ('a' * 10, 'b' * 90) + pickler = coders.PickleCoder() + self.assertEquals(v, pickler.decode(pickler.encode(v))) + pickler = coders.Base64PickleCoder() + self.assertEquals(v, pickler.decode(pickler.encode(v))) + self.assertEquals( + coders.Base64PickleCoder().encode(v), + base64.b64encode(coders.PickleCoder().encode(v))) + + def test_equality(self): + self.assertEquals(coders.PickleCoder(), coders.PickleCoder()) + self.assertEquals(coders.Base64PickleCoder(), coders.Base64PickleCoder()) + self.assertNotEquals(coders.Base64PickleCoder(), coders.PickleCoder()) + self.assertNotEquals(coders.Base64PickleCoder(), object()) + + +class CodersTest(unittest.TestCase): + + def test_str_utf8_coder(self): + real_coder = coders.registry.get_coder(str) + expected_coder = coders.BytesCoder() + self.assertEqual( + real_coder.encode('abc'), expected_coder.encode('abc')) + self.assertEqual('abc', real_coder.decode(real_coder.encode('abc'))) + + real_coder = coders.registry.get_coder(bytes) + expected_coder = coders.BytesCoder() + self.assertEqual( + real_coder.encode('abc'), expected_coder.encode('abc')) + self.assertEqual('abc', real_coder.decode(real_coder.encode('abc'))) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py new file mode 100644 index 0000000..29eaace --- /dev/null +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -0,0 +1,180 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests common to all coder implementations.""" + +import logging +import math +import sys +import unittest + +import dill + +import coders + + +# Defined out of line for picklability. +class CustomCoder(coders.Coder): + + def encode(self, x): + return str(x+1) + + def decode(self, encoded): + return int(encoded) - 1 + + +class CodersTest(unittest.TestCase): + + # These class methods ensure that we test each defined coder in both + # nested and unnested context. 
+ + @classmethod + def setUpClass(cls): + cls.seen = set() + cls.seen_nested = set() + + @classmethod + def tearDownClass(cls): + standard = set(c + for c in coders.__dict__.values() + if isinstance(c, type) and issubclass(c, coders.Coder) and + 'Base' not in c.__name__) + standard -= set([coders.Coder, + coders.FastCoder, + coders.Base64PickleCoder, + coders.FloatCoder, + coders.TimestampCoder, + coders.ToStringCoder, + coders.WindowCoder, + coders.WindowedValueCoder]) + assert not standard - cls.seen, standard - cls.seen + assert not standard - cls.seen_nested, standard - cls.seen_nested + + @classmethod + def _observe(cls, coder): + cls.seen.add(type(coder)) + cls._observe_nested(coder) + + @classmethod + def _observe_nested(cls, coder): + if isinstance(coder, coders.TupleCoder): + for c in coder.coders(): + cls.seen_nested.add(type(c)) + cls._observe_nested(c) + + def check_coder(self, coder, *values): + self._observe(coder) + for v in values: + self.assertEqual(v, coder.decode(coder.encode(v))) + copy1 = dill.loads(dill.dumps(coder)) + copy2 = dill.loads(dill.dumps(coder)) + for v in values: + self.assertEqual(v, copy1.decode(copy2.encode(v))) + + def test_custom_coder(self): + + self.check_coder(CustomCoder(), 1, -10, 5) + self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())), + (1, 'a'), (-10, 'b'), (5, 'c')) + + def test_pickle_coder(self): + self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) + + def test_deterministic_pickle_coder(self): + coder = coders.DeterministicPickleCoder(coders.PickleCoder(), 'step') + self.check_coder(coder, 'a', 1, 1.5, (1, 2, 3)) + with self.assertRaises(TypeError): + self.check_coder(coder, dict()) + with self.assertRaises(TypeError): + self.check_coder(coder, [1, dict()]) + + self.check_coder(coders.TupleCoder((coder, coders.PickleCoder())), + (1, dict()), ('a', [dict()])) + + def test_dill_coder(self): + cell_value = (lambda x: lambda: x)(0).func_closure[0] + self.check_coder(coders.DillCoder(), 'a', 1, cell_value) + self.check_coder( + coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())), + (1, cell_value)) + + def test_bytes_coder(self): + self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000) + + def test_varint_coder(self): + # Small ints. 
+ self.check_coder(coders.VarIntCoder(), *range(-10, 10)) + # Multi-byte encoding starts at 128 + self.check_coder(coders.VarIntCoder(), *range(120, 140)) + # Large values + self.check_coder(coders.VarIntCoder(), + *[int(math.pow(-1, k) * math.exp(k)) + for k in range(0, int(math.log(sys.maxint)))]) + + def test_float_coder(self): + self.check_coder(coders.FloatCoder(), + *[float(0.1 * x) for x in range(-100, 100)]) + self.check_coder(coders.FloatCoder(), + *[float(2 ** (0.1 * x)) for x in range(-100, 100)]) + self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf')) + + def test_singleton_coder(self): + a = 'anything' + b = 'something else' + self.check_coder(coders.SingletonCoder(a), a) + self.check_coder(coders.SingletonCoder(b), b) + self.check_coder(coders.TupleCoder((coders.SingletonCoder(a), + coders.SingletonCoder(b))), (a, b)) + + def test_timestamp_coder(self): + self.check_coder(coders.TimestampCoder(), + *[coders.Timestamp(micros=x) for x in range(-100, 100)]) + self.check_coder(coders.TimestampCoder(), + coders.Timestamp(micros=-1234567890), + coders.Timestamp(micros=1234567890)) + self.check_coder(coders.TimestampCoder(), + coders.Timestamp(micros=-1234567890123456789), + coders.Timestamp(micros=1234567890123456789)) + + def test_tuple_coder(self): + self.check_coder( + coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())), + (1, 'a'), + (-2, 'a' * 100), + (300, 'abc\0' * 5)) + self.check_coder( + coders.TupleCoder( + (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())), + coders.StrUtf8Coder())), + ((1, 2), 'a'), + ((-2, 5), u'a\u0101' * 100), + ((300, 1), 'abc\0' * 5)) + + def test_tuple_sequence_coder(self): + int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder()) + self.check_coder(int_tuple_coder, (1, -1, 0), (), tuple(range(1000))) + self.check_coder( + coders.TupleCoder((coders.VarIntCoder(), int_tuple_coder)), + (1, (1, 2, 3))) + + def test_base64_pickle_coder(self): + self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) + + def test_utf8_coder(self): + self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0') + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/fast_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py new file mode 100644 index 0000000..f2f4e6c --- /dev/null +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -0,0 +1,34 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for compiled implementation of coder impls.""" + +import logging +import unittest + + +# Run all the standard coder test cases. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/fast_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
new file mode 100644
index 0000000..f2f4e6c
--- /dev/null
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -0,0 +1,34 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for compiled implementation of coder impls."""
+
+import logging
+import unittest
+
+
+# Run all the standard coder test cases.
+from google.cloud.dataflow.coders.coders_test_common import *
+
+
+class FastCoders(unittest.TestCase):
+
+  def test_using_fast_impl(self):
+    # pylint: disable=g-import-not-at-top
+    # pylint: disable=unused-variable
+    import google.cloud.dataflow.coders.stream
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
new file mode 100644
index 0000000..8a01752
--- /dev/null
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -0,0 +1,33 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Observable base class for iterables."""
+
+
+class ObservableMixin(object):
+  """An observable iterable.
+
+  Subclasses need to call self.notify_observers with any object yielded.
+  """
+
+  def __init__(self):
+    self.observers = []
+
+  def register_observer(self, callback):
+    self.observers.append(callback)
+
+  def notify_observers(self, value, **kwargs):
+    for o in self.observers:
+      o(value, **kwargs)
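ObservableMixin gives any iterable a simple callback hook: whatever kwargs are passed to notify_observers are forwarded verbatim to every registered callback, so a caller can watch elements as they are yielded without materializing the iterable. A minimal sketch (the Numbers class and the is_last kwarg are hypothetical):

    from google.cloud.dataflow.coders import observable

    class Numbers(observable.ObservableMixin):
      def __iter__(self):
        for n in (1, 2, 3):
          self.notify_observers(n, is_last=(n == 3))
          yield n

    seen = []
    numbers = Numbers()
    numbers.register_observer(lambda value, is_last: seen.append(value))
    for _ in numbers:  # iterating is what fires the callbacks
      pass
    assert seen == [1, 2, 3]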
+ +"""Tests for the Observable mixin class.""" + +import logging +import unittest + + +from google.cloud.dataflow.coders import observable + + +class ObservableMixinTest(unittest.TestCase): + observed_count = 0 + observed_sum = 0 + observed_keys = [] + + def observer(self, value, key=None): + self.observed_count += 1 + self.observed_sum += value + self.observed_keys.append(key) + + def test_observable(self): + class Watched(observable.ObservableMixin): + + def __iter__(self): + for i in (1, 4, 3): + self.notify_observers(i, key='a%d' % i) + yield i + + watched = Watched() + watched.register_observer(lambda v, key: self.observer(v, key=key)) + for _ in watched: + pass + + self.assertEquals(3, self.observed_count) + self.assertEquals(8, self.observed_sum) + self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py new file mode 100644 index 0000000..8cb23ae --- /dev/null +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -0,0 +1,36 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for uncompiled implementation of coder impls.""" + +import logging +import unittest + + +# Run all the standard coder test cases. +from google.cloud.dataflow.coders.coders_test_common import * + + +class SlowCoders(unittest.TestCase): + + def test_using_slow_impl(self): + # Assert that we are not using the compiled implementation. + with self.assertRaises(ImportError): + # pylint: disable=g-import-not-at-top + # pylint: disable=unused-variable + import google.cloud.dataflow.coders.stream + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py new file mode 100644 index 0000000..ea09d54 --- /dev/null +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -0,0 +1,136 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
new file mode 100644
index 0000000..ea09d54
--- /dev/null
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -0,0 +1,136 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A pure Python implementation of stream.pyx."""
+
+import struct
+
+
+class OutputStream(object):
+  """A pure Python implementation of stream.OutputStream."""
+
+  def __init__(self):
+    self.data = []
+
+  def write(self, b, nested=False):
+    assert isinstance(b, str)
+    if nested:
+      self.write_var_int64(len(b))
+    self.data.append(b)
+
+  def write_byte(self, val):
+    self.data.append(chr(val))
+
+  def write_var_int64(self, v):
+    if v < 0:
+      v += 1 << 64
+      if v <= 0:
+        raise ValueError('Value too large (negative).')
+    while True:
+      bits = v & 0x7F
+      v >>= 7
+      if v:
+        bits |= 0x80
+      self.write_byte(bits)
+      if not v:
+        break
+
+  def write_bigendian_int64(self, v):
+    self.write(struct.pack('>q', v))
+
+  def write_bigendian_int32(self, v):
+    self.write(struct.pack('>i', v))
+
+  def write_bigendian_double(self, v):
+    self.write(struct.pack('>d', v))
+
+  def get(self):
+    return ''.join(self.data)
+
+
+class ByteCountingOutputStream(OutputStream):
+  """A pure Python implementation of stream.ByteCountingOutputStream."""
+
+  def __init__(self):
+    # Note that we don't actually use any of the data initialized by our super.
+    super(ByteCountingOutputStream, self).__init__()
+    self.count = 0
+
+  def write(self, byte_array, nested=False):
+    blen = len(byte_array)
+    if nested:
+      self.write_var_int64(blen)
+    self.count += blen
+
+  def write_byte(self, _):
+    self.count += 1
+
+  def get_count(self):
+    return self.count
+
+  def get(self):
+    raise NotImplementedError
+
+  def __str__(self):
+    return '<%s %s>' % (self.__class__.__name__, self.count)
+
+
+class InputStream(object):
+  """A pure Python implementation of stream.InputStream."""
+
+  def __init__(self, data):
+    self.data = data
+    self.pos = 0
+
+  def size(self):
+    return len(self.data) - self.pos
+
+  def read(self, size):
+    self.pos += size
+    return self.data[self.pos - size : self.pos]
+
+  def read_all(self, nested):
+    return self.read(self.read_var_int64() if nested else self.size())
+
+  def read_byte(self):
+    self.pos += 1
+    return ord(self.data[self.pos - 1])
+
+  def read_var_int64(self):
+    shift = 0
+    result = 0
+    while True:
+      byte = self.read_byte()
+      if byte < 0:
+        raise RuntimeError('VarLong not terminated.')
+
+      bits = byte & 0x7F
+      if shift >= 64 or (shift >= 63 and bits > 1):
+        raise RuntimeError('VarLong too long.')
+      result |= bits << shift
+      shift += 7
+      if not byte & 0x80:
+        break
+    if result >= 1 << 63:
+      result -= 1 << 64
+    return result
+
+  def read_bigendian_int64(self):
+    return struct.unpack('>q', self.read(8))[0]
+
+  def read_bigendian_int32(self):
+    return struct.unpack('>i', self.read(4))[0]
+
+  def read_bigendian_double(self):
+    return struct.unpack('>d', self.read(8))[0]
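write_var_int64 above implements base-128 varints: seven payload bits per byte, the high bit marking continuation, and negative values first mapped into the unsigned 64-bit range (so any negative always costs ten bytes). A worked round trip using the pure Python streams just defined:

    from google.cloud.dataflow.coders import slow_stream

    out = slow_stream.OutputStream()
    out.write_var_int64(300)  # 300 = 0b100101100 -> bytes 0xAC, 0x02
    out.write_var_int64(-1)   # -1 -> 2**64 - 1 -> ten 7-bit groups
    encoded = out.get()
    assert [0xAC, 0x02] == [ord(c) for c in encoded[:2]]
    assert 12 == len(encoded)  # 2 bytes for 300 plus 10 bytes for -1

    in_s = slow_stream.InputStream(encoded)
    assert 300 == in_s.read_var_int64()
    assert -1 == in_s.read_var_int64()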
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
new file mode 100644
index 0000000..3da7324
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdint
+
+
+cdef class OutputStream(object):
+  cdef char* data
+  cdef size_t size
+  cdef size_t pos
+
+  cpdef write(self, bytes b, bint nested=*)
+  cpdef write_byte(self, unsigned char val)
+  cpdef write_var_int64(self, libc.stdint.int64_t v)
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
+  cpdef write_bigendian_double(self, double d)
+
+  cpdef bytes get(self)
+
+  cdef extend(self, size_t missing)
+
+
+cdef class ByteCountingOutputStream(OutputStream):
+  cdef size_t count
+
+  cpdef write(self, bytes b, bint nested=*)
+  cpdef write_byte(self, unsigned char val)
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t val)
+  cpdef size_t get_count(self)
+  cpdef bytes get(self)
+
+
+cdef class InputStream(object):
+  cdef size_t pos
+  cdef bytes all
+  cdef char* allc
+
+  cpdef size_t size(self) except? -1
+  cpdef bytes read(self, size_t len)
+  cpdef long read_byte(self) except? -1
+  cpdef libc.stdint.int64_t read_var_int64(self) except? -1
+  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
+  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
+  cpdef double read_bigendian_double(self) except? -1
+  cpdef bytes read_all(self, bint nested=*)
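stream.pxd only declares the typed interface; the matching implementation follows in stream.pyx, and Cython picks the .pxd up automatically because it shares the module's basename. For orientation, a hedged sketch of how such a module could be compiled standalone (illustrative only; the SDK's real build configuration lives in its own setup.py, which is not part of this diff):

    # setup.py (illustrative sketch, not the SDK's build script)
    from setuptools import setup
    from Cython.Build import cythonize

    setup(
        name='stream_demo',
        # cythonize() compiles stream.pyx against the declarations
        # in the adjacent stream.pxd.
        ext_modules=cythonize('stream.pyx'),
    )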
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
new file mode 100644
index 0000000..6df186a
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -0,0 +1,201 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdlib
+cimport libc.string
+
+
+cdef class OutputStream(object):
+  """An output string stream implementation supporting write() and get()."""
+
+  # TODO(robertwb): Consider using raw C++ streams.
+
+  def __cinit__(self):
+    self.size = 1024
+    self.pos = 0
+    self.data = <char*>libc.stdlib.malloc(self.size)
+    assert self.data, "OutputStream malloc failed."
+
+  def __dealloc__(self):
+    if self.data:
+      libc.stdlib.free(self.data)
+
+  cpdef write(self, bytes b, bint nested=False):
+    cdef size_t blen = len(b)
+    if nested:
+      self.write_var_int64(blen)
+    if self.size < self.pos + blen:
+      self.extend(blen)
+    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
+    self.pos += blen
+
+  cpdef write_byte(self, unsigned char val):
+    if self.size < self.pos + 1:
+      self.extend(1)
+    self.data[self.pos] = val
+    self.pos += 1
+
+  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
+    """Encode a long using variable-length encoding to a stream."""
+    cdef libc.stdint.uint64_t v = signed_v
+    cdef long bits
+    while True:
+      bits = v & 0x7F
+      v >>= 7
+      if v:
+        bits |= 0x80
+      self.write_byte(bits)
+      if not v:
+        break
+
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
+    cdef libc.stdint.uint64_t v = signed_v
+    if self.size < self.pos + 8:
+      self.extend(8)
+    self.data[self.pos    ] = <unsigned char>(v >> 56)
+    self.data[self.pos + 1] = <unsigned char>(v >> 48)
+    self.data[self.pos + 2] = <unsigned char>(v >> 40)
+    self.data[self.pos + 3] = <unsigned char>(v >> 32)
+    self.data[self.pos + 4] = <unsigned char>(v >> 24)
+    self.data[self.pos + 5] = <unsigned char>(v >> 16)
+    self.data[self.pos + 6] = <unsigned char>(v >>  8)
+    self.data[self.pos + 7] = <unsigned char>(v      )
+    self.pos += 8
+
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
+    cdef libc.stdint.uint32_t v = signed_v
+    if self.size < self.pos + 4:
+      self.extend(4)
+    self.data[self.pos    ] = <unsigned char>(v >> 24)
+    self.data[self.pos + 1] = <unsigned char>(v >> 16)
+    self.data[self.pos + 2] = <unsigned char>(v >>  8)
+    self.data[self.pos + 3] = <unsigned char>(v      )
+    self.pos += 4
+
+  cpdef write_bigendian_double(self, double d):
+    self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])
+
+  cpdef bytes get(self):
+    return self.data[:self.pos]
+
+  cdef extend(self, size_t missing):
+    while missing > self.size - self.pos:
+      self.size *= 2
+    self.data = <char*>libc.stdlib.realloc(self.data, self.size)
+    assert self.data, "OutputStream realloc failed."
+
+
+cdef class ByteCountingOutputStream(OutputStream):
+  """An output string stream implementation that only counts the bytes.
+
+  This implementation counts the number of bytes it "writes" but
+  doesn't actually write them anywhere.  Thus it has write() but not
+  get().  get_count() returns how many bytes were written.
+
+  This is useful for sizing an encoding.
+  """
+
+  def __cinit__(self):
+    self.count = 0
+
+  cpdef write(self, bytes b, bint nested=False):
+    cdef size_t blen = len(b)
+    if nested:
+      self.write_var_int64(blen)
+    self.count += blen
+
+  cpdef write_byte(self, unsigned char _):
+    self.count += 1
+
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
+    self.count += 8
+
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
+    self.count += 4
+
+  cpdef size_t get_count(self):
+    return self.count
+
+  cpdef bytes get(self):
+    raise NotImplementedError
+
+  def __str__(self):
+    return '<%s %s>' % (self.__class__.__name__, self.count)
+
+
+cdef class InputStream(object):
+  """An input string stream implementation supporting read() and size()."""
+
+  def __init__(self, all):
+    self.allc = self.all = all
+
+  cpdef bytes read(self, size_t size):
+    self.pos += size
+    return self.allc[self.pos - size : self.pos]
+
+  cpdef long read_byte(self) except? -1:
+    self.pos += 1
+    # Note: the C++ compiler on Dataflow workers treats the char array below
+    # as a signed char.  This causes incorrect coder behavior unless
+    # explicitly cast to an unsigned char here.
+    return <long>(<unsigned char>self.allc[self.pos - 1])
+
+  cpdef size_t size(self) except? -1:
+    return len(self.all) - self.pos
+
+  cpdef bytes read_all(self, bint nested=False):
+    return self.read(self.read_var_int64() if nested else self.size())
+
+  cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
+    """Decode a variable-length encoded long from a stream."""
+    cdef long byte
+    cdef long bits
+    cdef long shift = 0
+    cdef libc.stdint.int64_t result = 0
+    while True:
+      byte = self.read_byte()
+      if byte < 0:
+        raise RuntimeError('VarInt not terminated.')
+
+      bits = byte & 0x7F
+      if (shift >= sizeof(long) * 8 or
+          (shift >= (sizeof(long) * 8 - 1) and bits > 1)):
+        raise RuntimeError('VarLong too long.')
+      result |= bits << shift
+      shift += 7
+      if not (byte & 0x80):
+        break
+    return result
+
+  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
+    self.pos += 8
+    return (<unsigned char>self.allc[self.pos - 1]
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] << 8
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
+            | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)
+
+  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
+    self.pos += 4
+    return (<unsigned char>self.allc[self.pos - 1]
+            | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] << 8
+            | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
+            | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)
+
+  cpdef double read_bigendian_double(self) except? -1:
+    cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
+    return (<double*><char*>&as_long)[0]
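ByteCountingOutputStream turns size estimation into an O(1)-memory pass: each write_* advances a counter by exactly the number of bytes the real OutputStream would have emitted. A small worked example with the pure Python twin, whose arithmetic is identical:

    from google.cloud.dataflow.coders import slow_stream

    counter = slow_stream.ByteCountingOutputStream()
    counter.write('abc', nested=True)  # 1 length byte + 3 payload bytes
    counter.write_bigendian_int64(7)   # always 8 bytes
    counter.write_var_int64(130)       # 2 varint bytes, since 130 > 127
    assert 14 == counter.get_count()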
+ +"""Tests for the stream implementations.""" + +import logging +import math +import unittest + + +from google.cloud.dataflow.coders import slow_stream + + +class StreamTest(unittest.TestCase): + # pylint: disable=invalid-name + InputStream = slow_stream.InputStream + OutputStream = slow_stream.OutputStream + ByteCountingOutputStream = slow_stream.ByteCountingOutputStream + # pylint: enable=invalid-name + + def test_read_write(self): + out_s = self.OutputStream() + out_s.write('abc') + out_s.write('\0\t\n') + out_s.write('xyz', True) + out_s.write('', True) + in_s = self.InputStream(out_s.get()) + self.assertEquals('abc\0\t\n', in_s.read(6)) + self.assertEquals('xyz', in_s.read_all(True)) + self.assertEquals('', in_s.read_all(True)) + + def test_read_all(self): + out_s = self.OutputStream() + out_s.write('abc') + in_s = self.InputStream(out_s.get()) + self.assertEquals('abc', in_s.read_all(False)) + + def test_read_write_byte(self): + out_s = self.OutputStream() + out_s.write_byte(1) + out_s.write_byte(0) + out_s.write_byte(0xFF) + in_s = self.InputStream(out_s.get()) + self.assertEquals(1, in_s.read_byte()) + self.assertEquals(0, in_s.read_byte()) + self.assertEquals(0xFF, in_s.read_byte()) + + def test_read_write_large(self): + values = range(4 * 1024) + out_s = self.OutputStream() + for v in values: + out_s.write_bigendian_int64(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEquals(v, in_s.read_bigendian_int64()) + + def run_read_write_var_int64(self, values): + out_s = self.OutputStream() + for v in values: + out_s.write_var_int64(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEquals(v, in_s.read_var_int64()) + + def test_small_var_int64(self): + self.run_read_write_var_int64(range(-10, 30)) + + def test_medium_var_int64(self): + base = -1.7 + self.run_read_write_var_int64( + [int(base**pow) + for pow in range(1, int(63 * math.log(2) / math.log(-base)))]) + + def test_large_var_int64(self): + self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3]) + + def test_read_write_double(self): + values = 0, 1, -1, 1e100, 1.0/3, math.pi, float('inf') + out_s = self.OutputStream() + for v in values: + out_s.write_bigendian_double(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEquals(v, in_s.read_bigendian_double()) + + def test_read_write_bigendian_int64(self): + values = 0, 1, -1, 2**63-1, -2**63, int(2**61 * math.pi) + out_s = self.OutputStream() + for v in values: + out_s.write_bigendian_int64(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEquals(v, in_s.read_bigendian_int64()) + + def test_read_write_bigendian_int32(self): + values = 0, 1, -1, 2**31-1, -2**31, int(2**29 * math.pi) + out_s = self.OutputStream() + for v in values: + out_s.write_bigendian_int32(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEquals(v, in_s.read_bigendian_int32()) + + def test_byte_counting(self): + bc_s = self.ByteCountingOutputStream() + self.assertEquals(0, bc_s.get_count()) + bc_s.write('def') + self.assertEquals(3, bc_s.get_count()) + bc_s.write('') + self.assertEquals(3, bc_s.get_count()) + bc_s.write_byte(10) + self.assertEquals(4, bc_s.get_count()) + # "nested" also writes the length of the string, which should + # cause 1 extra byte to be counted. 
+    bc_s.write('2345', nested=True)
+    self.assertEquals(9, bc_s.get_count())
+    bc_s.write_var_int64(63)
+    self.assertEquals(10, bc_s.get_count())
+    bc_s.write_bigendian_int64(42)
+    self.assertEquals(18, bc_s.get_count())
+    bc_s.write_bigendian_int32(36)
+    self.assertEquals(22, bc_s.get_count())
+    bc_s.write_bigendian_double(6.25)
+    self.assertEquals(30, bc_s.get_count())
+
+
+try:
+  # pylint: disable=g-import-not-at-top
+  from google.cloud.dataflow.coders import stream
+
+  class FastStreamTest(StreamTest):
+    """Runs the test with the compiled stream classes."""
+    InputStream = stream.InputStream
+    OutputStream = stream.OutputStream
+    ByteCountingOutputStream = stream.ByteCountingOutputStream
+
+
+  class SlowFastStreamTest(StreamTest):
+    """Runs the test with a compiled InputStream and uncompiled output streams."""
+    InputStream = stream.InputStream
+    OutputStream = slow_stream.OutputStream
+    ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
+
+
+  class FastSlowStreamTest(StreamTest):
+    """Runs the test with an uncompiled InputStream and compiled output streams."""
+    InputStream = slow_stream.InputStream
+    OutputStream = stream.OutputStream
+    ByteCountingOutputStream = stream.ByteCountingOutputStream
+
+except ImportError:
+  pass
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
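Between the slow_stream base class and the three subclasses in the try block, the suite covers all four reader/writer pairings of the compiled and pure implementations, so any byte stream one side produces must be readable by the other. A condensed sketch of that interoperability invariant (falling back to the pure module when the compiled one is absent):

    from google.cloud.dataflow.coders import slow_stream
    try:
      from google.cloud.dataflow.coders import stream
    except ImportError:
      stream = slow_stream  # no compiled module available

    out = stream.OutputStream()
    out.write_bigendian_double(2.5)
    out.write_var_int64(-300)
    in_s = slow_stream.InputStream(out.get())
    assert 2.5 == in_s.read_bigendian_double()
    assert -300 == in_s.read_var_int64()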