Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk cb1842c5b -> 887bef19d


Enables more linting rules.

- Fixes some import-related warnings and enables the corresponding
  pylint rules (a standalone sketch of the pragma pattern follows the
  diffstat below).
- Uses pep8 for blank-line style checks: pylint does not enforce
  pep8's blank-line conventions, and pep8 is not configurable for
  indentation, so we need both tools (see the sketch below).
- Fixes existing lint errors related to blank lines.
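
A minimal sketch of chaining the two tools (the committed changes to
run_pylint.sh and tox.ini are only summarized in the diffstat below;
the tool names are real, but the exact flags here are illustrative
assumptions, not the committed script):

    # hypothetical lint wrapper, equivalent in spirit to run_pylint.sh
    import subprocess
    import sys

    # pylint handles most checks, driven by the project rc file.
    if subprocess.call(['pylint', '--rcfile=.pylintrc', 'apache_beam']):
        sys.exit(1)
    # pep8 covers only the blank-line checks (E301-E303) that the
    # pylint configuration leaves disabled; its indentation checks
    # are not selected.
    sys.exit(subprocess.call(
        ['pep8', '--select=E301,E302,E303', 'apache_beam']))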


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57a4495d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57a4495d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57a4495d

Branch: refs/heads/python-sdk
Commit: 57a4495d996acdd4eb69f7219993c9c5d992b2b1
Parents: cb1842c
Author: Ahmet Altay <al...@google.com>
Authored: Mon Jun 27 17:59:35 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Jun 28 18:23:10 2016 -0700

----------------------------------------------------------------------
 sdks/python/.pylintrc                           |  3 -
 sdks/python/apache_beam/__init__.py             |  3 +-
 sdks/python/apache_beam/coders/coder_impl.py    |  4 +-
 sdks/python/apache_beam/coders/coders.py        |  4 +-
 .../apache_beam/coders/fast_coders_test.py      |  2 +-
 .../apache_beam/coders/slow_coders_test.py      |  2 +-
 sdks/python/apache_beam/coders/stream_test.py   |  4 +-
 .../complete/juliaset/juliaset/juliaset.py      |  9 ++-
 .../examples/cookbook/bigquery_schema.py        |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 19 ++++---
 .../examples/snippets/snippets_test.py          | 58 +++++++++++++-------
 sdks/python/apache_beam/internal/apiclient.py   |  5 +-
 .../apache_beam/internal/json_value_test.py     |  1 +
 sdks/python/apache_beam/internal/pickler.py     |  2 +-
 sdks/python/apache_beam/internal/util.py        |  1 +
 sdks/python/apache_beam/io/avroio.py            |  3 +-
 sdks/python/apache_beam/io/bigquery.py          |  8 +--
 sdks/python/apache_beam/io/bigquery_test.py     |  6 +-
 sdks/python/apache_beam/io/fileio.py            | 16 +++---
 sdks/python/apache_beam/io/gcsio.py             |  7 ++-
 sdks/python/apache_beam/io/gcsio_test.py        |  5 +-
 sdks/python/apache_beam/pvalue.py               |  8 +--
 sdks/python/apache_beam/runners/common.py       |  6 ++
 .../apache_beam/runners/dataflow_runner.py      |  5 +-
 .../python/apache_beam/runners/direct_runner.py |  2 +-
 sdks/python/apache_beam/runners/runner.py       |  6 +-
 .../python/apache_beam/transforms/aggregator.py |  2 +-
 sdks/python/apache_beam/transforms/combiners.py | 10 ++--
 .../apache_beam/transforms/combiners_test.py    |  4 ++
 sdks/python/apache_beam/transforms/core.py      | 21 +++----
 .../apache_beam/transforms/cy_combiners.py      | 53 ++++++++++++++++++
 .../python/apache_beam/transforms/ptransform.py | 13 +++--
 .../apache_beam/transforms/ptransform_test.py   | 10 ++++
 .../python/apache_beam/transforms/sideinputs.py |  9 ++-
 sdks/python/apache_beam/transforms/trigger.py   |  1 +
 .../apache_beam/transforms/trigger_test.py      |  5 +-
 sdks/python/apache_beam/transforms/util.py      |  1 +
 .../apache_beam/transforms/window_test.py       |  1 +
 .../apache_beam/typehints/typehints_test.py     |  3 +
 .../python/apache_beam/utils/dependency_test.py |  1 +
 sdks/python/apache_beam/utils/retry_test.py     |  4 +-
 sdks/python/run_pylint.sh                       |  7 ++-
 sdks/python/tox.ini                             | 12 +++-
 43 files changed, 234 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
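
A standalone, hypothetical illustration of the disable/enable pragma
pattern applied throughout the diffs below: when setup code must run
before an import, the import cannot sit at the top of the file, so the
placement warning is suppressed just for that span.

    import sys

    sys.path.insert(0, '.')  # setup that must precede the deferred import

    # pylint: disable=wrong-import-position
    import json  # deliberately placed after the setup code
    # pylint: enable=wrong-import-position

    print(json.dumps({'ok': True}))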


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/.pylintrc
----------------------------------------------------------------------
diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc
index edf13ee..efe8147 100644
--- a/sdks/python/.pylintrc
+++ b/sdks/python/.pylintrc
@@ -83,7 +83,6 @@ disable =
   arguments-differ,
   attribute-defined-outside-init,
   bad-builtin,
-  bad-option-value,
   bad-super-call,
   broad-except,
   consider-using-enumerate,
@@ -131,8 +130,6 @@ disable =
   unused-wildcard-import,
   used-before-assignment,
   wildcard-import,
-  wrong-import-order,
-  wrong-import-position,
 
 
 [REPORTS]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
index c9bc736..eed251b 100644
--- a/sdks/python/apache_beam/__init__.py
+++ b/sdks/python/apache_beam/__init__.py
@@ -71,7 +71,7 @@ if sys.version_info.major != 2:
       'Dataflow SDK for Python is supported only on Python 2.7. '
       'It is not supported on Python [%s].' % sys.version)
 
-
+# pylint: disable=wrong-import-position
 import apache_beam.internal.pickler
 
 from apache_beam import coders
@@ -79,3 +79,4 @@ from apache_beam import io
 from apache_beam import typehints
 from apache_beam.pipeline import Pipeline
 from apache_beam.transforms import *
+# pylint: enable=wrong-import-position

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 07b6711..a623f2b 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -29,7 +29,7 @@ import collections
 from cPickle import loads, dumps
 
 
-# pylint: disable=g-import-not-at-top
+# pylint: disable=wrong-import-order, wrong-import-position
 try:
   # Don't depend on the full dataflow sdk to test coders.
   from apache_beam.transforms.window import WindowedValue
@@ -43,7 +43,7 @@ try:
 except ImportError:
   from slow_stream import InputStream as create_InputStream
   from slow_stream import OutputStream as create_OutputStream
-# pylint: enable=g-import-not-at-top
+# pylint: enable=wrong-import-order, wrong-import-position
 
 
 class CoderImpl(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 6d5b10a..619586f 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -24,7 +24,7 @@ import cPickle as pickle
 from apache_beam.coders import coder_impl
 
 
-# pylint: disable=g-import-not-at-top
+# pylint: disable=wrong-import-order, wrong-import-position
 # Avoid dependencies on the full SDK.
 try:
   # Import dill from the pickler module to make sure our monkey-patching of dill
@@ -46,7 +46,7 @@ def serialize_coder(coder):
 def deserialize_coder(serialized):
   from apache_beam.internal import pickler
   return pickler.loads(serialized.split('$', 1)[1])
-# pylint: enable=g-import-not-at-top
+# pylint: enable=wrong-import-order, wrong-import-position
 
 
 class Coder(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/fast_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
index 466fd57..55cf16c 100644
--- a/sdks/python/apache_beam/coders/fast_coders_test.py
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -28,7 +28,7 @@ from apache_beam.coders.coders_test_common import *
 class FastCoders(unittest.TestCase):
 
   def test_using_fast_impl(self):
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     # pylint: disable=unused-variable
     import apache_beam.coders.stream
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/slow_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py
index 26e6fa7..62149a3 100644
--- a/sdks/python/apache_beam/coders/slow_coders_test.py
+++ b/sdks/python/apache_beam/coders/slow_coders_test.py
@@ -30,7 +30,7 @@ class SlowCoders(unittest.TestCase):
   def test_using_slow_impl(self):
     # Assert that we are not using the compiled implementation.
     with self.assertRaises(ImportError):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       # pylint: disable=unused-variable
       import apache_beam.coders.stream
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/coders/stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py
index f2163ca..cfd627f 100644
--- a/sdks/python/apache_beam/coders/stream_test.py
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -139,7 +139,7 @@ class StreamTest(unittest.TestCase):
 
 
 try:
-  # pylint: disable=g-import-not-at-top
+  # pylint: disable=wrong-import-position
   from apache_beam.coders import stream
 
   class FastStreamTest(StreamTest):
@@ -148,14 +148,12 @@ try:
     OutputStream = stream.OutputStream
     ByteCountingOutputStream = stream.ByteCountingOutputStream
 
-
   class SlowFastStreamTest(StreamTest):
     """Runs the test with compiled and uncompiled stream classes."""
     InputStream = stream.InputStream
     OutputStream = slow_stream.OutputStream
     ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
 
-
   class FastSlowStreamTest(StreamTest):
     """Runs the test with uncompiled and compiled stream classes."""
     InputStream = slow_stream.InputStream

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1625418..2bc37e6 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -51,14 +51,15 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations):
 
   julia_set_colors = (pipeline
                       | beam.Create('add points', point_set(n))
-                      | beam.Map(get_julia_set_point_color, c, n, max_iterations))
+                      | beam.Map(
+                          get_julia_set_point_color, c, n, max_iterations))
 
   return julia_set_colors
 
 
 def generate_julia_set_visualization(data, n, max_iterations):
   """Generate the pixel matrix for rendering the julia set as an image."""
-  import numpy as np  # pylint: disable=g-import-not-at-top
+  import numpy as np  # pylint: disable=wrong-import-order, wrong-import-position
   colors = []
   for r in range(0, 256, 16):
     for g in range(0, 256, 16):
@@ -74,7 +75,7 @@ def generate_julia_set_visualization(data, n, max_iterations):
 
 def save_julia_set_visualization(out_file, image_array):
   """Save the fractal image of our julia set as a png."""
-  from matplotlib import pyplot as plt  # pylint: disable=g-import-not-at-top
+  from matplotlib import pyplot as plt  # pylint: disable=wrong-import-order, wrong-import-position
   plt.imsave(out_file, image_array, format='png')
 
 
@@ -104,13 +105,11 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  # pylint: disable=g-long-lambda
   (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
    | beam.GroupByKey('x coord') | beam.Map(
        'format',
       lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
   | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
-  # pylint: enable=g-long-lambda
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 99a967c..7c420fb 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -44,7 +44,7 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  from apache_beam.internal.clients import bigquery  # pylint: disable=g-import-not-at-top
+  from apache_beam.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position
 
   table_schema = bigquery.TableSchema()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index e7af26a..f5bbc66 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -40,8 +40,7 @@ import apache_beam as beam
 # pylint:disable=expression-not-assigned
 # pylint:disable=redefined-outer-name
 # pylint:disable=unused-variable
-# pylint:disable=g-doc-args
-# pylint:disable=g-import-not-at-top
+# pylint:disable=wrong-import-order, wrong-import-position
 
 
 class SnippetUtils(object):
@@ -95,7 +94,7 @@ def construct_pipeline(renames):
 
   # [START pipelines_constructing_reading]
   lines = p | beam.io.Read('ReadMyFile',
-                      beam.io.TextFileSource('gs://some/inputData.txt'))
+                           beam.io.TextFileSource('gs://some/inputData.txt'))
   # [END pipelines_constructing_reading]
 
   # [START pipelines_constructing_applying]
@@ -106,7 +105,8 @@ def construct_pipeline(renames):
   # [START pipelines_constructing_writing]
   filtered_words = reversed_words | beam.Filter('FilterWords', filter_words)
   filtered_words | beam.io.Write('WriteMyFile',
-                               beam.io.TextFileSink('gs://some/outputData.txt'))
+                                 beam.io.TextFileSink(
+                                     'gs://some/outputData.txt'))
   # [END pipelines_constructing_writing]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -304,7 +304,8 @@ def pipeline_options_command_line(argv):
 
   # Create the Pipeline with remaining arguments.
   p = beam.Pipeline(argv=pipeline_args)
-  lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(known_args.input))
+  lines = p | beam.io.Read('ReadFromText',
+                           beam.io.TextFileSource(known_args.input))
   lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output))
   # [END pipeline_options_command_line]
 
@@ -594,7 +595,8 @@ def examples_wordcount_debugging(renames):
       | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
 
   # [START example_wordcount_debugging_assert]
-  beam.assert_that(filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+  beam.assert_that(
+      filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
   # [END example_wordcount_debugging_assert]
 
   output = (filtered_words
@@ -634,7 +636,7 @@ def model_textio(renames):
   # [START model_pipelineio_write]
   filtered_words | beam.io.Write(
       'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
-                                        file_name_suffix='.csv'))
+                                          file_name_suffix='.csv'))
   # [END model_pipelineio_write]
   # [END model_textio_write]
 
@@ -762,6 +764,7 @@ def model_multiple_pcollections_partition(contents, output_path):
   URL: https://cloud.google.com/dataflow/model/multiple-pcollections
   """
   some_hash_fn = lambda s: ord(s[0])
+
   def get_percentile(i):
     """Assume i in [0,100)."""
     return i
@@ -770,6 +773,7 @@ def model_multiple_pcollections_partition(contents, output_path):
   p = beam.Pipeline(options=PipelineOptions())
 
   students = p | beam.Create(contents)
+
   # [START model_multiple_pcollections_partition]
   def partition_fn(student, num_partitions):
     return int(get_percentile(student) * num_partitions / 100)
@@ -872,4 +876,3 @@ class Count(beam.PTransform):
         | beam.Map('Init', lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]
-# pylint: enable=g-wrong-blank-lines

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f2f9552..87ce266 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -43,6 +43,7 @@ class ParDoTest(unittest.TestCase):
     # the text of the doc.
 
     words = ['aa', 'bbb', 'c']
+
     # [START model_pardo_pardo]
     class ComputeWordLengthFn(beam.DoFn):
       def process(self, context):
@@ -57,6 +58,7 @@ class ParDoTest(unittest.TestCase):
 
   def test_pardo_yield(self):
     words = ['aa', 'bbb', 'c']
+
     # [START model_pardo_yield]
     class ComputeWordLengthFn(beam.DoFn):
       def process(self, context):
@@ -84,11 +86,12 @@ class ParDoTest(unittest.TestCase):
 
   def test_pardo_using_flatmap_yield(self):
     words = ['aA', 'bbb', 'C']
+
     # [START model_pardo_using_flatmap_yield]
     def capitals(word):
       for letter in word:
         if 'A' <= letter <= 'Z':
-            yield letter
+          yield letter
     all_capitals = words | beam.FlatMap(capitals)
     # [END model_pardo_using_flatmap_yield]
 
@@ -113,27 +116,31 @@ class ParDoTest(unittest.TestCase):
         yield word
 
     # Construct a deferred side input.
-    avg_word_len = words | beam.Map(len) | beam.CombineGlobally(beam.combiners.MeanCombineFn())
+    avg_word_len = (words
+                    | beam.Map(len)
+                    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
 
     # Call with explicit side inputs.
     small_words = words | beam.FlatMap('small', filter_using_length, 0, 3)
 
     # A single deferred side input.
-    larger_than_average = words | beam.FlatMap('large',
-                                             filter_using_length,
-                                             lower_bound=pvalue.AsSingleton(avg_word_len))
+    larger_than_average = (words
+                           | beam.FlatMap('large', filter_using_length,
+                                          lower_bound=pvalue.AsSingleton(
+                                              avg_word_len)))
 
     # Mix and match.
     small_but_nontrivial = words | beam.FlatMap(filter_using_length,
-                                              lower_bound=2,
-                                              upper_bound=pvalue.AsSingleton(avg_word_len))
+                                                lower_bound=2,
+                                                upper_bound=pvalue.AsSingleton(
+                                                    avg_word_len))
     # [END model_pardo_side_input]
 
     beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc']))
     beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']),
-                   label='larger_than_average')
+                     label='larger_than_average')
     beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
-                   label='small_but_not_trivial')
+                     label='small_but_not_trivial')
     p.run()
 
   def test_pardo_side_input_dofn(self):
@@ -170,9 +177,8 @@ class ParDoTest(unittest.TestCase):
 
     # [START model_pardo_with_side_outputs]
     results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
-                         .with_outputs('above_cutoff_lengths',
-                                       'marked strings',
-                                       main='below_cutoff_strings'))
+               .with_outputs('above_cutoff_lengths', 'marked strings',
+                             main='below_cutoff_strings'))
     below = results.below_cutoff_strings
     above = results.above_cutoff_lengths
     marked = results['marked strings']  # indexing works as well
@@ -183,10 +189,12 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'xyz'}, set(marked))
 
     # [START model_pardo_with_side_outputs_iter]
-    below, above, marked = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
-                                      .with_outputs('above_cutoff_lengths',
-                                                    'marked strings',
-                                                    main='below_cutoff_strings'))
+    below, above, marked = (words
+                            | beam.ParDo(
+                                ProcessWords(), cutoff_length=2, marker='x')
+                            .with_outputs('above_cutoff_lengths',
+                                          'marked strings',
+                                          main='below_cutoff_strings'))
     # [END model_pardo_with_side_outputs_iter]
 
     self.assertEqual({'a', 'an'}, set(below))
@@ -195,6 +203,7 @@ class ParDoTest(unittest.TestCase):
 
   def test_pardo_with_undeclared_side_outputs(self):
     numbers = [1, 2, 3, 4, 5, 10, 20]
+
     # [START model_pardo_with_side_outputs_undeclared]
     def even_odd(x):
       yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
@@ -258,6 +267,7 @@ class TypeHintsTest(unittest.TestCase):
     # Helps document the contract and checks it at pipeline construction time.
     # [START type_hints_transform]
     T = beam.typehints.TypeVariable('T')
+
     @beam.typehints.with_input_types(T)
     @beam.typehints.with_output_types(beam.typehints.Tuple[int, T])
     class MyTransform(beam.PTransform):
@@ -316,7 +326,8 @@ class TypeHintsTest(unittest.TestCase):
     totals = (
         lines
         | beam.Map(parse_player_and_score)
-        | beam.CombinePerKey(sum).with_input_types(beam.typehints.Tuple[Player, int]))
+        | beam.CombinePerKey(sum).with_input_types(
+            beam.typehints.Tuple[Player, int]))
     # [END type_hints_deterministic_key]
 
     self.assertEquals(
@@ -491,12 +502,15 @@ class CombineTest(unittest.TestCase):
         ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1),
         ('dog', 5), ('dog', 2)]
     # [START combine_per_key]
-    avg_accuracy_per_player = player_accuracies | beam.CombinePerKey(beam.combiners.MeanCombineFn())
+    avg_accuracy_per_player = (player_accuracies
+                               | beam.CombinePerKey(
+                                   beam.combiners.MeanCombineFn()))
     # [END combine_per_key]
     self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player))
 
   def test_combine_concat(self):
     pc = ['a', 'b']
+
     # [START combine_concat]
     def concat(values, separator=', '):
       return separator.join(values)
@@ -511,6 +525,7 @@ class CombineTest(unittest.TestCase):
   def test_bounded_sum(self):
     # [START combine_bounded_sum]
     pc = [1, 10, 100, 1000]
+
     def bounded_sum(values, bound=500):
       return min(sum(values), bound)
     small_sum = pc | beam.CombineGlobally(bounded_sum)              # [500]
@@ -524,23 +539,26 @@ class CombineTest(unittest.TestCase):
     # [START combine_reduce]
     import functools
     import operator
-    product = factors | beam.CombineGlobally(functools.partial(reduce, operator.mul), 1)
+    product = factors | beam.CombineGlobally(
+        functools.partial(reduce, operator.mul), 1)
     # [END combine_reduce]
     self.assertEqual([210], product)
 
   def test_custom_average(self):
     pc = [2, 3, 5, 7]
 
-
     # [START combine_custom_average]
     class AverageFn(beam.CombineFn):
       def create_accumulator(self):
         return (0.0, 0)
+
       def add_input(self, (sum, count), input):
         return sum + input, count + 1
+
       def merge_accumulators(self, accumulators):
         sums, counts = zip(*accumulators)
         return sum(sums), sum(counts)
+
       def extract_output(self, (sum, count)):
         return sum / count if count else float('NaN')
     average = pc | beam.CombineGlobally(AverageFn())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index a363511..7dfb035 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -25,6 +25,9 @@ import re
 import time
 
 
+from apitools.base.py import encoding
+from apitools.base.py import exceptions
+
 from apache_beam import utils
 from apache_beam import version
 from apache_beam.internal import pickler
@@ -40,8 +43,6 @@ from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
 from apache_beam.utils.options import WorkerOptions
 
-from apitools.base.py import encoding
-from apitools.base.py import exceptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py
index 9b26ab2..f2ae0c1 100644
--- a/sdks/python/apache_beam/internal/json_value_test.py
+++ b/sdks/python/apache_beam/internal/json_value_test.py
@@ -24,6 +24,7 @@ from apitools.base.py.extra_types import JsonObject
 from apache_beam.internal.json_value import from_json_value
 from apache_beam.internal.json_value import to_json_value
 
+
 class JsonValueTest(unittest.TestCase):
 
   def test_string_to(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index edf63ad..898e04b 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -159,13 +159,13 @@ if 'save_module' in dir(dill.dill):
       return old_save_module_dict(pickler, obj)
   dill.dill.save_module_dict = new_save_module_dict
 
-
   def _nest_dill_logging():
     """Prefix all dill logging with its depth in the callstack.
 
     Useful for debugging pickling of deeply nested structures.
     """
     old_log_info = dill.dill.log.info
+
     def new_log_info(msg, *args, **kwargs):
       old_log_info(
           ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 50ea2f6..ad60ba6 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -58,6 +58,7 @@ def remove_objects_from_args(args, kwargs, pvalue_classes):
     a placeholder value.
   """
   pvals = []
+
   def swapper(value):
     pvals.append(value)
     return ArgumentPlaceholder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 022a68d..25412af 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -21,11 +21,12 @@ import os
 import StringIO
 import zlib
 
-from apache_beam.io import filebasedsource
 from avro import datafile
 from avro import io as avro_io
 from avro import schema
 
+from apache_beam.io import filebasedsource
+
 
 class AvroSource(filebasedsource.FileBasedSource):
   """A source for reading Avro files.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 9d33134..f2c56dc 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -110,6 +110,8 @@ import re
 import time
 import uuid
 
+from apitools.base.py.exceptions import HttpError
+
 from apache_beam import coders
 from apache_beam.internal import auth
 from apache_beam.internal.json_value import from_json_value
@@ -118,15 +120,13 @@ from apache_beam.io import iobase
 from apache_beam.utils import retry
 from apache_beam.utils.options import GoogleCloudOptions
 
-from apitools.base.py.exceptions import HttpError
-
 # Protect against environments where bigquery library is not available.
-# pylint: disable=g-import-not-at-top
+# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from apache_beam.internal.clients import bigquery
 except ImportError:
   pass
-# pylint: enable=g-import-not-at-top
+# pylint: enable=wrong-import-order, wrong-import-position
 
 
 __all__ = [

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index 7c31526..2bca0dc 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -22,16 +22,16 @@ import logging
 import time
 import unittest
 
+from apitools.base.py.exceptions import HttpError
 import mock
+
 import apache_beam as beam
+from apache_beam.internal.clients import bigquery
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.io.bigquery import RowAsDictJsonCoder
 from apache_beam.io.bigquery import TableRowJsonCoder
 from apache_beam.utils.options import PipelineOptions
 
-from apitools.base.py.exceptions import HttpError
-from apache_beam.internal.clients import bigquery
-
 
 class TestRowAsDictJsonCoder(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 560d935..115bc0e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -172,7 +172,7 @@ class ChannelFactory(object):
   @staticmethod
   def open(path, mode, mime_type):
     if path.startswith('gs://'):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       return gcsio.GcsIO().open(path, mode, mime_type=mime_type)
     else:
@@ -182,7 +182,7 @@ class ChannelFactory(object):
   def rename(src, dst):
     if src.startswith('gs://'):
       assert dst.startswith('gs://'), dst
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       gcsio.GcsIO().rename(src, dst)
     else:
@@ -197,7 +197,7 @@ class ChannelFactory(object):
       assert dst.startswith('gs://'), dst
       assert src.endswith('/'), src
       assert dst.endswith('/'), dst
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       gcsio.GcsIO().copytree(src, dst)
     else:
@@ -211,7 +211,7 @@ class ChannelFactory(object):
   @staticmethod
   def exists(path):
     if path.startswith('gs://'):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       return gcsio.GcsIO().exists()
     else:
@@ -220,7 +220,7 @@ class ChannelFactory(object):
   @staticmethod
   def rmdir(path):
     if path.startswith('gs://'):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       gcs = gcsio.GcsIO()
       if not path.endswith('/'):
@@ -237,7 +237,7 @@ class ChannelFactory(object):
   @staticmethod
   def rm(path):
     if path.startswith('gs://'):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       gcsio.GcsIO().delete(path)
     else:
@@ -249,7 +249,7 @@ class ChannelFactory(object):
   @staticmethod
   def glob(path):
     if path.startswith('gs://'):
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       return gcsio.GcsIO().glob(path)
     else:
@@ -608,7 +608,7 @@ class TextFileReader(iobase.NativeSourceReader):
 
   def __enter__(self):
     if self.source.is_gcs_source:
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.io import gcsio
       self._file = gcsio.GcsIO().open(self.source.file_path, 'rb')
     else:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 3d7e00f..a01988b 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -30,16 +30,17 @@ import re
 import StringIO
 import threading
 
+from apitools.base.py.exceptions import HttpError
+import apitools.base.py.transfer as transfer
+
 from apache_beam.internal import auth
 from apache_beam.utils import retry
 
-from apitools.base.py.exceptions import HttpError
-import apitools.base.py.transfer as transfer
 
 # Issue a friendlier error message if the storage library is not available.
 # TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=g-import-not-at-top
+  # pylint: disable=wrong-import-order, wrong-import-position
   from apache_beam.internal.clients import storage
 except ImportError:
   raise RuntimeError(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 138f2a8..eeabb1a 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -24,13 +24,12 @@ import random
 import threading
 import unittest
 
-
 import httplib2
-
-from apache_beam.io import gcsio
 from apitools.base.py.exceptions import HttpError
 from apache_beam.internal.clients import storage
 
+from apache_beam.io import gcsio
+
 
 class FakeGcsClient(object):
   # Fake storage client.  Usage in gcsio.py is client.objects.Get(...) and

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 6354139..78ff209 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -348,7 +348,7 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None):  # pyli
   # Local import is required due to dependency loop; even though the
   # implementation of this function requires concepts defined in modules that
   # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
   view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value,
                                              label=label))
   _cache_view(pcoll.pipeline, cache_key, view)
@@ -380,7 +380,7 @@ def AsIter(pcoll, label=None):  # pylint: disable=invalid-name
   # Local import is required due to dependency loop; even though the
   # implementation of this function requires concepts defined in modules that
   # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
   view = (pcoll | sideinputs.ViewAsIterable(label=label))
   _cache_view(pcoll.pipeline, cache_key, view)
   return view
@@ -411,7 +411,7 @@ def AsList(pcoll, label=None):  # pylint: disable=invalid-name
   # Local import is required due to dependency loop; even though the
   # implementation of this function requires concepts defined in modules that
   # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
   view = (pcoll | sideinputs.ViewAsList(label=label))
   _cache_view(pcoll.pipeline, cache_key, view)
   return view
@@ -443,7 +443,7 @@ def AsDict(pcoll, label=None):  # pylint: disable=invalid-name
   # Local import is required due to dependency loop; even though the
   # implementation of this function requires concepts defined in modules that
   # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
   view = (pcoll | sideinputs.ViewAsDict(label=label))
   _cache_view(pcoll.pipeline, cache_key, view)
   return view

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0e4a057..e33d4ce 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -32,8 +32,10 @@ from apache_beam.transforms.window import WindowFn
 class FakeLogger(object):
   def PerThreadLoggingContext(self, *unused_args, **unused_kwargs):
     return self
+
   def __enter__(self):
     pass
+
   def __exit__(self, *unused_args):
     pass
 
@@ -153,18 +155,22 @@ class DoFnRunner(object):
       else:
         self.tagged_receivers[tag].output(windowed_value)
 
+
 class NoContext(WindowFn.AssignContext):
   """An uninspectable WindowFn.AssignContext."""
   NO_VALUE = object()
+
   def __init__(self, value, timestamp=NO_VALUE):
     self.value = value
     self._timestamp = timestamp
+
   @property
   def timestamp(self):
     if self._timestamp is self.NO_VALUE:
       raise ValueError('No timestamp in this context.')
     else:
       return self._timestamp
+
   @property
   def existing_windows(self):
     raise ValueError('No existing_windows in this context.')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 7b04ae9..43a50ba 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -86,6 +86,7 @@ class DataflowPipelineRunner(PipelineRunner):
     # It typically takes about 30 seconds.
     final_countdown_timer_secs = 50.0
     sleep_secs = 5.0
+
     # Try to prioritize the user-level traceback, if any.
     def rank_error(msg):
       if 'work item was attempted' in msg:
@@ -151,7 +152,7 @@ class DataflowPipelineRunner(PipelineRunner):
   def run(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.internal import apiclient
     self.job = apiclient.Job(pipeline.options)
     # The superclass's run will trigger a traversal of all reachable nodes.
@@ -244,7 +245,7 @@ class DataflowPipelineRunner(PipelineRunner):
   def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
     """Creates a Step object and adds it to the cache."""
     # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.internal import apiclient
     step = apiclient.Step(step_kind, self._get_unique_step_name())
     self.job.proto.steps.append(step.proto)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 2c73394..e0df439 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -112,7 +112,7 @@ class DirectPipelineRunner(PipelineRunner):
     values = self._cache.get_pvalue(transform_node.inputs[0])
     if isinstance(view, SingletonPCollectionView):
      has_default, default_value = view._view_options()  # pylint: disable=protected-access
-      if len(values) == 0:  # pylint: disable=g-explicit-length-test
+      if len(values) == 0:
         if has_default:
           result = default_value
         else:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 9b8d622..55b63f3 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -40,7 +40,7 @@ def create_runner(runner_name):
   Raises:
     RuntimeError: if an invalid runner name is used.
   """
-  # pylint: disable=g-import-not-at-top
+  # pylint: disable=wrong-import-order, wrong-import-position
   if runner_name == 'DirectPipelineRunner':
     import apache_beam.runners.direct_runner
     return apache_beam.runners.direct_runner.DirectPipelineRunner()
@@ -81,7 +81,7 @@ class PipelineRunner(object):
     """Execute the entire pipeline or the sub-DAG reachable from a node."""
 
     # Imported here to avoid circular dependencies.
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
 
     class RunVisitor(PipelineVisitor):
@@ -117,7 +117,7 @@ class PipelineRunner(object):
     """
 
     # Imported here to avoid circular dependencies.
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
 
     class ClearVisitor(PipelineVisitor):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/aggregator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py
index b7167ff..a5e83cb 100644
--- a/sdks/python/apache_beam/transforms/aggregator.py
+++ b/sdks/python/apache_beam/transforms/aggregator.py
@@ -103,6 +103,6 @@ class Aggregator(object):
 
 
 def _is_supported_kind(combine_fn):
-  # pylint: disable=g-import-not-at-top
+  # pylint: disable=wrong-import-order, wrong-import-position
   from apache_beam.internal.apiclient import metric_translations
   return combine_fn.__class__ in metric_translations

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 64ede3b..e9f11a0 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -46,6 +46,11 @@ __all__ = [
     'ToList',
     ]
 
+# Type variables
+T = TypeVariable('T')
+K = TypeVariable('K')
+V = TypeVariable('V')
+
 
 class Mean(object):
   """Combiners for computing arithmetic means of elements."""
@@ -214,7 +219,6 @@ class Top(object):
     return pcoll | Top.PerKey(label, n, lambda a, b: b < a)
 
 
-T = TypeVariable('T')
 @with_input_types(T)
 @with_output_types(List[T])
 class TopCombineFn(core.CombineFn):
@@ -329,7 +333,6 @@ class Sample(object):
     return pcoll | core.CombinePerKey(label, SampleCombineFn(n))
 
 
-T = TypeVariable('T')
 @with_input_types(T)
 @with_output_types(List[T])
 class SampleCombineFn(core.CombineFn):
@@ -404,7 +407,6 @@ class ToList(ptransform.PTransform):
     return pcoll | core.CombineGlobally(self.label, ToListCombineFn())
 
 
-T = TypeVariable('T')
 @with_input_types(T)
 @with_output_types(List[T])
 class ToListCombineFn(core.CombineFn):
@@ -439,8 +441,6 @@ class ToDict(ptransform.PTransform):
     return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())
 
 
-K = TypeVariable('K')
-V = TypeVariable('V')
 @with_input_types(Tuple[K, V])
 @with_output_types(Dict[K, V])
 class ToDictCombineFn(core.CombineFn):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 112c591..10682b4 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -122,6 +122,7 @@ class CombineTest(unittest.TestCase):
       pipeline = Pipeline('DirectPipelineRunner')
       pcoll = pipeline | Create('start', [1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
+
       def matcher():
         def match(actual):
           # There is always exactly one result.
@@ -142,6 +143,7 @@ class CombineTest(unittest.TestCase):
         'start-perkey',
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+
     def matcher():
       def match(actual):
         for _, samples in actual:
@@ -180,6 +182,7 @@ class CombineTest(unittest.TestCase):
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
     pcoll = pipeline | Create('start', the_list)
     result = pcoll | combine.ToList('to list')
+
     def matcher(expected):
       def match(actual):
         equal_to(expected[0])(actual[0])
@@ -191,6 +194,7 @@ class CombineTest(unittest.TestCase):
     pairs = [(1, 2), (3, 4), (5, 6)]
     pcoll = pipeline | Create('start-pairs', pairs)
     result = pcoll | combine.ToDict('to dict')
+
     def matcher():
       def match(actual):
         equal_to([1])([len(actual)])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 445367f..8f4c246 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -49,6 +49,11 @@ from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.utils.options import TypeOptions
 
+# Type variables
+T = typehints.TypeVariable('T')
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
 
 class DoFnProcessContext(object):
   """A processing context passed to DoFn methods during execution.
@@ -842,6 +847,7 @@ class CombineGlobally(PTransform):
             "an empty PCollection if the input PCollection is empty, "
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
+
       def typed(transform):
         # TODO(robertwb): We should infer this.
         if combined.element_type:
@@ -954,8 +960,6 @@ class CombineValuesDoFn(DoFn):
     return hints
 
 
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
 @typehints.with_input_types(typehints.KV[K, V])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
 class GroupByKey(PTransform):
@@ -999,10 +1003,10 @@ class GroupByKey(PTransform):
 
     def process(self, context):
       k, vs = context.element
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.transforms.trigger import InMemoryUnmergedState
       from apache_beam.transforms.trigger import create_trigger_driver
-      # pylint: enable=g-import-not-at-top
+      # pylint: enable=wrong-import-order, wrong-import-position
       driver = create_trigger_driver(self.windowing, True)
       state = InMemoryUnmergedState()
       # TODO(robertwb): Conditionally process in smaller chunks.
@@ -1051,8 +1055,6 @@ class GroupByKey(PTransform):
                       self.GroupAlsoByWindow(pcoll.windowing)))
 
 
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
 @typehints.with_input_types(typehints.KV[K, V])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
 class GroupByKeyOnly(PTransform):
@@ -1115,9 +1117,9 @@ class Windowing(object):
   def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
                output_time_fn=None):
     global AccumulationMode, DefaultTrigger
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
-    # pylint: enable=g-import-not-at-top
+    # pylint: enable=wrong-import-order, wrong-import-position
     if triggerfn is None:
       triggerfn = DefaultTrigger()
     if accumulation_mode is None:
@@ -1145,10 +1147,9 @@ class Windowing(object):
     return self._is_default
 
 
-T = typehints.TypeVariable('T')
 @typehints.with_input_types(T)
 @typehints.with_output_types(T)
-class WindowInto(ParDo):  # pylint: disable=g-wrong-blank-lines
+class WindowInto(ParDo):
   """A window transform assigning windows to each element of a PCollection.
 
   Transforms an input PCollection by applying a windowing function to each

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/cy_combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index be40d66..f824870 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -26,20 +26,25 @@ class AccumulatorCombineFn(core.CombineFn):
   # singleton?
   def create_accumulator(self):
     return self._accumulator_type()
+
   @staticmethod
   def add_input(accumulator, element):
     accumulator.add_input(element)
     return accumulator
+
   def merge_accumulators(self, accumulators):
     accumulator = self._accumulator_type()
     accumulator.merge(accumulators)
     return accumulator
+
   @staticmethod
   def extract_output(accumulator):
     return accumulator.extract_output()
+
   def __eq__(self, other):
     return (isinstance(other, AccumulatorCombineFn)
             and self._accumulator_type is other._accumulator_type)
+
   def __hash__(self):
     return hash(self._accumulator_type)
 
@@ -52,11 +57,14 @@ globals()['INT64_MIN'] = -2**_63
 class CountAccumulator(object):
   def __init__(self):
     self.value = 0
+
   def add_input(self, unused_element):
     self.value += 1
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.value += accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -64,14 +72,17 @@ class CountAccumulator(object):
 class SumInt64Accumulator(object):
   def __init__(self):
     self.value = 0
+
   def add_input(self, element):
     element = int(element)
     if not INT64_MIN <= element <= INT64_MAX:
       raise OverflowError(element)
     self.value += element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.value += accumulator.value
+
   def extract_output(self):
     if not INT64_MIN <= self.value <= INT64_MAX:
       self.value %= 2**64
@@ -83,16 +94,19 @@ class SumInt64Accumulator(object):
 class MinInt64Accumulator(object):
   def __init__(self):
     self.value = INT64_MAX
+
   def add_input(self, element):
     element = int(element)
     if not INT64_MIN <= element <= INT64_MAX:
       raise OverflowError(element)
     if element < self.value:
       self.value = element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       if accumulator.value < self.value:
         self.value = accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -100,16 +114,19 @@ class MinInt64Accumulator(object):
 class MaxInt64Accumulator(object):
   def __init__(self):
     self.value = INT64_MIN
+
   def add_input(self, element):
     element = int(element)
     if not INT64_MIN <= element <= INT64_MAX:
       raise OverflowError(element)
     if element > self.value:
       self.value = element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       if accumulator.value > self.value:
         self.value = accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -118,16 +135,19 @@ class MeanInt64Accumulator(object):
   def __init__(self):
     self.sum = 0
     self.count = 0
+
   def add_input(self, element):
     element = int(element)
     if not INT64_MIN <= element <= INT64_MAX:
       raise OverflowError(element)
     self.sum += element
     self.count += 1
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.sum += accumulator.sum
       self.count += accumulator.count
+
   def extract_output(self):
     if not INT64_MIN <= self.sum <= INT64_MAX:
       self.sum %= 2**64
@@ -138,12 +158,20 @@ class MeanInt64Accumulator(object):
 
 class CountCombineFn(AccumulatorCombineFn):
   _accumulator_type = CountAccumulator
+
+
 class SumInt64Fn(AccumulatorCombineFn):
   _accumulator_type = SumInt64Accumulator
+
+
 class MinInt64Fn(AccumulatorCombineFn):
   _accumulator_type = MinInt64Accumulator
+
+
 class MaxInt64Fn(AccumulatorCombineFn):
   _accumulator_type = MaxInt64Accumulator
+
+
 class MeanInt64Fn(AccumulatorCombineFn):
   _accumulator_type = MeanInt64Accumulator
 
@@ -156,12 +184,15 @@ _NAN = float('nan')
 class SumDoubleAccumulator(object):
   def __init__(self):
     self.value = 0
+
   def add_input(self, element):
     element = float(element)
     self.value += element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.value += accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -169,14 +200,17 @@ class SumDoubleAccumulator(object):
 class MinDoubleAccumulator(object):
   def __init__(self):
     self.value = _POS_INF
+
   def add_input(self, element):
     element = float(element)
     if element < self.value:
       self.value = element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       if accumulator.value < self.value:
         self.value = accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -184,14 +218,17 @@ class MinDoubleAccumulator(object):
 class MaxDoubleAccumulator(object):
   def __init__(self):
     self.value = _NEG_INF
+
   def add_input(self, element):
     element = float(element)
     if element > self.value:
       self.value = element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       if accumulator.value > self.value:
         self.value = accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -200,24 +237,33 @@ class MeanDoubleAccumulator(object):
   def __init__(self):
     self.sum = 0
     self.count = 0
+
   def add_input(self, element):
     element = float(element)
     self.sum += element
     self.count += 1
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.sum += accumulator.sum
       self.count += accumulator.count
+
   def extract_output(self):
     return self.sum / self.count if self.count else _NAN
 
 
 class SumFloatFn(AccumulatorCombineFn):
   _accumulator_type = SumDoubleAccumulator
+
+
 class MinFloatFn(AccumulatorCombineFn):
   _accumulator_type = MinDoubleAccumulator
+
+
 class MaxFloatFn(AccumulatorCombineFn):
   _accumulator_type = MaxDoubleAccumulator
+
+
 class MeanFloatFn(AccumulatorCombineFn):
   _accumulator_type = MeanDoubleAccumulator
 
@@ -225,11 +271,14 @@ class MeanFloatFn(AccumulatorCombineFn):
 class AllAccumulator(object):
   def __init__(self):
     self.value = True
+
   def add_input(self, element):
     self.value &= not not element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.value &= accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -237,11 +286,14 @@ class AllAccumulator(object):
 class AnyAccumulator(object):
   def __init__(self):
     self.value = False
+
   def add_input(self, element):
     self.value |= not not element
+
   def merge(self, accumulators):
     for accumulator in accumulators:
       self.value |= accumulator.value
+
   def extract_output(self):
     return self.value
 
@@ -249,5 +301,6 @@ class AnyAccumulator(object):
 class AnyCombineFn(AccumulatorCombineFn):
   _accumulator_type = AnyAccumulator
 
+
 class AllCombineFn(AccumulatorCombineFn):
   _accumulator_type = AllAccumulator
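
A note on the pattern above: every accumulator in this file implements the
same four-method protocol (__init__, add_input, merge, extract_output), which
is why each combine fn collapses to a single _accumulator_type attribute. A
minimal sketch of driving one accumulator through the protocol by hand,
assuming the classes import from apache_beam.transforms.cy_combiners as this
diff suggests:

    from apache_beam.transforms.cy_combiners import MeanInt64Accumulator

    # Feed a few elements into one accumulator.
    acc = MeanInt64Accumulator()
    for element in (1, 2, 3):
      acc.add_input(element)

    # Fold in a second, independently filled accumulator.
    other = MeanInt64Accumulator()
    other.add_input(10)
    acc.merge([other])

    print(acc.extract_output())  # mean of 1, 2, 3, 10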

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 5b089b9..106c44f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -383,10 +383,10 @@ class PTransform(WithTypeHints):
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
     if pvalues and not pipelines:
       deferred = False
-      # pylint: disable=g-import-not-at-top
+      # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam import pipeline
       from apache_beam.utils.options import PipelineOptions
-      # pylint: enable=g-import-not-at-top
+      # pylint: enable=wrong-import-order, wrong-import-position
       p = pipeline.Pipeline(
           'DirectPipelineRunner', PipelineOptions(sys.argv))
     else:
@@ -403,9 +403,9 @@ class PTransform(WithTypeHints):
             raise ValueError(
                 'Mixing value from different pipelines not allowed.')
       deferred = not getattr(p.runner, 'is_eager', False)
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.core import Create
-    # pylint: enable=g-import-not-at-top
+    # pylint: enable=wrong-import-order, wrong-import-position
     replacements = {id(v): p | Create('CreatePInput%s' % ix, v)
                     for ix, v in enumerate(pvalues)
                     if not isinstance(v, pvalue.PValue) and v is not None}
@@ -431,9 +431,9 @@ class PTransform(WithTypeHints):
 
     Generally only needs to be overridden for multi-input PTransforms.
     """
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order
     from apache_beam import pipeline
-    # pylint: enable=g-import-not-at-top
+    # pylint: enable=wrong-import-order
     if isinstance(pvalueish, pipeline.Pipeline):
       pvalueish = pvalue.PBegin(pvalueish)
 
@@ -557,6 +557,7 @@ class PTransformWithSideInputs(PTransform):
     type_hints = self.get_type_hints().input_types
     if type_hints:
       args, kwargs = self.raw_side_inputs
+
       def element_type(side_input):
         if isinstance(side_input, pvalue.PCollectionView):
           return side_input.element_type
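
The edits in this file are mechanical: g-import-not-at-top is a
Google-internal checker name that stock pylint does not recognize, so the
suppressions are renamed to the standard wrong-import-order and
wrong-import-position messages while keeping the same tightly scoped
disable/enable pairs. The pattern, sketched with a hypothetical helper (only
the directives matter):

    def _build_default_pipeline():
      # Deferred import breaks a circular dependency; silence the two
      # import-placement checks for just these lines, then re-enable them.
      # pylint: disable=wrong-import-order, wrong-import-position
      from apache_beam import pipeline
      # pylint: enable=wrong-import-order, wrong-import-position
      return pipeline.Pipeline('DirectPipelineRunner')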

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8ae7a37..d6ee18a 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -208,8 +208,10 @@ class PTransformTest(unittest.TestCase):
     class MyDoFn(beam.DoFn):
       def start_bundle(self, c):
         yield 'start'
+
       def process(self, c):
         pass
+
       def finish_bundle(self, c):
         yield 'finish'
     pipeline = Pipeline('DirectPipelineRunner')
@@ -540,6 +542,7 @@ class PTransformTest(unittest.TestCase):
       def _extract_input_pvalues(self, pvalueish):
         pvalueish = list(pvalueish)
         return pvalueish, sum([list(p.values()) for p in pvalueish], [])
+
       def apply(self, pcoll_dicts):
         keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
         res = {}
@@ -554,6 +557,7 @@ class PTransformTest(unittest.TestCase):
     self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b']))
     self.assertEqual([], sorted(res['c']))
 
+
 @beam.ptransform_fn
 def SamplePTransform(label, pcoll, context, *args, **kwargs):
   """Sample transform using the @ptransform_fn decorator."""
@@ -1646,6 +1650,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | combine.Sample.FixedSizeGlobally('sample', 3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
+
     def matcher(expected_len):
       def match(actual):
         equal_to([expected_len])([len(actual[0])])
@@ -1661,6 +1666,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | combine.Sample.FixedSizeGlobally('sample', 2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
+
     def matcher(expected_len):
       def match(actual):
         equal_to([expected_len])([len(actual[0])])
@@ -1676,6 +1682,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
+
     def matcher(expected_len):
       def match(actual):
         for _, sample in actual:
@@ -1694,6 +1701,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
+
     def matcher(expected_len):
       def match(actual):
         for _, sample in actual:
@@ -1708,6 +1716,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | combine.ToList('to list'))
 
     self.assertCompatible(typehints.List[int], d.element_type)
+
     def matcher(expected):
       def match(actual):
         equal_to(expected)(actual[0])
@@ -1723,6 +1732,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | combine.ToList('to list'))
 
     self.assertCompatible(typehints.List[str], d.element_type)
+
     def matcher(expected):
       def match(actual):
         equal_to(expected)(actual[0])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index fc54fa3..6484a7c 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -28,6 +28,10 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.ptransform import PTransform
 
+# Type variables
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
 
 class CreatePCollectionView(PTransform):
   """Transform to materialize a given PCollectionView in the pipeline.
@@ -119,11 +123,10 @@ class ViewAsList(PTransform):
             .with_input_types(input_type)
             .with_output_types(output_type))
 
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
+
 @typehints.with_input_types(typehints.Tuple[K, V])
 @typehints.with_output_types(typehints.Dict[K, V])
-class ViewAsDict(PTransform):  # pylint: disable=g-wrong-blank-lines
+class ViewAsDict(PTransform):
   """Transform to view PCollection as a dict PCollectionView.
 
   Important: this transform is an implementation detail and should not be used
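
Hoisting K and V to module scope gives every transform in the file one shared
pair of type variables and restores the two blank lines pep8 expects before a
class definition, which is what made the old g-wrong-blank-lines suppression
unnecessary. A rough sketch of the decorator pattern with a hypothetical
transform, assuming the apache_beam.typehints API used above:

    from apache_beam import typehints
    from apache_beam.transforms.ptransform import PTransform

    K = typehints.TypeVariable('K')
    V = typehints.TypeVariable('V')


    @typehints.with_input_types(typehints.Tuple[K, V])
    @typehints.with_output_types(typehints.Dict[K, V])
    class ViewAsInvertedDict(PTransform):  # hypothetical example
      """Declares (K, V) pairs in and a dict out via shared type variables."""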

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 1c75d3b..788277c 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -731,6 +731,7 @@ class DefaultGlobalBatchTriggerDriver(TriggerDriver):
             unwindowed_value = wv.value
             self.notify_observers(unwindowed_value)
             yield unwindowed_value
+
         def __repr__(self):
           return '<UnwindowedValues of %s>' % windowed_values
       unwindowed = UnwindowedValues()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index fee9b1b..2cdd1e3 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -61,6 +61,7 @@ class TriggerTest(unittest.TestCase):
                          **kwargs):
     late_data = kwargs.pop('late_data', [])
     assert not kwargs
+
     def bundle_data(data, size):
       bundle = []
       for timestamp, elem in data:
@@ -477,10 +478,10 @@ class TranscriptTest(unittest.TestCase):
       else:
         return fn
 
-    # pylint: disable=g-import-not-at-top
+    # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms import window as window_module
     from apache_beam.transforms import trigger as trigger_module
-    # pylint: enable=g-import-not-at-top
+    # pylint: enable=wrong-import-order, wrong-import-position
     window_fn_names = dict(window_module.__dict__)
     window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn':
                             CustomTimestampingFixedWindowsWindowFn})

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 52bba05..b8380a0 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -181,6 +181,7 @@ class DataflowAssertException(Exception):
 # TODO(silviuc): Add contains_in_any_order-style matchers.
 def equal_to(expected):
   expected = list(expected)
+
   def _equal(actual):
     sorted_expected = sorted(expected)
     sorted_actual = sorted(actual)
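
The added blank line separates equal_to's one-time setup from the nested
_equal matcher it returns. Judging from this fragment, the matcher compares
sorted copies of both sequences, so it is order-insensitive; a small usage
sketch (failure behavior inferred from the DataflowAssertException context
above):

    from apache_beam.transforms.util import equal_to

    matcher = equal_to([1, 2, 3])
    matcher([3, 2, 1])  # passes: both sides are sorted before comparison
    matcher([1, 2])     # presumably raises DataflowAssertException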

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1d7f272..186fcd4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -52,6 +52,7 @@ class ReifyWindowsFn(core.DoFn):
       yield "%s @ %s" % (key, window), values
 reify_windows = core.ParDo(ReifyWindowsFn())
 
+
 class WindowTest(unittest.TestCase):
 
   def test_fixed_windows(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/typehints/typehints_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index e6bab61..aa04fe2 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -45,6 +45,7 @@ def check_or_interleave(hint, value, var):
     _check_instance_type(hint, value, var)
     return value
 
+
 def check_type_hints(f):
   @functools.wraps(f)
   def wrapper(*args, **kwargs):
@@ -770,6 +771,7 @@ class TakesDecoratorTestCase(TypeHintTestCase):
   def test_must_be_primitive_type_or_constraint(self):
     with self.assertRaises(TypeError) as e:
       t = [1, 2]
+
       @with_input_types(a=t)
       def foo(a):
         pass
@@ -781,6 +783,7 @@ class TakesDecoratorTestCase(TypeHintTestCase):
 
     with self.assertRaises(TypeError) as e:
       t = 5
+
       @check_type_hints
       @with_input_types(a=t)
       def foo(a):
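
The blank lines inserted before the decorated inner functions satisfy pep8
inside the assertRaises blocks; the tests themselves pin down that the
type-hint decorators reject arguments that are not types. Compressed into a
sketch (assuming with_input_types raises TypeError at decoration time, as
these tests expect):

    from apache_beam.typehints import with_input_types

    @with_input_types(a=int)  # fine: int is a type
    def takes_int(a):
      pass

    try:
      @with_input_types(a=[1, 2])  # a list is not a type constraint
      def rejected(a):
        pass
    except TypeError as e:
      print(e)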

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 4edcaa7..8a97f4b 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -336,6 +336,7 @@ class SetupTest(unittest.TestCase):
         'gs://my-gcs-bucket/gcs.tar.gz']
 
     gcs_copied_files = []
+
     def file_copy(from_path, to_path):
       if from_path.startswith('gs://'):
         gcs_copied_files.append(from_path)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/apache_beam/utils/retry_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py
index 0a016b9..705c555 100644
--- a/sdks/python/apache_beam/utils/retry_test.py
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -19,10 +19,10 @@
 
 import unittest
 
-from apache_beam.utils import retry
-
 from apitools.base.py.exceptions import HttpError
 
+from apache_beam.utils import retry
+
 
 class FakeClock(object):
   """A fake clock object implementing sleep() and recording calls."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index e8062c0..4e0b129 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -16,10 +16,11 @@
 #    limitations under the License.
 #
 
-# This script will run pylint on files that changed compared to the current
-# HEAD of the branch.
+# This script will run pylint and pep8 on files that changed compared to the
+# current HEAD of the branch.
 #
 # Use "pylint apache_beam" to run pylint all files.
+# Use "pep8 apache_beam" to run pep8 all files.
 #
 # The exit-code of the script indicates success or a failure.
 
@@ -43,6 +44,8 @@ if test "$CHANGED_FILES"; then
   echo "Running pylint on changed files:"
   echo "$CHANGED_FILES"
   pylint $CHANGED_FILES
+  echo "Running pep8 on changed files:"
+  pep8 $CHANGED_FILES
 else
   echo "Not running pylint. No eligible files."
 fi

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57a4495d/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 356de57..29674ed 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -18,8 +18,18 @@
 [tox]
 envlist = py27
 
+[pep8]
+# Disable all errors and warnings except for the ones related to blank lines.
+# pylint does not check the number of blank lines.
+select = E3
+
+# Skip auto-generated files (windmill_pb2.py, windmill_service_pb2.py)
+exclude = windmill_pb2.py, windmill_service_pb2.py
+
 [testenv:py27]
-deps=pylint
+deps=
+  pep8
+  pylint
 commands =
   python setup.py test
   {toxinidir}/run_pylint.sh
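
With select = E3, pep8 reports only the E3xx blank-line checks, which is
exactly the gap pylint leaves, and the exclude list keeps the generated
protobuf modules out of scope. For illustration, the same configuration can
be driven from Python through pep8's StyleGuide API; a hedged sketch (target
path hypothetical):

    import pep8

    # Mirror the tox.ini [pep8] section: blank-line checks only, skipping
    # the auto-generated protobuf modules.
    style = pep8.StyleGuide(
        select=['E3'],
        exclude=['windmill_pb2.py', 'windmill_service_pb2.py'])
    report = style.check_files(['apache_beam'])
    print(report.total_errors)  # non-zero when blank-line rules are violated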
