[ https://issues.apache.org/jira/browse/BEAM-3731?focusedWorklogId=83041&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83041 ]

ASF GitHub Bot logged work on BEAM-3731:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/18 02:36
            Start Date: 22/Mar/18 02:36
    Worklog Time Spent: 10m 
      Work Description: luke-zhu closed pull request #4730: [BEAM-3731] Enable tests to run in Python 3
URL: https://github.com/apache/beam/pull/4730
This is a PR merged from a forked repository. As GitHub hides the original
diff of a foreign (forked) pull request on merge, it is reproduced below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 8af394b6686..91768432d91 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -68,7 +68,6 @@ cdef class DeterministicFastPrimitivesCoderImpl(CoderImpl):
   cdef bint _check_safe(self, value) except -1
 
 
-cdef object NoneType
 cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE, BOOL_TYPE
 cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE
 
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index b5b17899601..4bfb19a52a1 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -28,7 +28,7 @@
 """
 from __future__ import absolute_import
 
-from types import NoneType
+import six
 
 from apache_beam.coders import observable
 from apache_beam.utils import windowed_value
@@ -197,7 +197,7 @@ def __init__(self, coder, step_label):
     self._step_label = step_label
 
   def _check_safe(self, value):
-    if isinstance(value, (str, unicode, long, int, float)):
+    if isinstance(value, (str, six.text_type, long, int, float)):
       pass
     elif value is None:
       pass
@@ -277,7 +277,7 @@ def get_estimated_size_and_observables(self, value, nested=False):
 
   def encode_to_stream(self, value, stream, nested):
     t = type(value)
-    if t is NoneType:
+    if value is None:
       stream.write_byte(NONE_TYPE)
     elif t is int:
       stream.write_byte(INT_TYPE)
@@ -288,7 +288,7 @@ def encode_to_stream(self, value, stream, nested):
     elif t is str:
       stream.write_byte(STR_TYPE)
       stream.write(value, nested)
-    elif t is unicode:
+    elif t is six.text_type:
       unicode_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
       stream.write(unicode_value.encode('utf-8'), nested)
@@ -302,7 +302,7 @@ def encode_to_stream(self, value, stream, nested):
       dict_value = value  # for typing
       stream.write_byte(DICT_TYPE)
       stream.write_var_int64(len(dict_value))
-      for k, v in dict_value.iteritems():
+      for k, v in six.iteritems(dict_value):
         self.encode_to_stream(k, stream, True)
         self.encode_to_stream(v, stream, True)
     elif t is bool:
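
As an aside on the hunk above: Python 3 drops types.NoneType and
dict.iteritems(), so the coder now tests identity against None and iterates
through six. A minimal standalone sketch of the same dispatch pattern (the
names here are illustrative, not Beam's):

import six

def describe(value):
    # types.NoneType is gone in Python 3; test identity against None instead.
    if value is None:
        return 'none'
    t = type(value)
    if t is six.text_type:  # `unicode` on Python 2, `str` on Python 3
        return 'text'
    if t is dict:
        # six.iteritems uses iteritems() on Python 2 and items() on Python 3.
        return 'dict(%d)' % sum(1 for _ in six.iteritems(value))
    return t.__name__

assert describe(None) == 'none'
assert describe(u'abc') == 'text'
assert describe({'a': 1}) == 'dict(1)'
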
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f7662586987..f3c99f730f3 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -22,17 +22,22 @@
 from __future__ import absolute_import
 
 import base64
-import cPickle as pickle
 
+# pylint: disable=ungrouped-imports
 import google.protobuf
+import six
 from google.protobuf import wrappers_pb2
 
+import six.moves.cPickle as pickle
 from apache_beam.coders import coder_impl
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils import proto_utils
 
+# pylint: enable=ungrouped-imports
+
+
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
   from .stream import get_varint_size
@@ -309,7 +314,7 @@ class ToStringCoder(Coder):
   """A default string coder used if no sink coder is specified."""
 
   def encode(self, value):
-    if isinstance(value, unicode):
+    if isinstance(value, six.text_type):
       return value.encode('utf-8')
     elif isinstance(value, str):
       return value
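
The import swap above relies on six.moves.cPickle, which resolves to the
C-accelerated cPickle module on Python 2 and to the plain pickle module on
Python 3. A small sketch:

import six.moves.cPickle as pickle

data = {'key': [1, 2, 3]}
# Protocol 2 is the highest protocol both Python 2 and Python 3 can read.
blob = pickle.dumps(data, protocol=2)
assert pickle.loads(blob) == data
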
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
index a13334a2c26..32089ced0e0 100644
--- a/sdks/python/apache_beam/coders/fast_coders_test.py
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -20,7 +20,6 @@
 import logging
 import unittest
 
-
 # Run all the standard coder test cases.
 from apache_beam.coders.coders_test_common import *
 
diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py
index 97aa39ca094..ff4693e16d7 100644
--- a/sdks/python/apache_beam/coders/slow_coders_test.py
+++ b/sdks/python/apache_beam/coders/slow_coders_test.py
@@ -20,7 +20,6 @@
 import logging
 import unittest
 
-
 # Run all the standard coder test cases.
 from apache_beam.coders.coders_test_common import *
 
diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py
index 15bc5eb9ba9..e509eadbd3f 100644
--- a/sdks/python/apache_beam/coders/stream_test.py
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -17,6 +17,8 @@
 
 """Tests for the stream implementations."""
 
+from __future__ import absolute_import
+
 import logging
 import math
 import unittest
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index dd071d7a933..96b71744b9a 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -64,6 +64,10 @@ def MakeXyzs(v):
 See apache_beam.typehints.decorators module for more details.
 """
 
+from __future__ import absolute_import
+
+import six
+
 from apache_beam.coders import coders
 from apache_beam.typehints import typehints
 
@@ -84,7 +88,7 @@ def register_standard_coders(self, fallback_coder):
     self._register_coder_internal(float, coders.FloatCoder)
     self._register_coder_internal(str, coders.BytesCoder)
     self._register_coder_internal(bytes, coders.BytesCoder)
-    self._register_coder_internal(unicode, coders.StrUtf8Coder)
+    self._register_coder_internal(six.text_type, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
     # Default fallback coders applied in that order until the first matching
     # coder found.
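
Registering six.text_type instead of unicode works because it aliases the
native text type on each major version (unicode on 2, str on 3). A
hypothetical registry sketch, not Beam's actual implementation:

import six

coders_by_type = {}

def register(typ, coder_name):
    # Hypothetical stand-in for a registry like _register_coder_internal.
    coders_by_type[typ] = coder_name

register(bytes, 'BytesCoder')
register(six.text_type, 'StrUtf8Coder')  # unicode on Py2, str on Py3

# Text literals resolve to the UTF-8 coder on either major version.
assert coders_by_type[type(u'abc')] == 'StrUtf8Coder'
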
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 30fc8903283..71cf5b72147 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -42,7 +42,6 @@
 Avro file.
 """
 
-import cStringIO
 import os
 import zlib
 from functools import partial
@@ -60,6 +59,11 @@
 from apache_beam.io.iobase import Read
 from apache_beam.transforms import PTransform
 
+try:
+  from cStringIO import StringIO as BytesIO
+except ImportError:
+  from io import BytesIO
+
 __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']
 
 
@@ -311,7 +315,7 @@ def _decompress_bytes(data, codec):
       # We take care to avoid extra copies of data while slicing large objects
       # by use of a buffer.
       result = snappy.decompress(buffer(data)[:-4])
-      avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
+      avroio.BinaryDecoder(BytesIO(data[-4:])).check_crc32(result)
       return result
     else:
       raise ValueError('Unknown codec: %r', codec)
@@ -321,7 +325,7 @@ def num_records(self):
 
   def records(self):
     decoder = avroio.BinaryDecoder(
-        cStringIO.StringIO(self._decompressed_block_bytes))
+        BytesIO(self._decompressed_block_bytes))
     reader = avroio.DatumReader(
         writers_schema=self._schema, readers_schema=self._schema)
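
The try/except import above is the standard shim for cStringIO going away in
Python 3: io.BytesIO exists on both majors and, like cStringIO.StringIO,
holds raw bytes. A minimal sketch:

try:
    from cStringIO import StringIO as BytesIO  # fast path, Python 2 only
except ImportError:
    from io import BytesIO                     # Python 3

buf = BytesIO(b'\x00\x01\x02\x03')
assert buf.read(2) == b'\x00\x01'   # reads bytes on either major version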
 
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index 0f7dd547e76..c99add1dd7b 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -91,10 +91,10 @@ def test_conact_source(self):
                            RangeSource(12, 16),
                           ])
     self.assertEqual(list(source.read(source.get_range_tracker())),
-                     range(16))
+                     list(range(16)))
     self.assertEqual(list(source.read(source.get_range_tracker((1, None),
                                                                (2, 10)))),
-                     range(4, 10))
+                     list(range(4, 10)))
     range_tracker = source.get_range_tracker(None, None)
     self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
     self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
@@ -176,10 +176,11 @@ def test_single_source(self):
     read_all = source_test_utils.read_from_source
 
     range10 = RangeSource(0, 10)
-    self.assertEquals(read_all(ConcatSource([range10])), range(10))
-    self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
+    self.assertEquals(read_all(ConcatSource([range10])), list(range(10)))
+    self.assertEquals(read_all(ConcatSource([range10]), (0, 5)),
+                      list(range(5, 10)))
     self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
-                      range(5))
+                      list(range(5)))
 
   def test_source_with_empty_ranges(self):
     read_all = source_test_utils.read_from_source
@@ -189,11 +190,11 @@ def test_source_with_empty_ranges(self):
 
     range10 = RangeSource(0, 10)
     self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
-                      range(10))
+                      list(range(10)))
     self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
-                      range(10))
+                      list(range(10)))
     self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
-                      range(10) + range(10))
+                      list(range(10)) + list(range(10)))
 
   def test_source_with_empty_ranges_exhastive(self):
     empty = RangeSource(0, 0)
@@ -214,7 +215,7 @@ def test_run_concat_direct(self):
                           ])
     pipeline = TestPipeline()
     pcoll = pipeline | beam.io.Read(source)
-    assert_that(pcoll, equal_to(range(1000)))
+    assert_that(pcoll, equal_to(list(range(1000))))
 
     pipeline.run()
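
The wrapping in list() above follows from range() returning a lazy range
object on Python 3 rather than a list, so direct comparison against a list
(or concatenation with +) no longer behaves as on Python 2. For example:

r = range(3)
assert list(r) == [0, 1, 2]
# On Python 3, `r == [0, 1, 2]` is False and `range(2) + range(2)` raises
# TypeError; materializing with list() restores the Python 2 semantics.
assert list(range(2)) + list(range(2)) == [0, 1, 0, 1]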
 
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 5b3bbf200a1..ac3e43826fb 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -25,6 +25,8 @@
 import time
 import uuid
 
+import six
+
 from apache_beam.internal import util
 from apache_beam.io import iobase
 from apache_beam.io.filesystem import BeamIOError
@@ -73,10 +75,10 @@ def __init__(self,
       ~exceptions.ValueError: if **shard_name_template** is not of expected
         format.
     """
-    if not isinstance(file_path_prefix, (basestring, ValueProvider)):
+    if not isinstance(file_path_prefix, (six.string_types, ValueProvider)):
       raise TypeError('file_path_prefix must be a string or ValueProvider;'
                       'got %r instead' % file_path_prefix)
-    if not isinstance(file_name_suffix, (basestring, ValueProvider)):
+    if not isinstance(file_name_suffix, (six.string_types, ValueProvider)):
       raise TypeError('file_name_suffix must be a string or ValueProvider;'
                       'got %r instead' % file_name_suffix)
 
@@ -87,9 +89,9 @@ def __init__(self,
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
     elif shard_name_template == '':
       num_shards = 1
-    if isinstance(file_path_prefix, basestring):
+    if isinstance(file_path_prefix, six.string_types):
       file_path_prefix = StaticValueProvider(str, file_path_prefix)
-    if isinstance(file_name_suffix, basestring):
+    if isinstance(file_name_suffix, six.string_types):
       file_name_suffix = StaticValueProvider(str, file_name_suffix)
     self.file_path_prefix = file_path_prefix
     self.file_name_suffix = file_name_suffix
@@ -221,7 +223,7 @@ def _rename_batch(batch):
       except BeamIOError as exp:
         if exp.exception_details is None:
           raise
-        for (src, dest), exception in exp.exception_details.iteritems():
+        for (src, dest), exception in exp.exception_details.items():
           if exception:
             logging.warning('Rename not successful: %s -> %s, %s', src, dest,
                             exception)
@@ -243,7 +245,7 @@ def _rename_batch(batch):
         return exceptions
 
     exception_batches = util.run_using_threadpool(
-        _rename_batch, zip(source_file_batch, destination_file_batch),
+        _rename_batch, list(zip(source_file_batch, destination_file_batch)),
         num_threads)
 
     all_exceptions = [e for exception_batch in exception_batches
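
basestring does not exist on Python 3; six.string_types ((basestring,) on 2,
(str,) on 3) keeps the isinstance checks above portable. A sketch of the
idiom with an illustrative helper:

import six

def require_string(value, name):
    # Accepts str and unicode on Python 2, str on Python 3.
    if not isinstance(value, six.string_types):
        raise TypeError('%s must be a string; got %r instead' % (name, value))
    return value

require_string('/tmp/output-prefix', 'file_path_prefix')
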
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index a80896c7818..e757fe242ec 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,9 @@
 :class:`~apache_beam.io._AvroSource`.
 """
 
-from six import integer_types
+from __future__ import absolute_import
+
+import six
 
 from apache_beam.internal import pickler
 from apache_beam.io import concat_source
@@ -98,12 +100,12 @@ def __init__(self,
         result.
     """
 
-    if not isinstance(file_pattern, (basestring, ValueProvider)):
+    if not isinstance(file_pattern, (six.string_types, ValueProvider)):
       raise TypeError('%s: file_pattern must be of type string'
                       ' or ValueProvider; got %r instead'
                       % (self.__class__.__name__, file_pattern))
 
-    if isinstance(file_pattern, basestring):
+    if isinstance(file_pattern, six.string_types):
       file_pattern = StaticValueProvider(str, file_pattern)
     self._pattern = file_pattern
 
@@ -234,11 +236,11 @@ class _SingleFileSource(iobase.BoundedSource):
 
   def __init__(self, file_based_source, file_name, start_offset, stop_offset,
                min_bundle_size=0, splittable=True):
-    if not isinstance(start_offset, integer_types):
+    if not isinstance(start_offset, six.integer_types):
       raise TypeError(
           'start_offset must be a number. Received: %r' % start_offset)
     if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
-      if not isinstance(stop_offset, integer_types):
+      if not isinstance(stop_offset, six.integer_types):
         raise TypeError(
             'stop_offset must be a number. Received: %r' % stop_offset)
       if start_offset >= stop_offset:
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 0110c3f683c..5cb6b1d0665 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -183,16 +183,16 @@ def setUp(self):
     filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
 
   def test_read(self):
-    sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-               in [0, 10, 20]]
+    sources = [TestConcatSource.DummySource(range(start, start + 10))
+               for start in [0, 10, 20]]
     concat = ConcatSource(sources)
     range_tracker = concat.get_range_tracker(None, None)
     read_data = [value for value in concat.read(range_tracker)]
-    self.assertItemsEqual(range(30), read_data)
+    self.assertItemsEqual(list(range(30)), read_data)
 
   def test_split(self):
-    sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-               in [0, 10, 20]]
+    sources = [TestConcatSource.DummySource(range(start, start + 10))
+               for start in [0, 10, 20]]
     concat = ConcatSource(sources)
     splits = [split for split in concat.split()]
     self.assertEquals(6, len(splits))
@@ -205,11 +205,11 @@ def test_split(self):
           split.stop_position)
       read_data.extend([value for value in split.source.read(
           range_tracker_for_split)])
-    self.assertItemsEqual(range(30), read_data)
+    self.assertItemsEqual(list(range(30)), read_data)
 
   def test_estimate_size(self):
-    sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
-               in [0, 10, 20]]
+    sources = [TestConcatSource.DummySource(range(start, start + 10))
+               for start in [0, 10, 20]]
     concat = ConcatSource(sources)
     self.assertEquals(30, concat.estimate_size())
 
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 09739dc9445..87bba7e8dea 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -20,16 +20,20 @@
 
 import abc
 import bz2
-import cStringIO
 import logging
 import os
 import time
 import zlib
 
-from six import integer_types
+import six
 
 from apache_beam.utils.plugin import BeamPlugin
 
+try:
+  from cStringIO import StringIO as BytesIO
+except ImportError:
+  from io import BytesIO
+
 logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -82,7 +86,7 @@ def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)."""
     compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
     lowercased_path = file_path.lower()
-    for suffix, compression_type in compression_types_by_suffix.iteritems():
+    for suffix, compression_type in compression_types_by_suffix.items():
       if lowercased_path.endswith(suffix):
         return compression_type
     return cls.UNCOMPRESSED
@@ -122,7 +126,7 @@ def __init__(self,
 
     if self.readable():
       self._read_size = read_size
-      self._read_buffer = cStringIO.StringIO()
+      self._read_buffer = BytesIO()
       self._read_position = 0
       self._read_eof = False
 
@@ -237,7 +241,7 @@ def readline(self):
     if not self._decompressor:
       raise ValueError('decompressor not initialized')
 
-    io = cStringIO.StringIO()
+    output_stream = BytesIO()
     while True:
       # Ensure that the internal buffer has at least half the read_size. Going
       # with half the _read_size (as opposed to a full _read_size) to ensure
@@ -246,11 +250,11 @@ def readline(self):
       self._fetch_to_internal_buffer(self._read_size / 2)
       line = self._read_from_internal_buffer(
           lambda: self._read_buffer.readline())
-      io.write(line)
+      output_stream.write(line)
       if line.endswith('\n') or not line:
         break  # Newline or EOF reached.
 
-    return io.getvalue()
+    return output_stream.getvalue()
 
   def closed(self):
     return not self._file or self._file.closed()
@@ -373,10 +377,12 @@ class FileMetadata(object):
   """Metadata about a file path that is the output of FileSystem.match
   """
   def __init__(self, path, size_in_bytes):
-    assert isinstance(path, basestring) and path, "Path should be a string"
-    assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \
-        "Invalid value for size_in_bytes should %s (of type %s)" % (
-            size_in_bytes, type(size_in_bytes))
+    assert isinstance(path, six.string_types) and path, \
+        "Path should be a string"
+    assert (isinstance(size_in_bytes, six.integer_types)
+            and size_in_bytes >= 0), \
+        ("Invalid value for size_in_bytes should %s (of type %s)"
+         % (size_in_bytes, type(size_in_bytes)))
     self.path = path
     self.size_in_bytes = size_in_bytes
 
@@ -423,14 +429,13 @@ def __init__(self, msg, exception_details=None):
     self.exception_details = exception_details
 
 
-class FileSystem(BeamPlugin):
+class FileSystem(six.with_metaclass(abc.ABCMeta, BeamPlugin)):
   """A class that defines the functions that can be performed on a filesystem.
 
   All methods are abstract and they are for file system providers to
   implement. Clients should use the FileSystems class to interact with
   the correct file system based on the provided file pattern scheme.
   """
-  __metaclass__ = abc.ABCMeta
   CHUNK_SIZE = 1  # Chuck size in the batch operations
 
   def __init__(self, pipeline_options):
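
Python 3 ignores a class-body __metaclass__ attribute, so abstract base
classes must declare their metaclass in the class statement; six.with_metaclass
builds a compatible base that works under both syntaxes. A self-contained
sketch with illustrative class names:

import abc
import six

class FileSystemLike(six.with_metaclass(abc.ABCMeta, object)):
    # Portable replacement for the Python-2-only `__metaclass__ = abc.ABCMeta`.
    @abc.abstractmethod
    def match(self, patterns):
        """Match the given file patterns."""

class LocalLike(FileSystemLike):
    def match(self, patterns):
        return []

try:
    FileSystemLike()   # abstract method unimplemented: must raise
except TypeError:
    pass
assert LocalLike().match(['*']) == []
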
diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index 35e141bb756..1572c6d4141 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -16,22 +16,24 @@
 #
 """Utilities for ``FileSystem`` implementations."""
 
+from __future__ import absolute_import
+
 import abc
 import io
 import os
 
+import six
+
 __all__ = ['Downloader', 'Uploader', 'DownloaderStream', 'UploaderStream',
            'PipeStream']
 
 
-class Downloader(object):
+class Downloader(six.with_metaclass(abc.ABCMeta, object)):
   """Download interface for a single file.
 
   Implementations should support random access reads.
   """
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractproperty
   def size(self):
     """Size of file to download."""
@@ -52,11 +54,9 @@ def get_range(self, start, end):
     """
 
 
-class Uploader(object):
+class Uploader(six.with_metaclass(abc.ABCMeta, object)):
   """Upload interface for a single file."""
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def put(self, data):
     """Write data to file sequentially.
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py
index d8cc9711550..d936e313af3 100644
--- a/sdks/python/apache_beam/io/filesystems_test.py
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -123,7 +123,7 @@ def test_match_file_exception(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Unable to get the Filesystem') as error:
       FileSystems.match([None])
-    self.assertEqual(error.exception.exception_details.keys(), [None])
+    self.assertEqual(list(error.exception.exception_details.keys()), [None])
 
   def test_match_directory(self):
     path1 = os.path.join(self.tmpdir, 'f1')
@@ -157,7 +157,8 @@ def test_copy_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Copy operation failed') as error:
       FileSystems.copy([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_copy_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -189,7 +190,8 @@ def test_rename_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Rename operation failed') as error:
       FileSystems.rename([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_rename_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -230,4 +232,4 @@ def test_delete_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Delete operation failed') as error:
       FileSystems.delete([path1])
-    self.assertEqual(error.exception.exception_details.keys(), [path1])
+    self.assertEqual(list(error.exception.exception_details.keys()), [path1])
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index a79ad5e3985..109cecafc54 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -120,6 +120,8 @@
 import time
 import uuid
 
+import six
+
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
@@ -210,7 +212,7 @@ def decode(self, encoded_table_row):
     od = json.loads(
         encoded_table_row, object_pairs_hook=collections.OrderedDict)
     return bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
+        f=[bigquery.TableCell(v=to_json_value(e)) for e in six.itervalues(od)])
 
 
 def parse_table_schema_from_json(schema_string):
@@ -522,7 +524,7 @@ def __init__(self, table, dataset=None, project=None, schema=None,
 
     self.table_reference = _parse_table_reference(table, dataset, project)
     # Transform the table schema into a bigquery.TableSchema instance.
-    if isinstance(schema, basestring):
+    if isinstance(schema, six.string_types):
       # TODO(silviuc): Should add a regex-based validation of the format.
       table_schema = bigquery.TableSchema()
       schema_list = [s.strip(' ') for s in schema.split(',')]
@@ -1101,7 +1103,7 @@ def insert_rows(self, project_id, dataset_id, table_id, rows):
     final_rows = []
     for row in rows:
       json_object = bigquery.JsonObject()
-      for k, v in row.iteritems():
+      for k, v in six.iteritems(row):
         json_object.additionalProperties.append(
             bigquery.JsonObject.AdditionalProperty(
                 key=k, value=to_json_value(v)))
@@ -1413,7 +1415,7 @@ def get_dict_table_schema(schema):
       return schema
     elif schema is None:
       return schema
-    elif isinstance(schema, basestring):
+    elif isinstance(schema, six.string_types):
       table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
       return WriteToBigQuery.table_schema_to_dict(table_schema)
     elif isinstance(schema, bigquery.TableSchema):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index ff6721e6d91..2a4c31f5be7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -698,7 +698,7 @@ def test_rows_are_written(self):
     sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
     expected_rows = []
     json_object = bigquery.JsonObject()
-    for k, v in sample_row.iteritems():
+    for k, v in sample_row.items():
       json_object.additionalProperties.append(
           bigquery.JsonObject.AdditionalProperty(
               key=k, value=to_json_value(v)))
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index e131f93d520..5b7d56ae924 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -179,7 +179,8 @@ def check_DatastoreWriteFn(self, num_entities):
       entities = [e.entity for e in
                   fake_datastore.create_entities(num_entities)]
 
-      expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities)
+      expected_mutations = list(map(WriteToDatastore.to_upsert_mutation,
+                                    entities))
       actual_mutations = []
 
       self._mock_datastore.commit.side_effect = (
@@ -217,7 +218,7 @@ def test_DatastoreWriteLargeEntities(self):
 
   def verify_unique_keys(self, queries):
     """A helper function that verifies if all the queries have unique keys."""
-    keys, _ = zip(*queries)
+    keys, _ = list(zip(*queries))
     keys = set(keys)
     self.assertEqual(len(keys), len(queries))
 
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index b86a2fa0145..45472e64541 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -20,12 +20,16 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import errno
 import logging
 import sys
 import time
 from socket import error as SocketError
 
+import six
+
 # pylint: disable=ungrouped-imports
 from apache_beam.internal.gcp import auth
 from apache_beam.utils import retry
@@ -251,11 +255,14 @@ def make_kind_stats_query(namespace, kind, latest_timestamp):
   else:
     kind_stat_query.kind.add().name = '__Stat_Ns_Kind__'
 
-  kind_filter = datastore_helper.set_property_filter(
-      query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL, unicode(kind))
-  timestamp_filter = datastore_helper.set_property_filter(
-      query_pb2.Filter(), 'timestamp', PropertyFilter.EQUAL,
-      latest_timestamp)
+  kind_filter = datastore_helper.set_property_filter(query_pb2.Filter(),
+                                                     'kind_name',
+                                                     PropertyFilter.EQUAL,
+                                                     six.text_type(kind))
+  timestamp_filter = datastore_helper.set_property_filter(query_pb2.Filter(),
+                                                          'timestamp',
+                                                          PropertyFilter.EQUAL,
+                                                          latest_timestamp)
 
   datastore_helper.set_composite_filter(kind_stat_query.filter,
                                         CompositeFilter.AND, kind_filter,
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index ce8b5e6e424..8c4aa7cc969 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -18,6 +18,8 @@
 
 from __future__ import absolute_import
 
+import six
+
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressedFile
 from apache_beam.io.filesystem import CompressionTypes
@@ -123,7 +125,7 @@ def _match(pattern, limit):
         pattern += '*'
       file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit)
       metadata_list = [FileMetadata(path, size)
-                       for path, size in file_sizes.iteritems()]
+                       for path, size in six.iteritems(file_sizes)]
       return MatchResult(pattern, metadata_list)
 
     exceptions = {}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 3bdf2e64ca2..c39ad622509 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -20,7 +20,6 @@
 https://github.com/GoogleCloudPlatform/appengine-gcs-client.
 """
 
-import cStringIO
 import errno
 import fnmatch
 import io
@@ -41,6 +40,13 @@
 from apache_beam.io.filesystemio import UploaderStream
 from apache_beam.utils import retry
 
+# pylint: disable=ungrouped-imports
+try:
+  from cStringIO import StringIO as BytesIO
+except ImportError:
+  from io import BytesIO
+# pylint: enable=ungrouped-imports
+
 __all__ = ['GcsIO']
 
 
@@ -490,7 +496,7 @@ def __init__(self, client, path, buffer_size):
     self._get_request.generation = metadata.generation
 
     # Initialize read buffer state.
-    self._download_stream = cStringIO.StringIO()
+    self._download_stream = BytesIO()
     self._downloader = transfer.Download(
         self._download_stream, auto_transfer=False, chunksize=self._buffer_size)
     self._client.objects.Get(self._get_request, download=self._downloader)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 98aa884c71d..f67fde67a8f 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -26,6 +26,8 @@
 
 import re
 
+import six
+
 from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
@@ -72,7 +74,7 @@ def expand(self, pvalue):
     pcoll = pvalue.pipeline | Read(self._source)
     pcoll.element_type = bytes
     pcoll = pcoll | 'DecodeString' >> Map(lambda b: b.decode('utf-8'))
-    pcoll.element_type = unicode
+    pcoll.element_type = six.text_type
     return pcoll
 
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 8bd9fa4f41a..848c4b6cc3b 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -17,10 +17,13 @@
 
 """Unit tests for PubSub sources and sinks."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
 
 import hamcrest as hc
+import six
 
 import apache_beam as beam
 from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
@@ -52,7 +55,7 @@ def test_expand_with_topic(self):
                                      None, 'a_label')
              | beam.Map(lambda x: x))
     # Ensure that the output type is str.
-    self.assertEqual(unicode, pcoll.element_type)
+    self.assertEqual(six.text_type, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
     overrides = _get_transform_overrides(p.options)
@@ -76,7 +79,7 @@ def test_expand_with_subscription(self):
                  'a_label')
              | beam.Map(lambda x: x))
     # Ensure that the output type is str
-    self.assertEqual(unicode, pcoll.element_type)
+    self.assertEqual(six.text_type, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
     overrides = _get_transform_overrides(p.options)
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 2ba1da26468..fbfd6439fd1 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -116,7 +116,7 @@ def list(self, path, status=False):
      raise ValueError('list must be called on a directory, got file: %s', path)
 
     result = []
-    for file in self.files.itervalues():
+    for file in self.files.values():
       if file.stat['path'].startswith(path):
         fs = file.get_file_status()
         result.append((fs[hdfs._FILE_STATUS_PATH_SUFFIX], fs))
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 29b68f61c34..205fef86c65 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -145,7 +145,7 @@ def test_match_file_exception(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Match operation failed') as error:
       self.fs.match([None])
-    self.assertEqual(error.exception.exception_details.keys(), [None])
+    self.assertEqual(list(error.exception.exception_details.keys()), [None])
 
   def test_match_glob(self):
     path1 = os.path.join(self.tmpdir, 'f1')
@@ -179,7 +179,8 @@ def test_copy_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Copy operation failed') as error:
       self.fs.copy([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_copy_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -211,7 +212,8 @@ def test_rename_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Rename operation failed') as error:
       self.fs.rename([path1], [path2])
-    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+    self.assertEqual(list(error.exception.exception_details.keys()),
+                     [(path1, path2)])
 
   def test_rename_directory(self):
     path_t1 = os.path.join(self.tmpdir, 't1')
@@ -252,4 +254,4 @@ def test_delete_error(self):
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Delete operation failed') as error:
       self.fs.delete([path1])
-    self.assertEqual(error.exception.exception_details.keys(), [path1])
+    self.assertEqual(list(error.exception.exception_details.keys()), [path1])
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index c540cfae050..dc1dff604ca 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -445,7 +445,7 @@ def test_end2end_example_proto(self):
       file_path_prefix = temp_dir.create_temp_file('result')
 
       example = tf.train.Example()
-      example.features.feature['int'].int64_list.value.extend(range(3))
+      example.features.feature['int'].int64_list.value.extend(list(range(3)))
       example.features.feature['bytes'].bytes_list.value.extend(
           [b'foo', b'bar'])
 
diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py
index a0206d45076..5b85026209f 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -26,6 +26,7 @@
 import traceback
 from collections import namedtuple
 
+import six
 import vcf
 
 from apache_beam.coders import coders
@@ -369,7 +370,7 @@ def _convert_to_variant_record(self, record, infos, formats):
       if record.FILTER is not None:
         variant.filters.extend(
             record.FILTER if record.FILTER else [PASS_FILTER])
-      for k, v in record.INFO.iteritems():
+      for k, v in six.iteritems(record.INFO):
         # Special case: END info value specifies end of the record, so adjust
         # variant.end and do not include it as part of variant.info.
         if k == END_INFO_KEY:
@@ -404,7 +405,7 @@ def _convert_to_variant_record(self, record, infos, formats):
           # Note: this is already done for INFO fields in PyVCF.
           if (field in formats and
               formats[field].num is None and
-              isinstance(data, (int, float, long, basestring, bool))):
+              isinstance(data, (int, float, long, six.string_types, bool))):
             data = [data]
           call.info[field] = data
         variant.calls.append(call)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4bf1e4..2ebf5c05e27 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -175,7 +175,7 @@ def from_dictionary(cls, options):
       A PipelineOptions object representing the given arguments.
     """
     flags = []
-    for k, v in options.iteritems():
+    for k, v in options.items():
       if isinstance(v, bool):
         if v:
           flags.append('--%s' % k)
@@ -233,7 +233,7 @@ def _visible_option_list(self):
                   for option in dir(self._visible_options) if option[0] != '_')
 
   def __dir__(self):
-    return sorted(dir(type(self)) + self.__dict__.keys() +
+    return sorted(dir(type(self)) + list(self.__dict__.keys()) +
                   self._visible_option_list())
 
   def __getattr__(self, name):
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 71d97ba5d21..f0e41290cca 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -53,6 +53,8 @@
 import shutil
 import tempfile
 
+import six
+
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
@@ -859,7 +861,7 @@ def is_side_input(tag):
     return result
 
 
-class PTransformOverride(object):
+class PTransformOverride(six.with_metaclass(abc.ABCMeta, object)):
   """For internal use only; no backwards-compatibility guarantees.
 
   Gives a matcher and replacements for matching PTransforms.
@@ -867,7 +869,6 @@ class PTransformOverride(object):
   TODO: Update this to support cases where input and/our output types are
   different.
   """
-  __metaclass__ = abc.ABCMeta
 
   @abc.abstractmethod
   def matches(self, applied_ptransform):
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 236c14bcbc6..bdb1bf3a2f1 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -29,6 +29,8 @@
 import collections
 import itertools
 
+import six
+
 from apache_beam import coders
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -259,7 +261,7 @@ class TaggedOutput(object):
   """
 
   def __init__(self, tag, value):
-    if not isinstance(tag, basestring):
+    if not isinstance(tag, six.string_types):
       raise TypeError(
           'Attempting to create a TaggedOutput with non-string tag %s' % tag)
     self.tag = tag
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index d5ca68307aa..df30db4a15b 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -22,9 +22,13 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import sys
 import traceback
 
+import six
+
 from apache_beam.internal import util
 from apache_beam.metrics.execution import ScopedMetricsContainer
 from apache_beam.pvalue import TaggedOutput
@@ -512,7 +516,7 @@ def _reraise_augmented(self, exn):
           traceback.format_exception_only(type(exn), exn)[-1].strip()
           + step_annotation)
       new_exn._tagged_with_step = True
-    raise new_exn, None, original_traceback
+    six.reraise(new_exn, None, original_traceback)
 
 
 class OutputProcessor(object):
@@ -549,7 +553,7 @@ def process_outputs(self, windowed_input_element, results):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, basestring):
+        if not isinstance(tag, six.string_types):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
       if isinstance(result, WindowedValue):
@@ -591,7 +595,7 @@ def finish_bundle_outputs(self, results):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, basestring):
+        if not isinstance(tag, six.string_types):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
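
`raise t, v, tb` is a syntax error on Python 3; six.reraise(tp, value, tb)
re-raises with an explicit traceback on both majors. A sketch of the
exception-annotation pattern used in _reraise_augmented above, with
hypothetical names and the type-first calling convention:

import sys
import six

def call_annotated(fn, step_label):
    try:
        return fn()
    except Exception as exn:
        tb = sys.exc_info()[2]
        new_exn = type(exn)('%s [while running %r]' % (exn, step_label))
        # Portable spelling of Python 2's `raise new_exn, None, tb`.
        six.reraise(type(new_exn), new_exn, tb)

try:
    call_annotated(lambda: 1 // 0, 'Step-1')
except ZeroDivisionError as exn:
    assert 'Step-1' in str(exn)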
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 2e0bc8209ec..fe5c67cc977 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -145,7 +145,7 @@ def _populate_metric_results(self, response):
 
     # Now we create the MetricResult elements.
     result = []
-    for metric_key, metric in metrics_by_name.iteritems():
+    for metric_key, metric in metrics_by_name.items():
       attempted = self._get_metric_value(metric['tentative'])
       committed = self._get_metric_value(metric['committed'])
       if attempted is None or committed is None:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index dd3cbe1156a..79738700269 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -34,7 +34,7 @@
 class DictToObject(object):
   """Translate from a dict(list()) structure to an object structure"""
   def __init__(self, data):
-    for name, value in data.iteritems():
+    for name, value in data.items():
       setattr(self, name, self._wrap(value))
 
   def _wrap(self, value):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index bfec89310e9..fbbfb97d036 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,7 +25,6 @@
 import threading
 import time
 import traceback
-import urllib
 from collections import defaultdict
 
 import apache_beam as beam
@@ -51,6 +50,8 @@
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils.plugin import BeamPlugin
+from six.moves.urllib.parse import quote
+from six.moves.urllib.parse import unquote
 
 __all__ = ['DataflowRunner']
 
@@ -883,12 +884,12 @@ def deserialize_windowing_strategy(cls, serialized_data):
   @staticmethod
   def byte_array_to_json_string(raw_bytes):
     """Implements 
org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
-    return urllib.quote(raw_bytes)
+    return quote(raw_bytes)
 
   @staticmethod
   def json_string_to_byte_array(encoded_string):
     """Implements 
org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
-    return urllib.unquote(encoded_string)
+    return unquote(encoded_string)
 
 
 class DataflowPipelineResult(PipelineResult):
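
Python 3 moved quote/unquote from urllib into urllib.parse;
six.moves.urllib.parse imports from whichever location exists:

from six.moves.urllib.parse import quote, unquote

encoded = quote('a b/c')      # urllib.quote on Py2, urllib.parse.quote on Py3
assert encoded == 'a%20b/c'   # '/' is safe by default; spaces are escaped
assert unquote(encoded) == 'a b/c'
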
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1cf80b79902..01866895279 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -19,6 +19,8 @@
 
 Dataflow client utility functions."""
 
+from __future__ import absolute_import
+
 import codecs
 import getpass
 import json
@@ -29,6 +31,7 @@
 from datetime import datetime
 from StringIO import StringIO
 
+import six
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
 
@@ -251,7 +254,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
           dataflow.Environment.SdkPipelineOptionsValue())
 
       options_dict = {k: v
-                      for k, v in sdk_pipeline_options.iteritems()
+                      for k, v in sdk_pipeline_options.items()
                       if v is not None}
       options_dict["pipelineUrl"] = pipeline_url
       self.proto.sdkPipelineOptions.additionalProperties.append(
@@ -287,7 +290,7 @@ def encode_shortstrings(input_buffer, errors='strict'):
     def decode_shortstrings(input_buffer, errors='strict'):
       """Decoder (to Unicode) that suppresses long base64 strings."""
       shortened, length = encode_shortstrings(input_buffer, errors)
-      return unicode(shortened), length
+      return six.text_type(shortened), length
 
     def shortstrings_registerer(encoding_name):
       if encoding_name == 'shortstrings':
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
index 805473a8838..35c6ed24298 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
@@ -49,7 +49,7 @@ def _matches(self, item):
     if self.origin != IGNORED and item.origin != self.origin:
       return False
     if self.context != IGNORED:
-      for key, name in self.context.iteritems():
+      for key, name in self.context.items():
         if key not in item.context:
           return False
         if name != IGNORED and item.context[key] != name:
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 2f2316f6f1d..cba7b22d5e9 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -31,7 +31,7 @@
 def _dict_printable_fields(dict_object, skip_fields):
   """Returns a list of strings for the interesting fields of a dict."""
   return ['%s=%r' % (name, value)
-          for name, value in dict_object.iteritems()
+          for name, value in dict_object.items()
           # want to output value 0 but not None nor []
           if (value or value == 0)
           and name not in skip_fields]
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 46176c9e969..1f0bdbfa4a1 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -231,7 +231,7 @@ def handle_result(
 
       # Commit partial GBK states
       existing_keyed_state = self._transform_keyed_states[result.transform]
-      for k, v in result.partial_keyed_state.iteritems():
+      for k, v in result.partial_keyed_state.items():
         existing_keyed_state[k] = v
       return committed_bundles
 
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 34c12c345a0..0c18a9d35af 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -22,12 +22,14 @@
 import collections
 import itertools
 import logging
-import Queue
 import sys
 import threading
 import traceback
 from weakref import WeakValueDictionary
 
+import six
+
+import six.moves.queue as queue
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
 
@@ -76,7 +78,7 @@ def _get_task_or_none(self):
         # shutdown.
         return self.queue.get(
             timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT)
-      except Queue.Empty:
+      except queue.Empty:
         return None
 
     def run(self):
@@ -95,7 +97,7 @@ def shutdown(self):
       self.shutdown_requested = True
 
   def __init__(self, num_workers):
-    self.queue = Queue.Queue()
+    self.queue = queue.Queue()
     self.workers = [_ExecutorService._ExecutorServiceWorker(
         self.queue, i) for i in range(num_workers)]
     self.shutdown_requested = False
@@ -120,7 +122,7 @@ def shutdown(self):
       try:
         self.queue.get_nowait()
         self.queue.task_done()
-      except Queue.Empty:
+      except queue.Empty:
         continue
     # All existing threads will eventually terminate (after they complete their
     # last task).
@@ -398,7 +400,7 @@ def await_completion(self):
     try:
       if update.exception:
         t, v, tb = update.exc_info
-        raise t, v, tb
+        six.reraise(t, v, tb)
     finally:
       self.executor_service.shutdown()
       self.executor_service.await_completion()
@@ -438,14 +440,14 @@ class _TypedUpdateQueue(object):
 
     def __init__(self, item_type):
       self._item_type = item_type
-      self._queue = Queue.Queue()
+      self._queue = queue.Queue()
 
     def poll(self):
       try:
         item = self._queue.get_nowait()
         self._queue.task_done()
         return  item
-      except Queue.Empty:
+      except queue.Empty:
         return None
 
     def take(self):
@@ -458,7 +460,7 @@ def take(self):
           item = self._queue.get(timeout=1)
           self._queue.task_done()
           return item
-        except Queue.Empty:
+        except queue.Empty:
           pass
 
     def offer(self, item):
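
The Queue module was renamed to queue in Python 3; six.moves.queue resolves
to the right one, and its Empty exception is the same class the stdlib
raises on both majors. A sketch:

import six.moves.queue as queue

q = queue.Queue()
q.put('task')
assert q.get_nowait() == 'task'
try:
    q.get_nowait()              # queue is empty now
except queue.Empty:
    pass                        # same exception class on Python 2 and 3
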
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index cbd2c9fbc25..cd5e533ffb3 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -563,7 +563,7 @@ def process_element(self, element):
 
   def finish_bundle(self):
     self.runner.finish()
-    bundles = self._tagged_receivers.values()
+    bundles = list(self._tagged_receivers.values())
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
         self, bundles, [], result_counters, None,
@@ -705,7 +705,7 @@ def process_element(self, element):
   def finish_bundle(self):
     bundles = []
     bundle = None
-    for encoded_k, vs in self.gbk_items.iteritems():
+    for encoded_k, vs in self.gbk_items.items():
       if not bundle:
         bundle = self._evaluation_context.create_bundle(
             self.output_pcollection)
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 084073f4fe7..3634cdd448c 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -194,7 +194,7 @@ def output_watermark(self):
 
   def hold(self, keyed_earliest_holds):
     with self._lock:
-      for key, hold_value in keyed_earliest_holds.iteritems():
+      for key, hold_value in keyed_earliest_holds.items():
         self._keyed_earliest_holds[key] = hold_value
         if (hold_value is None or
             hold_value == WatermarkManager.WATERMARK_POS_INF):
@@ -256,7 +256,7 @@ def extract_transform_timers(self):
     with self._lock:
       fired_timers = []
       has_realtime_timer = False
-      for encoded_key, state in self._keyed_states.iteritems():
+      for encoded_key, state in self._keyed_states.items():
         timers, had_realtime_timer = state.get_timers(
             watermark=self._input_watermark,
             processing_time=self._clock.time())
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
index 84bed4270bc..4dcd074d92b 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -59,7 +59,7 @@ def run(self, pipeline):
     # Submit the job to the RPC co-process
     jobName = ('Job-' +
               ''.join(random.choice(string.ascii_uppercase) for _ in range(6)))
-    options = {k: v for k, v in pipeline._options.get_all_options().iteritems()
+    options = {k: v for k, v in pipeline._options.get_all_options().items()
                if v is not None}
 
     try:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index c36bae7114a..e0fadd892fe 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -20,7 +20,6 @@
 import collections
 import copy
 import logging
-import Queue as queue
 import re
 import threading
 import time
@@ -28,7 +27,9 @@
 
 import grpc
 
-import apache_beam as beam  # pylint: disable=ungrouped-imports
+# pylint: disable=ungrouped-imports,wrong-import-order
+import apache_beam as beam
+import six.moves.queue as queue
 from apache_beam import coders
 from apache_beam import metrics
 from apache_beam.coders import WindowedValueCoder
@@ -50,6 +51,9 @@
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.utils import proto_utils
 
+# pylint: enable=ungrouped-imports,wrong-import-order
+
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -1162,7 +1166,7 @@ def run(self):
         self._latest_progress = progress_result.process_bundle_progress
         if self._callback:
           self._callback(self._latest_progress)
-      except Exception, exn:
+      except Exception as exn:
         logging.error("Bad progress: %s", exn)
       time.sleep(self._frequency)
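
The comma form of except shown removed above is a syntax error on Python 3;
the as-form is accepted from Python 2.6 onward. An illustrative sketch:

def report_progress():
    raise RuntimeError('bad progress')

try:
    report_progress()
except Exception as exn:   # Py2's `except Exception, exn:` no longer parses
    assert 'bad progress' in str(exn)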
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e27da621e48..cb56f3d2b57 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -14,6 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
+from __future__ import print_function
+
 import functools
 import logging
 import time
@@ -66,17 +69,17 @@ def cross_product(elem, sides):
           res,
           equal_to([
               # The window [0, 5) maps to the window [0, 7).
-              (0, range(7)),
-              (1, range(7)),
-              (2, range(7)),
-              (3, range(7)),
-              (4, range(7)),
+              (0, list(range(7))),
+              (1, list(range(7))),
+              (2, list(range(7))),
+              (3, list(range(7))),
+              (4, list(range(7))),
               # The window [5, 10) maps to the window [7, 14).
-              (5, range(7, 10)),
-              (6, range(7, 10)),
-              (7, range(7, 10)),
-              (8, range(7, 10)),
-              (9, range(7, 10))]),
+              (5, list(range(7, 10))),
+              (6, list(range(7, 10))),
+              (7, list(range(7, 10))),
+              (8, list(range(7, 10))),
+              (9, list(range(7, 10)))]),
           label='windowed')
 
   def test_flattened_side_input(self):
@@ -218,7 +221,7 @@ def test_progress_metrics(self):
           m_out.processed_elements.measured.output_element_counts['twice'])
 
     except:
-      print res._metrics_by_stage
+      print(res._metrics_by_stage)
       raise
 
   # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
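
The list(range(n)) wrapping in this test matters because Python 3's range()
returns a lazy sequence object rather than a list, so comparing it directly
against a list is always False there. The difference in isolation:

    expected = [0, 1, 2, 3, 4, 5, 6]
    assert list(range(7)) == expected  # holds on Python 2 and 3
    # On Python 2, range(7) == expected is also True (range builds a list);
    # on Python 3 it is False, since range(7) is a view, not a list.
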
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 74c6b03f5a6..df2c38e30eb 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -391,7 +391,7 @@ def append(self, pair):
   def freeze(self):
     if not self.frozen:
       self._encoded_elements = [self.grouped_coder.encode(kv)
-                                for kv in self.elements.iteritems()]
+                                for kv in self.elements.items()]
     self.frozen = True
     return self._encoded_elements
 
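
dict.iteritems() is gone in Python 3; plain .items() is portable (a list on 2,
a view on 3), at a small memory cost on Python 2 for large dicts. The idiom by
itself, with hypothetical data:

    elements = {('k1', 'w'): [1, 2], ('k2', 'w'): [3]}
    encoded = [repr(kv) for kv in elements.items()]  # portable iteration
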
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index a631a0c847b..32fec0ca6a3 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -18,7 +18,6 @@
 import functools
 import logging
 import os
-import Queue as queue
 import socket
 import subprocess
 import sys
@@ -31,6 +30,7 @@
 import grpc
 from google.protobuf import text_format
 
+import six.moves.queue as queue
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
index 1fc244b20f2..1df3f809cd6 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_test.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import logging
 import platform
 import signal
@@ -41,13 +43,13 @@ def setUp(self):
     if platform.system() != 'Windows':
       def handler(signum, frame):
         msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
-        print '=' * 20, msg, '=' * 20
+        print('=' * 20, msg, '=' * 20)
         traceback.print_stack(frame)
         threads_by_id = {th.ident: th for th in threading.enumerate()}
         for thread_id, stack in sys._current_frames().items():
           th = threads_by_id.get(thread_id)
-          print
-          print '# Thread:', th or thread_id
+          print()
+          print('# Thread:', th or thread_id)
           traceback.print_stack(stack)
         raise BaseException(msg)
       signal.signal(signal.SIGALRM, handler)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2e853b072ca..5520890cf39 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -26,6 +26,8 @@
 import json
 import logging
 
+import six
+
 import apache_beam as beam
 from apache_beam.coders import WindowedValueCoder
 from apache_beam.coders import coder_impl
@@ -101,7 +103,7 @@ def __init__(self, operation_name, step_name, consumers, counter_factory,
     # We must do this manually as we don't have a spec or spec.output_coders.
     self.receivers = [
         operations.ConsumerSet(self.counter_factory, self.step_name, 0,
-                               next(consumers.itervalues()),
+                               next(six.itervalues(consumers)),
                                self.windowed_coder)]
 
   def process(self, windowed_value):
@@ -281,7 +283,7 @@ def process_bundle(self, instruction_id):
     try:
       self.state_sampler.start()
       # Start all operations.
-      for op in reversed(self.ops.values()):
+      for op in reversed(list(self.ops.values())):
         logging.info('start %s', op)
         op.start()
 
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 2e4f2d6f69a..d6e5d54b2f1 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -24,12 +24,13 @@
 import abc
 import collections
 import logging
-import Queue as queue
 import sys
 import threading
 
 import grpc
+import six
 
+import six.moves.queue as queue
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -49,7 +50,7 @@ def close(self):
       self._close_callback(self.get())
 
 
-class DataChannel(object):
+class DataChannel(six.with_metaclass(abc.ABCMeta, object)):
   """Represents a channel for reading and writing data over the data plane.
 
   Read from this channel with the input_elements method::
@@ -68,8 +69,6 @@ class DataChannel(object):
     data_channel.close()
   """
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def input_elements(self, instruction_id, expected_targets):
     """Returns an iterable of all Element.Data bundles for instruction_id.
@@ -182,7 +181,7 @@ def input_elements(self, instruction_id, expected_targets):
           data = received.get(timeout=1)
         except queue.Empty:
           if self._exc_info:
-            raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
+            six.reraise(self.exc_info[0], self.exc_info[1], self.exc_info[2])
         else:
           if not data.data and data.target in expected_targets:
             done_targets.append(data.target)
@@ -270,11 +269,9 @@ def Data(self, elements_iterator, context):
       yield elements
 
 
-class DataChannelFactory(object):
+class DataChannelFactory(six.with_metaclass(abc.ABCMeta, object)):
   """An abstract factory for creating ``DataChannel``."""
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def create_data_channel(self, remote_grpc_port):
     """Returns a ``DataChannel`` from the given RemoteGrpcPort."""
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index 07ba8fd44f1..b2cefbe1469 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -28,6 +28,7 @@
 from concurrent import futures
 
 import grpc
+import six
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -50,7 +51,7 @@ def call_fn():
       thread.join(timeout_secs)
       if exc_info:
         t, v, tb = exc_info  # pylint: disable=unbalanced-tuple-unpacking
-        raise t, v, tb
+        six.reraise(t, v, tb)
       assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs
     return wrapper
   return decorate
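
`raise t, v, tb` is a syntax error under Python 3; six.reraise(t, v, tb)
re-raises the captured exception with its original traceback on both
interpreters. Roughly, as a standalone sketch:

    import sys

    import six

    def capture(fn):
        try:
            fn()
            return None
        except Exception:
            return sys.exc_info()  # (type, value, traceback) triple

    exc_info = capture(lambda: 1 // 0)
    if exc_info:
        t, v, tb = exc_info
        six.reraise(t, v, tb)  # ZeroDivisionError surfaces with its traceback
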
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index 6d8a1d92671..4cb4b2336e4 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -16,13 +16,15 @@
 #
 """Beam fn API log handler."""
 
+from __future__ import absolute_import
+
 import logging
 import math
-import Queue as queue
 import threading
 
 import grpc
 
+import six.moves.queue as queue
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index 647b8b7e8b4..958f081ad87 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -101,7 +101,7 @@ def _create_test(name, num_logs):
           lambda self: self._verify_fn_log_handler(num_logs))
 
 
-for test_name, num_logs_entries in data.iteritems():
+for test_name, num_logs_entries in data.items():
   _create_test(test_name, num_logs_entries)
 
 
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py
index cf3f6929282..a41547865ea 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -83,7 +83,7 @@ def create_log_record(self, **kwargs):
     class Record(object):
 
       def __init__(self, **kwargs):
-        for k, v in kwargs.iteritems():
+        for k, v in kwargs.items():
           setattr(self, k, v)
 
     return Record(**kwargs)
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index bdafbeaf44a..2bb1aa500ee 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -55,7 +55,7 @@ def worker_printable_fields(workerproto):
   return ['%s=%s' % (name, value)
           # _asdict is the only way and cannot subclass this generated class
           # pylint: disable=protected-access
-          for name, value in workerproto._asdict().iteritems()
+          for name, value in workerproto._asdict().items()
           # want to output value 0 but not None nor []
           if (value or value == 0)
           and name not in
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 11ff909f3e9..3efe36ecd61 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -19,10 +19,14 @@
 
 """Worker operations executor."""
 
+from __future__ import absolute_import
+
 import collections
 import itertools
 import logging
 
+import six
+
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.io import iobase
@@ -529,7 +533,7 @@ def process(self, wkv):
         target = self.key_count * 9 // 10
         old_wkeys = []
         # TODO(robertwb): Use an LRU cache?
-        for old_wkey, old_wvalue in self.table.iteritems():
+        for old_wkey, old_wvalue in six.iteritems(self.table):
           old_wkeys.append(old_wkey)  # Can't mutate while iterating.
           self.output_key(old_wkey, old_wvalue[0])
           self.key_count -= 1
@@ -544,7 +548,7 @@ def process(self, wkv):
     entry[0] = self.combine_fn_add_input(entry[0], value)
 
   def finish(self):
-    for wkey, value in self.table.iteritems():
+    for wkey, value in six.iteritems(self.table):
       self.output_key(wkey, value[0])
     self.table = {}
     self.key_count = 0
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 2767530adb0..360964e392a 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -21,14 +21,15 @@
 from __future__ import print_function
 
 import logging
-import Queue as queue
 import sys
 import threading
 import traceback
 from concurrent import futures
 
 import grpc
+import six
 
+import six.moves.queue as queue
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import bundle_processor
@@ -288,7 +289,7 @@ def _blocking_request(self, request):
     self._requests.put(request)
     while not future.wait(timeout=1):
       if self._exc_info:
-        raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+        six.reraise(self._exc_info[0], self._exc_info[1], self._exc_info[2])
       elif self._done:
         raise RuntimeError()
     del self._responses_by_id[request.id]
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 46670e88964..20d4366f81a 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -16,7 +16,6 @@
 #
 """SDK Fn Harness entry point."""
 
-import BaseHTTPServer
 import json
 import logging
 import os
@@ -27,6 +26,7 @@
 
 from google.protobuf import text_format
 
+import six.moves.BaseHTTPServer
 from apache_beam.internal import pickler
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.dataflow.internal import names
@@ -57,7 +57,7 @@ def start(self, status_http_port=0):
         Default is 0 which means any free unsecured port
     """
 
-    class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    class StatusHttpHandler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler):
       """HTTP handler for serving stacktraces of all threads."""
 
       def do_GET(self):  # pylint: disable=invalid-name
@@ -73,7 +73,7 @@ def log_message(self, f, *args):
         """Do not log any messages."""
         pass
 
-    self.httpd = httpd = BaseHTTPServer.HTTPServer(
+    self.httpd = httpd = six.moves.BaseHTTPServer.HTTPServer(
         ('localhost', status_http_port), StatusHttpHandler)
     logging.info('Status HTTP server running at %s:%s', httpd.server_name,
                  httpd.server_port)
@@ -157,10 +157,10 @@ def _get_worker_count(pipeline_options):
     an int containing the worker_threads to use. Default is 1
   """
   pipeline_options = pipeline_options.get(
-      'options') if pipeline_options.has_key('options') else {}
+      'options') if 'options' in pipeline_options else {}
   experiments = pipeline_options.get(
       'experiments'
-  ) if pipeline_options and pipeline_options.has_key('experiments') else []
+  ) if pipeline_options and 'experiments' in pipeline_options else []
 
   experiments = experiments if experiments else []
 
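
Two more renames in this file: BaseHTTPServer became http.server in Python 3
(six.moves papers over both spellings), and dict.has_key() was removed in
favor of the `in` operator. In isolation, with a hypothetical options dict:

    from six.moves import BaseHTTPServer  # http.server on Py3, BaseHTTPServer on Py2

    pipeline_options = {'options': {'experiments': ['beam_fn_api']}}
    # pipeline_options.has_key('options') is Python 2 only; 'in' is portable:
    options = (pipeline_options.get('options')
               if 'options' in pipeline_options else {})
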
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index cc405e0e477..824af8223b3 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -19,10 +19,10 @@
 
 import collections
 import logging
-import Queue
 import threading
 import traceback
 
+import six.moves.queue as queue
 from apache_beam.coders import observable
 from apache_beam.io import iobase
 from apache_beam.options.value_provider import RuntimeValueProvider
@@ -61,13 +61,13 @@ def __init__(self,
     self.num_reader_threads = min(max_reader_threads, len(self.sources))
 
     # Queue for sources that are to be read.
-    self.sources_queue = Queue.Queue()
+    self.sources_queue = queue.Queue()
     for source in sources:
       self.sources_queue.put(source)
     # Queue for elements that have been read.
-    self.element_queue = Queue.Queue(ELEMENT_QUEUE_SIZE)
+    self.element_queue = queue.Queue(ELEMENT_QUEUE_SIZE)
     # Queue for exceptions encountered in reader threads; to be rethrown.
-    self.reader_exceptions = Queue.Queue()
+    self.reader_exceptions = queue.Queue()
     # Whether we have already iterated; this iterable can only be used once.
     self.already_iterated = False
     # Whether an error was encountered in any source reader.
@@ -137,7 +137,7 @@ def _reader_thread(self):
                   self.element_queue.put(value)
                 else:
                   self.element_queue.put(_globally_windowed(value))
-        except Queue.Empty:
+        except queue.Empty:
           return
     except Exception as e:  # pylint: disable=broad-except
       logging.error('Encountered exception in PrefetchingSourceSetIterable '
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 212dc19fde9..5d6aa8aecf5 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -77,7 +77,7 @@ def test_single_source_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=2)
-    assert list(strip_windows(iterator_fn())) == range(6)
+    assert list(strip_windows(iterator_fn())) == list(range(6))
 
   def test_bytes_read_behind_experiment(self):
     mock_read_counter = mock.MagicMock()
@@ -115,7 +115,7 @@ def test_multiple_sources_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=3)
-    assert sorted(strip_windows(iterator_fn())) == range(11)
+    assert sorted(strip_windows(iterator_fn())) == list(range(11))
 
   def test_multiple_sources_single_reader_iterator_fn(self):
     sources = [
@@ -126,7 +126,7 @@ def test_multiple_sources_single_reader_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=1)
-    assert list(strip_windows(iterator_fn())) == range(11)
+    assert list(strip_windows(iterator_fn())) == list(range(11))
 
   def test_source_iterator_single_source_exception(self):
     class MyException(Exception):
@@ -172,7 +172,7 @@ def perpetual_generator(value):
     with self.assertRaises(MyException):
       for value in iterator_fn():
         seen.add(value.value)
-    self.assertEqual(sorted(seen), range(5))
+    self.assertEqual(sorted(seen), list(range(5)))
 
 
 class EmulatedCollectionsTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index 8a63e7bd056..a62595520ad 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -20,9 +20,13 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 from apache_beam import coders
 from apache_beam import core
 from apache_beam import pvalue
@@ -41,11 +45,9 @@
     ]
 
 
-class Event(object):
+class Event(six.with_metaclass(ABCMeta, object)):
   """Test stream event to be emitted during execution of a TestStream."""
 
-  __metaclass__ = ABCMeta
-
   def __cmp__(self, other):
     if type(self) is not type(other):
       return cmp(type(self), type(other))
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index 0018c0ed154..20060adec63 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -50,7 +50,7 @@ def test_delete_files_fails_with_io_error(self):
       utils.delete_files([path])
     self.assertTrue(
         error.exception.args[0].startswith('Delete operation failed'))
-    self.assertEqual(error.exception.exception_details.keys(), [path])
+    self.assertEqual(list(error.exception.exception_details.keys()), [path])
 
   def test_delete_files_fails_with_invalid_arg(self):
     with self.assertRaises(RuntimeError):
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index c08d250e116..fd95482221c 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -157,6 +157,6 @@ def open_shards(glob_pattern):
   """Returns a composite file of all shards matching the given glob pattern."""
   with tempfile.NamedTemporaryFile(delete=False) as f:
     for shard in glob.glob(glob_pattern):
-      f.write(file(shard).read())
+      f.write(open(shard).read())
     concatenated_file_name = f.name
-  return file(concatenated_file_name, 'rb')
+  return open(concatenated_file_name, 'rb')
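
The file() builtin no longer exists in Python 3; open() is the portable
spelling, as the fix uses. A sketch of the pattern with a hypothetical shard
name:

    with open('shard-00000-of-00001', 'w') as f:   # file(...) fails on Py3
        f.write('some records\n')
    with open('shard-00000-of-00001', 'rb') as f:  # 'rb' yields bytes on Py3
        contents = f.read()
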
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index b6f19c6c03e..b5ac7adc738 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -87,7 +87,7 @@ def add_input(self, sum_count, element):
     return sum_ + element, count + 1
 
   def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
+    sums, counts = list(zip(*accumulators))
     return sum(sums), sum(counts)
 
   def extract_output(self, sum_count):
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 705106e784c..0e94a98dd72 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -135,7 +135,9 @@ def test_combine_fn(combine_fn, shards, expected):
       final_accumulator = combine_fn.merge_accumulators(accumulators)
       self.assertEqual(combine_fn.extract_output(final_accumulator), expected)
 
-    test_combine_fn(combine.TopCombineFn(3), [range(10), range(10)], [9, 9, 8])
+    test_combine_fn(combine.TopCombineFn(3),
+                    [range(10), range(10)],
+                    [9, 9, 8])
     test_combine_fn(combine.TopCombineFn(5),
                     [range(1000), range(100), range(1001)],
                     [1000, 999, 999, 998, 998])
@@ -284,7 +286,7 @@ def match(actual):
     def matcher():
       def match(actual):
         equal_to([1])([len(actual)])
-        equal_to(pairs)(actual[0].iteritems())
+        equal_to(pairs)(actual[0].items())
       return match
     assert_that(result, matcher())
     pipeline.run()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2d411ee7533..2d565852561 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -23,6 +23,8 @@
 import inspect
 import types
 
+import six
+
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -845,7 +847,8 @@ def with_outputs(self, *tags, **main_kw):
     """
     main_tag = main_kw.pop('main', None)
     if main_kw:
-      raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
+      raise ValueError('Unexpected keyword arguments: %s'
+                       % list(main_kw.keys()))
     return _MultiParDo(self, tags, main_tag)
 
   def _pardo_fn_data(self):
@@ -1676,7 +1679,7 @@ def __init__(self, **kwargs):
     super(Flatten, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
@@ -1721,7 +1724,7 @@ def __init__(self, value):
       value: An object of values for the PCollection
     """
     super(Create, self).__init__()
-    if isinstance(value, basestring):
+    if isinstance(value, six.string_types):
       raise TypeError('PTransform Create: Refusing to treat string as '
                       'an iterable. (string=%r)' % value)
     elif isinstance(value, dict):
@@ -1751,7 +1754,7 @@ def get_windowing(self, unused_inputs):
 
   @staticmethod
   def _create_source_from_iterable(values, coder):
-    return Create._create_source(map(coder.encode, values), coder)
+    return Create._create_source(list(map(coder.encode, values)), coder)
 
   @staticmethod
   def _create_source(serialized_values, coder):
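
basestring is undefined in Python 3. six.string_types is (basestring,) on
Python 2 and (str,) on Python 3, so the isinstance guard above keeps rejecting
strings-as-iterables on both versions. The guard by itself:

    import six

    def reject_strings(value):
        if isinstance(value, six.string_types):  # basestring / str as needed
            raise TypeError('Refusing to treat string as an iterable: %r'
                            % value)
        return value

    reject_strings([1, 2, 3])    # passes through
    # reject_strings('abc')      # would raise TypeError
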
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index cb7b53eb29a..4206f2110b7 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -44,6 +44,8 @@
 from datetime import datetime
 from datetime import timedelta
 
+import six
+
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
 
@@ -167,7 +169,7 @@ class DisplayDataItem(object):
   display item belongs to.
   """
   typeDict = {str:'STRING',
-              unicode:'STRING',
+              six.text_type:'STRING',
               int:'INTEGER',
               float:'FLOAT',
               bool: 'BOOLEAN',
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 5c73cf39a92..90bde8caa8c 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -22,7 +22,9 @@
 import unittest
 from datetime import datetime
 
+# pylint: disable=ungrouped-imports
 import hamcrest as hc
+import six
 from hamcrest.core.base_matcher import BaseMatcher
 
 import apache_beam as beam
@@ -31,6 +33,8 @@
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.display import HasDisplayData
 
+# pylint: enable=ungrouped-imports
+
 
 class DisplayDataItemMatcher(BaseMatcher):
   """ Matcher class for DisplayDataItems in unit tests.
@@ -161,7 +165,7 @@ def test_create_list_display_data(self):
   def test_unicode_type_display_data(self):
     class MyDoFn(beam.DoFn):
       def display_data(self):
-        return {'unicode_string': unicode('my string'),
+        return {'unicode_string': six.text_type('my string'),
                 'unicode_literal_string': u'my literal string'}
 
     fn = MyDoFn()
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index c7fc641804d..9a2abd2e478 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -611,7 +611,7 @@ def __init__(self, fn, *args, **kwargs):
     super(PTransformWithSideInputs, self).__init__()
 
     if (any([isinstance(v, pvalue.PCollection) for v in args]) or
-        any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
+        any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])):
       raise error.SideInputError(
           'PCollection used directly as side input argument. Specify '
          'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 1d5883481b6..59305e16233 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -17,6 +17,8 @@
 
 """Unit tests for side inputs."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
 
@@ -194,7 +196,7 @@ def match(actual):
         [[actual_elem, actual_list, actual_dict]] = actual
         equal_to([expected_elem])([actual_elem])
         equal_to(expected_list)(actual_list)
-        equal_to(expected_pairs)(actual_dict.iteritems())
+        equal_to(expected_pairs)(actual_dict.items())
       return match
 
     assert_that(results, matcher(1, a_list, some_pairs))
@@ -284,8 +286,8 @@ def  matcher(expected_elem, expected_kvs):
       def match(actual):
         [[actual_elem, actual_dict1, actual_dict2]] = actual
         equal_to([expected_elem])([actual_elem])
-        equal_to(expected_kvs)(actual_dict1.iteritems())
-        equal_to(expected_kvs)(actual_dict2.iteritems())
+        equal_to(expected_kvs)(actual_dict1.items())
+        equal_to(expected_kvs)(actual_dict2.items())
       return match
 
     assert_that(results, matcher(1, some_kvs))
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 8d63d49baad..851b9f42714 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -22,6 +22,8 @@
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 __all__ = [
     'TimeDomain',
     ]
@@ -43,11 +45,9 @@ def from_string(domain):
     raise ValueError('Unknown time domain: %s' % domain)
 
 
-class TimestampCombinerImpl(object):
+class TimestampCombinerImpl(six.with_metaclass(ABCMeta, object)):
   """Implementation of TimestampCombiner."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
     pass
@@ -72,11 +72,9 @@ def merge(self, unused_result_window, merging_timestamps):
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(TimestampCombinerImpl):
+class DependsOnlyOnWindow(six.with_metaclass(ABCMeta, TimestampCombinerImpl)):
   """TimestampCombinerImpl that only depends on the window."""
 
-  __metaclass__ = ABCMeta
-
   def combine(self, output_timestamp, other_output_timestamp):
     return output_timestamp
 
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 7d03240f941..5ca6aa933da 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -20,6 +20,8 @@
 Triggers control when in processing time windows get emitted.
 """
 
+from __future__ import absolute_import
+
 import collections
 import copy
 import itertools
@@ -28,6 +30,8 @@
 from abc import ABCMeta
 from abc import abstractmethod
 
+import six
+
 from apache_beam.coders import observable
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import combiners
@@ -67,14 +71,13 @@ class AccumulationMode(object):
   # RETRACTING = 3
 
 
-class _StateTag(object):
+class _StateTag(six.with_metaclass(ABCMeta, object)):
   """An identifier used to store and retrieve typed, combinable state.
 
   The given tag must be unique for this stage.  If CombineFn is None then
   all elements will be returned as a list, otherwise the given CombineFn
   will be applied (possibly incrementally and eagerly) when adding elements.
   """
-  __metaclass__ = ABCMeta
 
   def __init__(self, tag):
     self.tag = tag
@@ -135,12 +138,11 @@ def with_prefix(self, prefix):
 
 # pylint: disable=unused-argument
 # TODO(robertwb): Provisional API, Java likely to change as well.
-class TriggerFn(object):
+class TriggerFn(six.with_metaclass(ABCMeta, object)):
   """A TriggerFn determines when window (panes) are emitted.
 
   See https://beam.apache.org/documentation/programming-guide/#triggers
   """
-  __metaclass__ = ABCMeta
 
   @abstractmethod
   def on_element(self, element, window, context):
@@ -511,9 +513,7 @@ def to_runner_api(self, context):
             subtrigger=self.underlying.to_runner_api(context)))
 
 
-class _ParallelTriggerFn(TriggerFn):
-
-  __metaclass__ = ABCMeta
+class _ParallelTriggerFn(six.with_metaclass(ABCMeta, TriggerFn)):
 
   def __init__(self, *triggers):
     self.triggers = triggers
@@ -740,14 +740,12 @@ def clear_state(self, tag):
 
 
 # pylint: disable=unused-argument
-class SimpleState(object):
+class SimpleState(six.with_metaclass(ABCMeta, object)):
   """Basic state storage interface used for triggering.
 
   Only timers must hold the watermark (by their timestamp).
   """
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def set_timer(self, window, name, time_domain, timestamp):
     pass
@@ -859,7 +857,7 @@ def merge(self, to_be_merged, merge_result):
           self._persist_window_ids()
 
   def known_windows(self):
-    return self.window_ids.keys()
+    return list(self.window_ids.keys())
 
   def get_window(self, window_id):
     for window, ids in self.window_ids.items():
@@ -913,11 +911,9 @@ def create_trigger_driver(windowing,
   return driver
 
 
-class TriggerDriver(object):
+class TriggerDriver(six.with_metaclass(ABCMeta, object)):
   """Breaks a series of bundle and timer firings into window (pane)s."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def process_elements(self, state, windowed_values, output_watermark):
     pass
@@ -963,7 +959,7 @@ def __eq__(self, other):
     if isinstance(other, collections.Iterable):
       return all(
           a == b
-          for a, b in itertools.izip_longest(self, other, fillvalue=object()))
+          for a, b in itertools.zip_longest(self, other, fillvalue=object()))
     else:
       return NotImplemented
 
@@ -1042,7 +1038,7 @@ def process_elements(self, state, windowed_values, output_watermark):
     # First handle merging.
     if self.is_merging:
       old_windows = set(state.known_windows())
-      all_windows = old_windows.union(windows_to_elements.keys())
+      all_windows = old_windows.union(list(windows_to_elements.keys()))
 
       if all_windows != old_windows:
         merged_away = {}
@@ -1243,7 +1239,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
 
   def get_earliest_hold(self):
     earliest_hold = MAX_TIMESTAMP
-    for unused_window, tagged_states in self.state.iteritems():
+    for unused_window, tagged_states in six.iteritems(self.state):
       # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is
       # named "watermark".  This is currently only true because the only place
       # watermark holds are set is in the GeneralTriggerDriver, where we use
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d66736f6218..06c0ebcb39d 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -381,7 +381,7 @@ def test_picklable_output(self):
       pickle.dumps(unpicklable)
     for unwindowed in driver.process_elements(None, unpicklable, None):
       self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value,
-                       range(10))
+                       list(range(10)))
 
 
 class RunnerApiTest(unittest.TestCase):
@@ -425,7 +425,7 @@ def format_result(k_v):
               # A-10, A-11 never emitted due to AfterCount(3) never firing.
               'B-4': {6, 7, 8, 9},
               'B-3': {10, 15, 16},
-          }.iteritems()))
+          }.items()))
 
 
 class TranscriptTest(unittest.TestCase):
@@ -554,7 +554,7 @@ def fire_timers():
 
     for line in spec['transcript']:
 
-      action, params = line.items()[0]
+      action, params = list(line.items())[0]
 
       if action != 'expect':
         # Fail if we have output that was not expected in the transcript.
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 8185e64a67c..103492a4312 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -25,6 +25,8 @@
 import random
 import time
 
+import six
+
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
 from apache_beam.transforms import window
@@ -109,12 +111,12 @@ def __init__(self, **kwargs):
     super(CoGroupByKey, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
       # If this works, it's a dict.
-      return pvalueish, tuple(pvalueish.viewvalues())
+      return pvalueish, tuple(six.viewvalues(pvalueish))
     except AttributeError:
       pcolls = tuple(pvalueish)
       return pcolls, pcolls
@@ -149,7 +151,7 @@ def _merge_tagged_vals_under_key(key_grouped, result_ctor,
       # pairs. The result value constructor makes tuples with len(pcolls) slots.
       pcolls = list(enumerate(pcolls))
       result_ctor_arg = len(pcolls)
-      result_ctor = lambda size: tuple([] for _ in xrange(size))
+      result_ctor = lambda size: tuple([] for _ in range(size))
 
     # Check input PCollections for PCollection-ness, and that they all belong
     # to the same pipeline.
@@ -260,7 +262,7 @@ def _thin_data(self):
     odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else []
     # Sort the pairs by how different they are.
     pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
-                   key=lambda ((x1, _1), (x2, _2)): x2 / x1)
+                   key=lambda p1_p2: p1_p2[1][0] / p1_p2[0][0])
     # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
     threshold = 2 * len(pairs) / 3
     self._data = (
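
Tuple unpacking in parameter lists was removed by PEP 3113, so
`lambda ((x1, _1), (x2, _2)): x2 / x1` must become a one-argument lambda that
indexes into the pair of pairs, exactly as the fix above does. Side by side,
with hypothetical data:

    pairs = [((1.0, 'a'), (2.0, 'b')), ((3.0, 'c'), (9.0, 'd'))]
    # Python 2 only:  key=lambda ((x1, _1), (x2, _2)): x2 / x1
    ranked = sorted(pairs, key=lambda p1_p2: p1_p2[1][0] / p1_p2[0][0])
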
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c250e8c6d36..2955f93367b 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -51,6 +51,7 @@
 
 import abc
 
+import six
 from google.protobuf import duration_pb2
 from google.protobuf import timestamp_pb2
 
@@ -108,11 +109,9 @@ def get_impl(timestamp_combiner, window_fn):
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(urns.RunnerApiFn):
+class WindowFn(six.with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):
   """An abstract windowing function defining a basic assign and merge."""
 
-  __metaclass__ = abc.ABCMeta
-
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
 
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 7c7012c8aea..99a24cf79b1 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -20,6 +20,8 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import collections
 import inspect
 import sys
@@ -205,7 +207,7 @@ def add_input(self, accumulator, element, *args, **kwargs):
       except TypeCheckError as e:
         error_msg = ('Runtime type violation detected within %s: '
                      '%s' % (self._label, e))
-        raise TypeCheckError, error_msg, sys.exc_info()[2]
+        six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
     return self._combinefn.add_input(accumulator, element, *args, **kwargs)
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
@@ -220,7 +222,7 @@ def extract_output(self, accumulator, *args, **kwargs):
       except TypeCheckError as e:
         error_msg = ('Runtime type violation detected within %s: '
                      '%s' % (self._label, e))
-        raise TypeCheckError, error_msg, sys.exc_info()[2]
+        six.reraise(TypeCheckError, error_msg, sys.exc_info()[2])
     return result
 
 
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 3455672e7a8..7cad96b71af 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -408,6 +408,9 @@ class AnyTypeConstraint(TypeConstraint):
   def __eq__(self, other):
     return type(self) == type(other)
 
+  def __hash__(self):
+    return hash(type(self))
+
   def __repr__(self):
     return 'Any'
 
@@ -420,6 +423,10 @@ class TypeVariable(AnyTypeConstraint):
   def __eq__(self, other):
     return type(self) == type(other) and self.name == other.name
 
+  def __hash__(self):
+    # TODO(BEAM-3730): A proper hash causes combiners_test tests to fail
+    return id(self)
+
   def __init__(self, name):
     self.name = name
 
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a069237e22b..27d3ce0c977 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 """Apache Beam SDK for Python setup file."""
 
 from distutils.version import StrictVersion
@@ -94,9 +96,11 @@ def get_version():
   except ImportError:
     cythonize = lambda *args, **kwargs: []
 
+_AVRO = ('avro>=1.8.1,<2.0.0' if sys.version_info[0] == 2
+         else 'avro-python3>=1.8.1,<2.0.0')
 
 REQUIRED_PACKAGES = [
-    'avro>=1.8.1,<2.0.0',
+    _AVRO,
     'crcmod>=1.7,<2.0',
     'dill==0.2.6',
     'grpcio>=1.0,<2',
@@ -159,7 +163,7 @@ def generate_common_urns():
   src_time = os.path.getmtime(src) if os.path.exists(src) else -1
   out_time = os.path.getmtime(out) if os.path.exists(out) else -1
   if src_time > out_time:
-    print 'Regenerating common_urns module.'
+    print('Regenerating common_urns module.')
     urns = {}
     for m in re.finditer(
         r'\b(?:urn:)?beam:(\S+):(\S+):(v\S+)', open(src).read()):
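
The setup.py hunk selects the avro distribution by interpreter major version,
since the avro releases of the time supported only Python 2 and avro-python3
only Python 3. A hedged alternative, not what this PR does and assuming a
setuptools recent enough for PEP 508 environment markers, would push the
choice to install time:

    # Declarative equivalent via environment markers (assumes modern setuptools):
    REQUIRED_PACKAGES = [
        'avro>=1.8.1,<2.0.0 ; python_version < "3.0"',
        'avro-python3>=1.8.1,<2.0.0 ; python_version >= "3.0"',
    ]
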


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83041)
    Time Spent: 3h  (was: 2h 50m)

> Enable tests to run in Python 3
> -------------------------------
>
>                 Key: BEAM-3731
>                 URL: https://issues.apache.org/jira/browse/BEAM-3731
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Luke Zhu
>            Assignee: Luke Zhu
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Currently, the Python 3 tests fail to run, which makes it difficult to make
> progress on Python 3 compatibility work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
