Repository: incubator-beam Updated Branches: refs/heads/python-sdk cb1842c5b -> 887bef19d
Enables more linting rules. - Fixes some import related warnings and enables the related pylint rules. - Use pep8 for blank line related style checks. pylint does not check the blank line rules of PEP 8, and pep8 is not configurable for indentation, so we need both tools. - Fixed existing lint errors related to blank lines. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57a4495d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57a4495d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57a4495d Branch: refs/heads/python-sdk Commit: 57a4495d996acdd4eb69f7219993c9c5d992b2b1 Parents: cb1842c Author: Ahmet Altay <al...@google.com> Authored: Mon Jun 27 17:59:35 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Jun 28 18:23:10 2016 -0700 ---------------------------------------------------------------------- sdks/python/.pylintrc | 3 - sdks/python/apache_beam/__init__.py | 3 +- sdks/python/apache_beam/coders/coder_impl.py | 4 +- sdks/python/apache_beam/coders/coders.py | 4 +- .../apache_beam/coders/fast_coders_test.py | 2 +- .../apache_beam/coders/slow_coders_test.py | 2 +- sdks/python/apache_beam/coders/stream_test.py | 4 +- .../complete/juliaset/juliaset/juliaset.py | 9 ++- .../examples/cookbook/bigquery_schema.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 19 ++++--- .../examples/snippets/snippets_test.py | 58 +++++++++++++------- sdks/python/apache_beam/internal/apiclient.py | 5 +- .../apache_beam/internal/json_value_test.py | 1 + sdks/python/apache_beam/internal/pickler.py | 2 +- sdks/python/apache_beam/internal/util.py | 1 + sdks/python/apache_beam/io/avroio.py | 3 +- sdks/python/apache_beam/io/bigquery.py | 8 +-- sdks/python/apache_beam/io/bigquery_test.py | 6 +- sdks/python/apache_beam/io/fileio.py | 16 +++--- sdks/python/apache_beam/io/gcsio.py | 7 ++- sdks/python/apache_beam/io/gcsio_test.py | 5 +- 
sdks/python/apache_beam/pvalue.py | 8 +-- sdks/python/apache_beam/runners/common.py | 6 ++ .../apache_beam/runners/dataflow_runner.py | 5 +- .../python/apache_beam/runners/direct_runner.py | 2 +- sdks/python/apache_beam/runners/runner.py | 6 +- .../python/apache_beam/transforms/aggregator.py | 2 +- sdks/python/apache_beam/transforms/combiners.py | 10 ++-- .../apache_beam/transforms/combiners_test.py | 4 ++ sdks/python/apache_beam/transforms/core.py | 21 +++---- .../apache_beam/transforms/cy_combiners.py | 53 ++++++++++++++++++ .../python/apache_beam/transforms/ptransform.py | 13 +++-- .../apache_beam/transforms/ptransform_test.py | 10 ++++ .../python/apache_beam/transforms/sideinputs.py | 9 ++- sdks/python/apache_beam/transforms/trigger.py | 1 + .../apache_beam/transforms/trigger_test.py | 5 +- sdks/python/apache_beam/transforms/util.py | 1 + .../apache_beam/transforms/window_test.py | 1 + .../apache_beam/typehints/typehints_test.py | 3 + .../python/apache_beam/utils/dependency_test.py | 1 + sdks/python/apache_beam/utils/retry_test.py | 4 +- sdks/python/run_pylint.sh | 7 ++- sdks/python/tox.ini | 12 +++- 43 files changed, 234 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/.pylintrc ---------------------------------------------------------------------- diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc index edf13ee..efe8147 100644 --- a/sdks/python/.pylintrc +++ b/sdks/python/.pylintrc @@ -83,7 +83,6 @@ disable = arguments-differ, attribute-defined-outside-init, bad-builtin, - bad-option-value, bad-super-call, broad-except, consider-using-enumerate, @@ -131,8 +130,6 @@ disable = unused-wildcard-import, used-before-assignment, wildcard-import, - wrong-import-order, - wrong-import-position, [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/__init__.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index c9bc736..eed251b 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -71,7 +71,7 @@ if sys.version_info.major != 2: 'Dataflow SDK for Python is supported only on Python 2.7. ' 'It is not supported on Python [%s].' % sys.version) - +# pylint: disable=wrong-import-position import apache_beam.internal.pickler from apache_beam import coders @@ -79,3 +79,4 @@ from apache_beam import io from apache_beam import typehints from apache_beam.pipeline import Pipeline from apache_beam.transforms import * +# pylint: enable=wrong-import-position http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 07b6711..a623f2b 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ import collections from cPickle import loads, dumps -# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order, wrong-import-position try: # Don't depend on the full dataflow sdk to test coders. 
from apache_beam.transforms.window import WindowedValue @@ -43,7 +43,7 @@ try: except ImportError: from slow_stream import InputStream as create_InputStream from slow_stream import OutputStream as create_OutputStream -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order, wrong-import-position class CoderImpl(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 6d5b10a..619586f 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -24,7 +24,7 @@ import cPickle as pickle from apache_beam.coders import coder_impl -# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order, wrong-import-position # Avoid dependencies on the full SDK. try: # Import dill from the pickler module to make sure our monkey-patching of dill @@ -46,7 +46,7 @@ def serialize_coder(coder): def deserialize_coder(serialized): from apache_beam.internal import pickler return pickler.loads(serialized.split('$', 1)[1]) -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order, wrong-import-position class Coder(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/fast_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index 466fd57..55cf16c 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -28,7 +28,7 @@ from apache_beam.coders.coders_test_common import * class FastCoders(unittest.TestCase): def test_using_fast_impl(self): - # pylint: disable=g-import-not-at-top + # pylint: 
disable=wrong-import-order, wrong-import-position # pylint: disable=unused-variable import apache_beam.coders.stream http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/slow_coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index 26e6fa7..62149a3 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -30,7 +30,7 @@ class SlowCoders(unittest.TestCase): def test_using_slow_impl(self): # Assert that we are not using the compiled implementation. with self.assertRaises(ImportError): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position # pylint: disable=unused-variable import apache_beam.coders.stream http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/stream_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index f2163ca..cfd627f 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -139,7 +139,7 @@ class StreamTest(unittest.TestCase): try: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-position from apache_beam.coders import stream class FastStreamTest(StreamTest): @@ -148,14 +148,12 @@ try: OutputStream = stream.OutputStream ByteCountingOutputStream = stream.ByteCountingOutputStream - class SlowFastStreamTest(StreamTest): """Runs the test with compiled and uncompiled stream classes.""" InputStream = stream.InputStream OutputStream = slow_stream.OutputStream ByteCountingOutputStream = slow_stream.ByteCountingOutputStream - class FastSlowStreamTest(StreamTest): """Runs the test with uncompiled 
and compiled stream classes.""" InputStream = slow_stream.InputStream http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 1625418..2bc37e6 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -51,14 +51,15 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations): julia_set_colors = (pipeline | beam.Create('add points', point_set(n)) - | beam.Map(get_julia_set_point_color, c, n, max_iterations)) + | beam.Map( + get_julia_set_point_color, c, n, max_iterations)) return julia_set_colors def generate_julia_set_visualization(data, n, max_iterations): """Generate the pixel matrix for rendering the julia set as an image.""" - import numpy as np # pylint: disable=g-import-not-at-top + import numpy as np # pylint: disable=wrong-import-order, wrong-import-position colors = [] for r in range(0, 256, 16): for g in range(0, 256, 16): @@ -74,7 +75,7 @@ def generate_julia_set_visualization(data, n, max_iterations): def save_julia_set_visualization(out_file, image_array): """Save the fractal image of our julia set as a png.""" - from matplotlib import pyplot as plt # pylint: disable=g-import-not-at-top + from matplotlib import pyplot as plt # pylint: disable=wrong-import-order, wrong-import-position plt.imsave(out_file, image_array, format='png') @@ -104,13 +105,11 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates to # the output file with an x-coordinate grouping per line. 
# pylint: disable=expression-not-assigned - # pylint: disable=g-long-lambda (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i))) | beam.GroupByKey('x coord') | beam.Map( 'format', lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output))) - # pylint: enable=g-long-lambda # pylint: enable=expression-not-assigned p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 99a967c..7c420fb 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -44,7 +44,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - from apache_beam.internal.clients import bigquery # pylint: disable=g-import-not-at-top + from apache_beam.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position table_schema = bigquery.TableSchema() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index e7af26a..f5bbc66 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -40,8 +40,7 @@ import apache_beam as beam # pylint:disable=expression-not-assigned # pylint:disable=redefined-outer-name # pylint:disable=unused-variable -# pylint:disable=g-doc-args -# pylint:disable=g-import-not-at-top +# pylint:disable=wrong-import-order, 
wrong-import-position class SnippetUtils(object): @@ -95,7 +94,7 @@ def construct_pipeline(renames): # [START pipelines_constructing_reading] lines = p | beam.io.Read('ReadMyFile', - beam.io.TextFileSource('gs://some/inputData.txt')) + beam.io.TextFileSource('gs://some/inputData.txt')) # [END pipelines_constructing_reading] # [START pipelines_constructing_applying] @@ -106,7 +105,8 @@ def construct_pipeline(renames): # [START pipelines_constructing_writing] filtered_words = reversed_words | beam.Filter('FilterWords', filter_words) filtered_words | beam.io.Write('WriteMyFile', - beam.io.TextFileSink('gs://some/outputData.txt')) + beam.io.TextFileSink( + 'gs://some/outputData.txt')) # [END pipelines_constructing_writing] p.visit(SnippetUtils.RenameFiles(renames)) @@ -304,7 +304,8 @@ def pipeline_options_command_line(argv): # Create the Pipeline with remaining arguments. p = beam.Pipeline(argv=pipeline_args) - lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(known_args.input)) + lines = p | beam.io.Read('ReadFromText', + beam.io.TextFileSource(known_args.input)) lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output)) # [END pipeline_options_command_line] @@ -594,7 +595,8 @@ def examples_wordcount_debugging(renames): | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach'))) # [START example_wordcount_debugging_assert] - beam.assert_that(filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) + beam.assert_that( + filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) # [END example_wordcount_debugging_assert] output = (filtered_words @@ -634,7 +636,7 @@ def model_textio(renames): # [START model_pipelineio_write] filtered_words | beam.io.Write( 'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers', - file_name_suffix='.csv')) + file_name_suffix='.csv')) # [END model_pipelineio_write] # [END model_textio_write] @@ -762,6 +764,7 @@ def model_multiple_pcollections_partition(contents, output_path): 
URL: https://cloud.google.com/dataflow/model/multiple-pcollections """ some_hash_fn = lambda s: ord(s[0]) + def get_percentile(i): """Assume i in [0,100).""" return i @@ -770,6 +773,7 @@ def model_multiple_pcollections_partition(contents, output_path): p = beam.Pipeline(options=PipelineOptions()) students = p | beam.Create(contents) + # [START model_multiple_pcollections_partition] def partition_fn(student, num_partitions): return int(get_percentile(student) * num_partitions / 100) @@ -872,4 +876,3 @@ class Count(beam.PTransform): | beam.Map('Init', lambda v: (v, 1)) | beam.CombinePerKey(sum)) # [END model_library_transforms_count] -# pylint: enable=g-wrong-blank-lines http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index f2f9552..87ce266 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -43,6 +43,7 @@ class ParDoTest(unittest.TestCase): # the text of the doc. 
words = ['aa', 'bbb', 'c'] + # [START model_pardo_pardo] class ComputeWordLengthFn(beam.DoFn): def process(self, context): @@ -57,6 +58,7 @@ class ParDoTest(unittest.TestCase): def test_pardo_yield(self): words = ['aa', 'bbb', 'c'] + # [START model_pardo_yield] class ComputeWordLengthFn(beam.DoFn): def process(self, context): @@ -84,11 +86,12 @@ class ParDoTest(unittest.TestCase): def test_pardo_using_flatmap_yield(self): words = ['aA', 'bbb', 'C'] + # [START model_pardo_using_flatmap_yield] def capitals(word): for letter in word: if 'A' <= letter <= 'Z': - yield letter + yield letter all_capitals = words | beam.FlatMap(capitals) # [END model_pardo_using_flatmap_yield] @@ -113,27 +116,31 @@ class ParDoTest(unittest.TestCase): yield word # Construct a deferred side input. - avg_word_len = words | beam.Map(len) | beam.CombineGlobally(beam.combiners.MeanCombineFn()) + avg_word_len = (words + | beam.Map(len) + | beam.CombineGlobally(beam.combiners.MeanCombineFn())) # Call with explicit side inputs. small_words = words | beam.FlatMap('small', filter_using_length, 0, 3) # A single deferred side input. - larger_than_average = words | beam.FlatMap('large', - filter_using_length, - lower_bound=pvalue.AsSingleton(avg_word_len)) + larger_than_average = (words + | beam.FlatMap('large', filter_using_length, + lower_bound=pvalue.AsSingleton( + avg_word_len))) # Mix and match. 
small_but_nontrivial = words | beam.FlatMap(filter_using_length, - lower_bound=2, - upper_bound=pvalue.AsSingleton(avg_word_len)) + lower_bound=2, + upper_bound=pvalue.AsSingleton( + avg_word_len)) # [END model_pardo_side_input] beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc'])) beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']), - label='larger_than_average') + label='larger_than_average') beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']), - label='small_but_not_trivial') + label='small_but_not_trivial') p.run() def test_pardo_side_input_dofn(self): @@ -170,9 +177,8 @@ class ParDoTest(unittest.TestCase): # [START model_pardo_with_side_outputs] results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x') - .with_outputs('above_cutoff_lengths', - 'marked strings', - main='below_cutoff_strings')) + .with_outputs('above_cutoff_lengths', 'marked strings', + main='below_cutoff_strings')) below = results.below_cutoff_strings above = results.above_cutoff_lengths marked = results['marked strings'] # indexing works as well @@ -183,10 +189,12 @@ class ParDoTest(unittest.TestCase): self.assertEqual({'xyz'}, set(marked)) # [START model_pardo_with_side_outputs_iter] - below, above, marked = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x') - .with_outputs('above_cutoff_lengths', - 'marked strings', - main='below_cutoff_strings')) + below, above, marked = (words + | beam.ParDo( + ProcessWords(), cutoff_length=2, marker='x') + .with_outputs('above_cutoff_lengths', + 'marked strings', + main='below_cutoff_strings')) # [END model_pardo_with_side_outputs_iter] self.assertEqual({'a', 'an'}, set(below)) @@ -195,6 +203,7 @@ class ParDoTest(unittest.TestCase): def test_pardo_with_undeclared_side_outputs(self): numbers = [1, 2, 3, 4, 5, 10, 20] + # [START model_pardo_with_side_outputs_undeclared] def even_odd(x): yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x) @@ -258,6 +267,7 @@ class 
TypeHintsTest(unittest.TestCase): # Helps document the contract and checks it at pipeline construction time. # [START type_hints_transform] T = beam.typehints.TypeVariable('T') + @beam.typehints.with_input_types(T) @beam.typehints.with_output_types(beam.typehints.Tuple[int, T]) class MyTransform(beam.PTransform): @@ -316,7 +326,8 @@ class TypeHintsTest(unittest.TestCase): totals = ( lines | beam.Map(parse_player_and_score) - | beam.CombinePerKey(sum).with_input_types(beam.typehints.Tuple[Player, int])) + | beam.CombinePerKey(sum).with_input_types( + beam.typehints.Tuple[Player, int])) # [END type_hints_deterministic_key] self.assertEquals( @@ -491,12 +502,15 @@ class CombineTest(unittest.TestCase): ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1), ('dog', 5), ('dog', 2)] # [START combine_per_key] - avg_accuracy_per_player = player_accuracies | beam.CombinePerKey(beam.combiners.MeanCombineFn()) + avg_accuracy_per_player = (player_accuracies + | beam.CombinePerKey( + beam.combiners.MeanCombineFn())) # [END combine_per_key] self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player)) def test_combine_concat(self): pc = ['a', 'b'] + # [START combine_concat] def concat(values, separator=', '): return separator.join(values) @@ -511,6 +525,7 @@ class CombineTest(unittest.TestCase): def test_bounded_sum(self): # [START combine_bounded_sum] pc = [1, 10, 100, 1000] + def bounded_sum(values, bound=500): return min(sum(values), bound) small_sum = pc | beam.CombineGlobally(bounded_sum) # [500] @@ -524,23 +539,26 @@ class CombineTest(unittest.TestCase): # [START combine_reduce] import functools import operator - product = factors | beam.CombineGlobally(functools.partial(reduce, operator.mul), 1) + product = factors | beam.CombineGlobally( + functools.partial(reduce, operator.mul), 1) # [END combine_reduce] self.assertEqual([210], product) def test_custom_average(self): pc = [2, 3, 5, 7] - # [START combine_custom_average] class AverageFn(beam.CombineFn): def 
create_accumulator(self): return (0.0, 0) + def add_input(self, (sum, count), input): return sum + input, count + 1 + def merge_accumulators(self, accumulators): sums, counts = zip(*accumulators) return sum(sums), sum(counts) + def extract_output(self, (sum, count)): return sum / count if count else float('NaN') average = pc | beam.CombineGlobally(AverageFn()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index a363511..7dfb035 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -25,6 +25,9 @@ import re import time +from apitools.base.py import encoding +from apitools.base.py import exceptions + from apache_beam import utils from apache_beam import version from apache_beam.internal import pickler @@ -40,8 +43,6 @@ from apache_beam.utils.options import GoogleCloudOptions from apache_beam.utils.options import StandardOptions from apache_beam.utils.options import WorkerOptions -from apitools.base.py import encoding -from apitools.base.py import exceptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/json_value_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py index 9b26ab2..f2ae0c1 100644 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ b/sdks/python/apache_beam/internal/json_value_test.py @@ -24,6 +24,7 @@ from apitools.base.py.extra_types import JsonObject from apache_beam.internal.json_value import from_json_value from 
apache_beam.internal.json_value import to_json_value + class JsonValueTest(unittest.TestCase): def test_string_to(self): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/pickler.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index edf63ad..898e04b 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -159,13 +159,13 @@ if 'save_module' in dir(dill.dill): return old_save_module_dict(pickler, obj) dill.dill.save_module_dict = new_save_module_dict - def _nest_dill_logging(): """Prefix all dill logging with its depth in the callstack. Useful for debugging pickling of deeply nested structures. """ old_log_info = dill.dill.log.info + def new_log_info(msg, *args, **kwargs): old_log_info( ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 50ea2f6..ad60ba6 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -58,6 +58,7 @@ def remove_objects_from_args(args, kwargs, pvalue_classes): a placeholder value. 
""" pvals = [] + def swapper(value): pvals.append(value) return ArgumentPlaceholder() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 022a68d..25412af 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -21,11 +21,12 @@ import os import StringIO import zlib -from apache_beam.io import filebasedsource from avro import datafile from avro import io as avro_io from avro import schema +from apache_beam.io import filebasedsource + class AvroSource(filebasedsource.FileBasedSource): """A source for reading Avro files. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 9d33134..f2c56dc 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -110,6 +110,8 @@ import re import time import uuid +from apitools.base.py.exceptions import HttpError + from apache_beam import coders from apache_beam.internal import auth from apache_beam.internal.json_value import from_json_value @@ -118,15 +120,13 @@ from apache_beam.io import iobase from apache_beam.utils import retry from apache_beam.utils.options import GoogleCloudOptions -from apitools.base.py.exceptions import HttpError - # Protect against environments where bigquery library is not available. 
-# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order, wrong-import-position try: from apache_beam.internal.clients import bigquery except ImportError: pass -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order, wrong-import-position __all__ = [ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index 7c31526..2bca0dc 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -22,16 +22,16 @@ import logging import time import unittest +from apitools.base.py.exceptions import HttpError import mock + import apache_beam as beam +from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder from apache_beam.utils.options import PipelineOptions -from apitools.base.py.exceptions import HttpError -from apache_beam.internal.clients import bigquery - class TestRowAsDictJsonCoder(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 560d935..115bc0e 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -172,7 +172,7 @@ class ChannelFactory(object): @staticmethod def open(path, mode, mime_type): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().open(path, mode, mime_type=mime_type) else: @@ 
-182,7 +182,7 @@ class ChannelFactory(object): def rename(src, dst): if src.startswith('gs://'): assert dst.startswith('gs://'), dst - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().rename(src, dst) else: @@ -197,7 +197,7 @@ class ChannelFactory(object): assert dst.startswith('gs://'), dst assert src.endswith('/'), src assert dst.endswith('/'), dst - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().copytree(src, dst) else: @@ -211,7 +211,7 @@ class ChannelFactory(object): @staticmethod def exists(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().exists() else: @@ -220,7 +220,7 @@ class ChannelFactory(object): @staticmethod def rmdir(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcs = gcsio.GcsIO() if not path.endswith('/'): @@ -237,7 +237,7 @@ class ChannelFactory(object): @staticmethod def rm(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().delete(path) else: @@ -249,7 +249,7 @@ class ChannelFactory(object): @staticmethod def glob(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().glob(path) else: @@ -608,7 +608,7 @@ class TextFileReader(iobase.NativeSourceReader): def __enter__(self): if self.source.is_gcs_source: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io 
import gcsio self._file = gcsio.GcsIO().open(self.source.file_path, 'rb') else: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 3d7e00f..a01988b 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -30,16 +30,17 @@ import re import StringIO import threading +from apitools.base.py.exceptions import HttpError +import apitools.base.py.transfer as transfer + from apache_beam.internal import auth from apache_beam.utils import retry -from apitools.base.py.exceptions import HttpError -import apitools.base.py.transfer as transfer # Issue a friendlier error message if the storage library is not available. # TODO(silviuc): Remove this guard when storage is available everywhere. try: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal.clients import storage except ImportError: raise RuntimeError( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 138f2a8..eeabb1a 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -24,13 +24,12 @@ import random import threading import unittest - import httplib2 - -from apache_beam.io import gcsio from apitools.base.py.exceptions import HttpError from apache_beam.internal.clients import storage +from apache_beam.io import gcsio + class FakeGcsClient(object): # Fake storage client. Usage in gcsio.py is client.objects.Get(...) 
and http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 6354139..78ff209 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -348,7 +348,7 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pyli # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value, label=label)) _cache_view(pcoll.pipeline, cache_key, view) @@ -380,7 +380,7 @@ def AsIter(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsIterable(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -411,7 +411,7 @@ def AsList(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. 
- from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsList(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -443,7 +443,7 @@ def AsDict(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsDict(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 0e4a057..e33d4ce 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -32,8 +32,10 @@ from apache_beam.transforms.window import WindowFn class FakeLogger(object): def PerThreadLoggingContext(self, *unused_args, **unused_kwargs): return self + def __enter__(self): pass + def __exit__(self, *unused_args): pass @@ -153,18 +155,22 @@ class DoFnRunner(object): else: self.tagged_receivers[tag].output(windowed_value) + class NoContext(WindowFn.AssignContext): """An uninspectable WindowFn.AssignContext.""" NO_VALUE = object() + def __init__(self, value, timestamp=NO_VALUE): self.value = value self._timestamp = timestamp + @property def timestamp(self): if self._timestamp is self.NO_VALUE: raise ValueError('No timestamp in this context.') else: return 
self._timestamp + @property def existing_windows(self): raise ValueError('No existing_windows in this context.') http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 7b04ae9..43a50ba 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -86,6 +86,7 @@ class DataflowPipelineRunner(PipelineRunner): # It typically takes about 30 seconds. final_countdown_timer_secs = 50.0 sleep_secs = 5.0 + # Try to prioritize the user-level traceback, if any. def rank_error(msg): if 'work item was attempted' in msg: @@ -151,7 +152,7 @@ class DataflowPipelineRunner(PipelineRunner): def run(self, pipeline): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal import apiclient self.job = apiclient.Job(pipeline.options) # The superclass's run will trigger a traversal of all reachable nodes. @@ -244,7 +245,7 @@ class DataflowPipelineRunner(PipelineRunner): def _add_step(self, step_kind, step_label, transform_node, side_tags=()): """Creates a Step object and adds it to the cache.""" # Import here to avoid adding the dependency for local running scenarios. 
- # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal import apiclient step = apiclient.Step(step_kind, self._get_unique_step_name()) self.job.proto.steps.append(step.proto) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py index 2c73394..e0df439 100644 --- a/sdks/python/apache_beam/runners/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct_runner.py @@ -112,7 +112,7 @@ class DirectPipelineRunner(PipelineRunner): values = self._cache.get_pvalue(transform_node.inputs[0]) if isinstance(view, SingletonPCollectionView): has_default, default_value = view._view_options() # pylint: disable=protected-access - if len(values) == 0: # pylint: disable=g-explicit-length-test + if len(values) == 0: if has_default: result = default_value else: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 9b8d622..55b63f3 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -40,7 +40,7 @@ def create_runner(runner_name): Raises: RuntimeError: if an invalid runner name is used. 
""" - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position if runner_name == 'DirectPipelineRunner': import apache_beam.runners.direct_runner return apache_beam.runners.direct_runner.DirectPipelineRunner() @@ -81,7 +81,7 @@ class PipelineRunner(object): """Execute the entire pipeline or the sub-DAG reachable from a node.""" # Imported here to avoid circular dependencies. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor class RunVisitor(PipelineVisitor): @@ -117,7 +117,7 @@ class PipelineRunner(object): """ # Imported here to avoid circular dependencies. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor class ClearVisitor(PipelineVisitor): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/aggregator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py index b7167ff..a5e83cb 100644 --- a/sdks/python/apache_beam/transforms/aggregator.py +++ b/sdks/python/apache_beam/transforms/aggregator.py @@ -103,6 +103,6 @@ class Aggregator(object): def _is_supported_kind(combine_fn): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal.apiclient import metric_translations return combine_fn.__class__ in metric_translations http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 64ede3b..e9f11a0 100644 --- 
a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -46,6 +46,11 @@ __all__ = [ 'ToList', ] +# Type variables +T = TypeVariable('T') +K = TypeVariable('K') +V = TypeVariable('V') + class Mean(object): """Combiners for computing arithmetic means of elements.""" @@ -214,7 +219,6 @@ class Top(object): return pcoll | Top.PerKey(label, n, lambda a, b: b < a) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class TopCombineFn(core.CombineFn): @@ -329,7 +333,6 @@ class Sample(object): return pcoll | core.CombinePerKey(label, SampleCombineFn(n)) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class SampleCombineFn(core.CombineFn): @@ -404,7 +407,6 @@ class ToList(ptransform.PTransform): return pcoll | core.CombineGlobally(self.label, ToListCombineFn()) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class ToListCombineFn(core.CombineFn): @@ -439,8 +441,6 @@ class ToDict(ptransform.PTransform): return pcoll | core.CombineGlobally(self.label, ToDictCombineFn()) -K = TypeVariable('K') -V = TypeVariable('V') @with_input_types(Tuple[K, V]) @with_output_types(Dict[K, V]) class ToDictCombineFn(core.CombineFn): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 112c591..10682b4 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -122,6 +122,7 @@ class CombineTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | Create('start', [1, 1, 2, 2]) result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) + def matcher(): def match(actual): # There is always exactly one result. 
@@ -142,6 +143,7 @@ class CombineTest(unittest.TestCase): 'start-perkey', sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) result = pcoll | combine.Sample.FixedSizePerKey('sample', 3) + def matcher(): def match(actual): for _, samples in actual: @@ -180,6 +182,7 @@ class CombineTest(unittest.TestCase): the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6] pcoll = pipeline | Create('start', the_list) result = pcoll | combine.ToList('to list') + def matcher(expected): def match(actual): equal_to(expected[0])(actual[0]) @@ -191,6 +194,7 @@ class CombineTest(unittest.TestCase): pairs = [(1, 2), (3, 4), (5, 6)] pcoll = pipeline | Create('start-pairs', pairs) result = pcoll | combine.ToDict('to dict') + def matcher(): def match(actual): equal_to([1])([len(actual)]) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 445367f..8f4c246 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -49,6 +49,11 @@ from apache_beam.typehints import WithTypeHints from apache_beam.typehints.trivial_inference import element_type from apache_beam.utils.options import TypeOptions +# Type variables +T = typehints.TypeVariable('T') +K = typehints.TypeVariable('K') +V = typehints.TypeVariable('V') + class DoFnProcessContext(object): """A processing context passed to DoFn methods during execution. @@ -842,6 +847,7 @@ class CombineGlobally(PTransform): "an empty PCollection if the input PCollection is empty, " "or CombineGlobally().as_singleton_view() to get the default " "output of the CombineFn if the input PCollection is empty.") + def typed(transform): # TODO(robertwb): We should infer this. 
if combined.element_type: @@ -954,8 +960,6 @@ class CombineValuesDoFn(DoFn): return hints -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) class GroupByKey(PTransform): @@ -999,10 +1003,10 @@ class GroupByKey(PTransform): def process(self, context): k, vs = context.element - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import InMemoryUnmergedState from apache_beam.transforms.trigger import create_trigger_driver - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order, wrong-import-position driver = create_trigger_driver(self.windowing, True) state = InMemoryUnmergedState() # TODO(robertwb): Conditionally process in smaller chunks. @@ -1051,8 +1055,6 @@ class GroupByKey(PTransform): self.GroupAlsoByWindow(pcoll.windowing))) -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) class GroupByKeyOnly(PTransform): @@ -1115,9 +1117,9 @@ class Windowing(object): def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, output_time_fn=None): global AccumulationMode, DefaultTrigger - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order, wrong-import-position if triggerfn is None: triggerfn = DefaultTrigger() if accumulation_mode is None: @@ -1145,10 +1147,9 @@ class Windowing(object): return self._is_default -T = typehints.TypeVariable('T') @typehints.with_input_types(T) @typehints.with_output_types(T) -class WindowInto(ParDo): # pylint: disable=g-wrong-blank-lines +class WindowInto(ParDo): """A 
window transform assigning windows to each element of a PCollection. Transforms an input PCollection by applying a windowing function to each http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/cy_combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index be40d66..f824870 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -26,20 +26,25 @@ class AccumulatorCombineFn(core.CombineFn): # singleton? def create_accumulator(self): return self._accumulator_type() + @staticmethod def add_input(accumulator, element): accumulator.add_input(element) return accumulator + def merge_accumulators(self, accumulators): accumulator = self._accumulator_type() accumulator.merge(accumulators) return accumulator + @staticmethod def extract_output(accumulator): return accumulator.extract_output() + def __eq__(self, other): return (isinstance(other, AccumulatorCombineFn) and self._accumulator_type is other._accumulator_type) + def __hash__(self): return hash(self._accumulator_type) @@ -52,11 +57,14 @@ globals()['INT64_MIN'] = -2**_63 class CountAccumulator(object): def __init__(self): self.value = 0 + def add_input(self, unused_element): self.value += 1 + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): return self.value @@ -64,14 +72,17 @@ class CountAccumulator(object): class SumInt64Accumulator(object): def __init__(self): self.value = 0 + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) self.value += element + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): if not INT64_MIN <= self.value <= 
INT64_MAX: self.value %= 2**64 @@ -83,16 +94,19 @@ class SumInt64Accumulator(object): class MinInt64Accumulator(object): def __init__(self): self.value = INT64_MAX + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) if element < self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value < self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -100,16 +114,19 @@ class MinInt64Accumulator(object): class MaxInt64Accumulator(object): def __init__(self): self.value = INT64_MIN + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) if element > self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value > self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -118,16 +135,19 @@ class MeanInt64Accumulator(object): def __init__(self): self.sum = 0 self.count = 0 + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) self.sum += element self.count += 1 + def merge(self, accumulators): for accumulator in accumulators: self.sum += accumulator.sum self.count += accumulator.count + def extract_output(self): if not INT64_MIN <= self.sum <= INT64_MAX: self.sum %= 2**64 @@ -138,12 +158,20 @@ class MeanInt64Accumulator(object): class CountCombineFn(AccumulatorCombineFn): _accumulator_type = CountAccumulator + + class SumInt64Fn(AccumulatorCombineFn): _accumulator_type = SumInt64Accumulator + + class MinInt64Fn(AccumulatorCombineFn): _accumulator_type = MinInt64Accumulator + + class MaxInt64Fn(AccumulatorCombineFn): _accumulator_type = MaxInt64Accumulator + + class MeanInt64Fn(AccumulatorCombineFn): _accumulator_type = MeanInt64Accumulator @@ -156,12 +184,15 @@ _NAN = 
float('nan') class SumDoubleAccumulator(object): def __init__(self): self.value = 0 + def add_input(self, element): element = float(element) self.value += element + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): return self.value @@ -169,14 +200,17 @@ class SumDoubleAccumulator(object): class MinDoubleAccumulator(object): def __init__(self): self.value = _POS_INF + def add_input(self, element): element = float(element) if element < self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value < self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -184,14 +218,17 @@ class MinDoubleAccumulator(object): class MaxDoubleAccumulator(object): def __init__(self): self.value = _NEG_INF + def add_input(self, element): element = float(element) if element > self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value > self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -200,24 +237,33 @@ class MeanDoubleAccumulator(object): def __init__(self): self.sum = 0 self.count = 0 + def add_input(self, element): element = float(element) self.sum += element self.count += 1 + def merge(self, accumulators): for accumulator in accumulators: self.sum += accumulator.sum self.count += accumulator.count + def extract_output(self): return self.sum / self.count if self.count else _NAN class SumFloatFn(AccumulatorCombineFn): _accumulator_type = SumDoubleAccumulator + + class MinFloatFn(AccumulatorCombineFn): _accumulator_type = MinDoubleAccumulator + + class MaxFloatFn(AccumulatorCombineFn): _accumulator_type = MaxDoubleAccumulator + + class MeanFloatFn(AccumulatorCombineFn): _accumulator_type = MeanDoubleAccumulator @@ -225,11 +271,14 @@ class MeanFloatFn(AccumulatorCombineFn): class AllAccumulator(object): def __init__(self): 
self.value = True + def add_input(self, element): self.value &= not not element + def merge(self, accumulators): for accumulator in accumulators: self.value &= accumulator.value + def extract_output(self): return self.value @@ -237,11 +286,14 @@ class AllAccumulator(object): class AnyAccumulator(object): def __init__(self): self.value = False + def add_input(self, element): self.value |= not not element + def merge(self, accumulators): for accumulator in accumulators: self.value |= accumulator.value + def extract_output(self): return self.value @@ -249,5 +301,6 @@ class AnyAccumulator(object): class AnyCombineFn(AccumulatorCombineFn): _accumulator_type = AnyAccumulator + class AllCombineFn(AccumulatorCombineFn): _accumulator_type = AllAccumulator http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 5b089b9..106c44f 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -383,10 +383,10 @@ class PTransform(WithTypeHints): pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)] if pvalues and not pipelines: deferred = False - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam import pipeline from apache_beam.utils.options import PipelineOptions - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order, wrong-import-position p = pipeline.Pipeline( 'DirectPipelineRunner', PipelineOptions(sys.argv)) else: @@ -403,9 +403,9 @@ class PTransform(WithTypeHints): raise ValueError( 'Mixing value from different pipelines not allowed.') deferred = not getattr(p.runner, 'is_eager', False) - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, 
wrong-import-position from apache_beam.transforms.core import Create - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order, wrong-import-position replacements = {id(v): p | Create('CreatePInput%s' % ix, v) for ix, v in enumerate(pvalues) if not isinstance(v, pvalue.PValue) and v is not None} @@ -431,9 +431,9 @@ class PTransform(WithTypeHints): Generally only needs to be overriden for multi-input PTransforms. """ - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam import pipeline - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order if isinstance(pvalueish, pipeline.Pipeline): pvalueish = pvalue.PBegin(pvalueish) @@ -557,6 +557,7 @@ class PTransformWithSideInputs(PTransform): type_hints = self.get_type_hints().input_types if type_hints: args, kwargs = self.raw_side_inputs + def element_type(side_input): if isinstance(side_input, pvalue.PCollectionView): return side_input.element_type http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 8ae7a37..d6ee18a 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -208,8 +208,10 @@ class PTransformTest(unittest.TestCase): class MyDoFn(beam.DoFn): def start_bundle(self, c): yield 'start' + def process(self, c): pass + def finish_bundle(self, c): yield 'finish' pipeline = Pipeline('DirectPipelineRunner') @@ -540,6 +542,7 @@ class PTransformTest(unittest.TestCase): def _extract_input_pvalues(self, pvalueish): pvalueish = list(pvalueish) return pvalueish, sum([list(p.values()) for p in pvalueish], []) + def apply(self, pcoll_dicts): keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts]) res 
= {} @@ -554,6 +557,7 @@ class PTransformTest(unittest.TestCase): self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b'])) self.assertEqual([], sorted(res['c'])) + @beam.ptransform_fn def SamplePTransform(label, pcoll, context, *args, **kwargs): """Sample transform using the @ptransform_fn decorator.""" @@ -1646,6 +1650,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | combine.Sample.FixedSizeGlobally('sample', 3)) self.assertCompatible(typehints.Iterable[int], d.element_type) + def matcher(expected_len): def match(actual): equal_to([expected_len])([len(actual[0])]) @@ -1661,6 +1666,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | combine.Sample.FixedSizeGlobally('sample', 2)) self.assertCompatible(typehints.Iterable[int], d.element_type) + def matcher(expected_len): def match(actual): equal_to([expected_len])([len(actual[0])]) @@ -1676,6 +1682,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) + def matcher(expected_len): def match(actual): for _, sample in actual: @@ -1694,6 +1701,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) + def matcher(expected_len): def match(actual): for _, sample in actual: @@ -1708,6 +1716,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | combine.ToList('to list')) self.assertCompatible(typehints.List[int], d.element_type) + def matcher(expected): def match(actual): equal_to(expected)(actual[0]) @@ -1723,6 +1732,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | combine.ToList('to list')) self.assertCompatible(typehints.List[str], d.element_type) + def matcher(expected): def match(actual): equal_to(expected)(actual[0]) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index fc54fa3..6484a7c 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -28,6 +28,10 @@ from apache_beam import pvalue from apache_beam import typehints from apache_beam.transforms.ptransform import PTransform +# Type variables +K = typehints.TypeVariable('K') +V = typehints.TypeVariable('V') + class CreatePCollectionView(PTransform): """Transform to materialize a given PCollectionView in the pipeline. @@ -119,11 +123,10 @@ class ViewAsList(PTransform): .with_input_types(input_type) .with_output_types(output_type)) -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') + @typehints.with_input_types(typehints.Tuple[K, V]) @typehints.with_output_types(typehints.Dict[K, V]) -class ViewAsDict(PTransform): # pylint: disable=g-wrong-blank-lines +class ViewAsDict(PTransform): """Transform to view PCollection as a dict PCollectionView. 
Important: this transform is an implementation detail and should not be used http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 1c75d3b..788277c 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -731,6 +731,7 @@ class DefaultGlobalBatchTriggerDriver(TriggerDriver): unwindowed_value = wv.value self.notify_observers(unwindowed_value) yield unwindowed_value + def __repr__(self): return '<UnwindowedValues of %s>' % windowed_values unwindowed = UnwindowedValues() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index fee9b1b..2cdd1e3 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -61,6 +61,7 @@ class TriggerTest(unittest.TestCase): **kwargs): late_data = kwargs.pop('late_data', []) assert not kwargs + def bundle_data(data, size): bundle = [] for timestamp, elem in data: @@ -477,10 +478,10 @@ class TranscriptTest(unittest.TestCase): else: return fn - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms import window as window_module from apache_beam.transforms import trigger as trigger_module - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order, wrong-import-position window_fn_names = dict(window_module.__dict__) window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn': CustomTimestampingFixedWindowsWindowFn}) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 52bba05..b8380a0 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -181,6 +181,7 @@ class DataflowAssertException(Exception): # TODO(silviuc): Add contains_in_any_order-style matchers. def equal_to(expected): expected = list(expected) + def _equal(actual): sorted_expected = sorted(expected) sorted_actual = sorted(actual) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 1d7f272..186fcd4 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -52,6 +52,7 @@ class ReifyWindowsFn(core.DoFn): yield "%s @ %s" % (key, window), values reify_windows = core.ParDo(ReifyWindowsFn()) + class WindowTest(unittest.TestCase): def test_fixed_windows(self): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/typehints/typehints_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index e6bab61..aa04fe2 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -45,6 +45,7 @@ def check_or_interleave(hint, value, var): _check_instance_type(hint, value, var) return value + def check_type_hints(f): @functools.wraps(f) def wrapper(*args, **kwargs): @@ -770,6 +771,7 @@ class 
TakesDecoratorTestCase(TypeHintTestCase): def test_must_be_primitive_type_or_constraint(self): with self.assertRaises(TypeError) as e: t = [1, 2] + @with_input_types(a=t) def foo(a): pass @@ -781,6 +783,7 @@ class TakesDecoratorTestCase(TypeHintTestCase): with self.assertRaises(TypeError) as e: t = 5 + @check_type_hints @with_input_types(a=t) def foo(a): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/utils/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index 4edcaa7..8a97f4b 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -336,6 +336,7 @@ class SetupTest(unittest.TestCase): 'gs://my-gcs-bucket/gcs.tar.gz'] gcs_copied_files = [] + def file_copy(from_path, to_path): if from_path.startswith('gs://'): gcs_copied_files.append(from_path) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/utils/retry_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py index 0a016b9..705c555 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -19,10 +19,10 @@ import unittest -from apache_beam.utils import retry - from apitools.base.py.exceptions import HttpError +from apache_beam.utils import retry + class FakeClock(object): """A fake clock object implementing sleep() and recording calls.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/run_pylint.sh ---------------------------------------------------------------------- diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index e8062c0..4e0b129 100755 --- a/sdks/python/run_pylint.sh +++ 
b/sdks/python/run_pylint.sh @@ -16,10 +16,11 @@ # limitations under the License. # -# This script will run pylint on files that changed compared to the current -# HEAD of the branch. +# This script will run pylint and pep8 on files that changed compared to the +# current HEAD of the branch. # # Use "pylint apache_beam" to run pylint all files. +# Use "pep8 apache_beam" to run pep8 all files. # # The exit-code of the script indicates success or a failure. @@ -43,6 +44,8 @@ if test "$CHANGED_FILES"; then echo "Running pylint on changed files:" echo "$CHANGED_FILES" pylint $CHANGED_FILES + echo "Running pep8 on changed files:" + pep8 $CHANGED_FILES else echo "Not running pylint. No eligible files." fi http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/tox.ini ---------------------------------------------------------------------- diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 356de57..29674ed 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -18,8 +18,18 @@ [tox] envlist = py27 +[pep8] +# Disable all errors and warnings except for the ones related to blank lines. +# pylint does not check the number of blank lines. +select = E3 + +# Skip auto generated files (windmill_pb2.py, windmill_service_pb2.py) +exclude = windmill_pb2.py, windmill_service_pb2.py + [testenv:py27] -deps=pylint +deps= + pep8 + pylint commands = python setup.py test {toxinidir}/run_pylint.sh