[ 
https://issues.apache.org/jira/browse/BEAM-3981?focusedWorklogId=88955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88955
 ]

ASF GitHub Bot logged work on BEAM-3981:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Apr/18 14:41
            Start Date: 09/Apr/18 14:41
    Worklog Time Spent: 10m 
      Work Description: RobbeSneyders closed pull request #4990: [BEAM-3981] [WIP] Futurize and fix python 2 compatibility for coders subpackage
URL: https://github.com/apache/beam/pull/4990
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
index acca89f70f4..4a7e509ed75 100644
--- a/sdks/python/apache_beam/coders/__init__.py
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -14,6 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 from apache_beam.coders.coders import *
 from apache_beam.coders.typecoders import registry
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 98dd508556a..dd82a00ab1a 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -74,7 +74,7 @@ cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE, DICT_TYPE, SET_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
-  @cython.locals(unicode_value=unicode, dict_value=dict)
+  @cython.locals(dict_value = dict)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index cc7ed87c3ad..ee58c3fb043 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -25,12 +25,26 @@
 coder_impl.pxd file for type hints.
 
 For internal use only; no backwards-compatibility guarantees.
+
+isort:skip_file
 """
 from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+native_int = int
 
-from types import NoneType
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+from builtins import bytes
+from builtins import chr
+from builtins import int
+from builtins import object
+from builtins import range
+from builtins import str
 
-import six
+from past.builtins import str as old_str
+from past.builtins import long
+from past.builtins import unicode
 
 from apache_beam.coders import observable
 from apache_beam.utils import windowed_value
@@ -38,7 +52,6 @@
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import Timestamp
 
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
   from .stream import InputStream as create_InputStream
   from .stream import OutputStream as create_OutputStream
@@ -54,11 +67,6 @@
   from .slow_stream import get_varint_size
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
-try:
-  long        # Python 2
-except NameError:
-  long = int  # Python 3
-
 
 class CoderImpl(object):
   """For internal use only; no backwards-compatibility guarantees."""
@@ -199,7 +207,7 @@ def __init__(self, coder, step_label):
     self._step_label = step_label
 
   def _check_safe(self, value):
-    if isinstance(value, (str, six.text_type, long, int, float)):
+    if isinstance(value, (str, bytes, int, float)):
       pass
     elif value is None:
       pass
@@ -279,18 +287,21 @@ def get_estimated_size_and_observables(self, value, nested=False):
 
   def encode_to_stream(self, value, stream, nested):
     t = type(value)
-    if t is NoneType:
+    if value is None:
       stream.write_byte(NONE_TYPE)
-    elif t is int:
+    elif t is bool:
+      stream.write_byte(BOOL_TYPE)
+      stream.write_byte(value)
+    elif t is int or t is native_int or t is long:
       stream.write_byte(INT_TYPE)
       stream.write_var_int64(value)
     elif t is float:
       stream.write_byte(FLOAT_TYPE)
       stream.write_bigendian_double(value)
-    elif t is str:
+    elif t is bytes or t is old_str:
       stream.write_byte(STR_TYPE)
       stream.write(value, nested)
-    elif t is six.text_type:
+    elif t is str or t is unicode:
       unicode_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
       stream.write(unicode_value.encode('utf-8'), nested)
@@ -304,12 +315,9 @@ def encode_to_stream(self, value, stream, nested):
       dict_value = value  # for typing
       stream.write_byte(DICT_TYPE)
       stream.write_var_int64(len(dict_value))
-      for k, v in dict_value.iteritems():
+      for k, v in dict_value.items():
         self.encode_to_stream(k, stream, True)
         self.encode_to_stream(v, stream, True)
-    elif t is bool:
-      stream.write_byte(BOOL_TYPE)
-      stream.write_byte(value)
     else:
       stream.write_byte(UNKNOWN_TYPE)
       self.fallback_coder_impl.encode_to_stream(value, stream, nested)
@@ -318,6 +326,8 @@ def decode_from_stream(self, stream, nested):
     t = stream.read_byte()
     if t == NONE_TYPE:
       return None
+    elif t == BOOL_TYPE:
+      return not not stream.read_byte()
     elif t == INT_TYPE:
       return stream.read_var_int64()
     elif t == FLOAT_TYPE:
@@ -341,8 +351,6 @@ def decode_from_stream(self, stream, nested):
         k = self.decode_from_stream(stream, True)
         v[k] = self.decode_from_stream(stream, True)
       return v
-    elif t == BOOL_TYPE:
-      return not not stream.read_byte()
 
     return self.fallback_coder_impl.decode_from_stream(stream, nested)
 
@@ -394,8 +402,9 @@ def _from_normal_time(self, value):
 
   def encode_to_stream(self, value, out, nested):
     span_micros = value.end.micros - value.start.micros
-    out.write_bigendian_uint64(self._from_normal_time(value.end.micros / 1000))
-    out.write_var_int64(span_micros / 1000)
+    out.write_bigendian_uint64(
+        self._from_normal_time(value.end.micros // 1000))
+    out.write_var_int64(span_micros // 1000)
 
   def decode_from_stream(self, in_, nested):
     end_millis = self._to_normal_time(in_.read_bigendian_uint64())
@@ -409,7 +418,7 @@ def estimate_size(self, value, nested=False):
     # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
     # and a varint timespam.
     span = value.end.micros - value.start.micros
-    return 8 + get_varint_size(span / 1000)
+    return 8 + get_varint_size(span // 1000)
 
 
 class TimestampCoderImpl(StreamCoderImpl):
@@ -427,7 +436,7 @@ def estimate_size(self, unused_value, nested=False):
     return 8
 
 
-small_ints = [chr(_) for _ in range(128)]
+small_ints = [chr(_).encode('latin-1') for _ in range(128)]
 
 
 class VarIntCoderImpl(StreamCoderImpl):
@@ -783,7 +792,7 @@ def encode_to_stream(self, value, out, nested):
         # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
         # precision of timestamps.
         self._from_normal_time(
-            restore_sign * (abs(wv.timestamp_micros) / 1000)))
+            restore_sign * (abs(wv.timestamp_micros) // 1000)))
     self._windows_coder.encode_to_stream(wv.windows, out, True)
     # Default PaneInfo encoded byte representing NO_FIRING.
     self._pane_info_coder.encode_to_stream(wv.pane_info, out, True)
@@ -797,9 +806,9 @@ def decode_from_stream(self, in_stream, nested):
     # were indeed MIN/MAX timestamps.
     # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
     # precision of timestamps.
-    if timestamp == -(abs(MIN_TIMESTAMP.micros) / 1000):
+    if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
       timestamp = MIN_TIMESTAMP.micros
-    elif timestamp == (MAX_TIMESTAMP.micros / 1000):
+    elif timestamp == (MAX_TIMESTAMP.micros // 1000):
       timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index ecbdd538d38..b943afd0c2b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -20,9 +20,12 @@
 Only those coders listed in __all__ are part of the public API of this module.
 """
 from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import base64
-import cPickle as pickle
+from builtins import object
+from builtins import str
 
 import google.protobuf
 from google.protobuf import wrappers_pb2
@@ -33,6 +36,12 @@
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils import proto_utils
 
+# This is for py2/3 compatibility. cPickle was renamed pickle in python 3.
+try:
+  import cPickle as pickle # Python 2
+except ImportError:
+  import pickle # Python 3
+
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
   from .stream import get_varint_size
@@ -216,6 +225,9 @@ def __eq__(self, other):
             and self._dict_without_impl() == other._dict_without_impl())
     # pylint: enable=protected-access
 
+  def __hash__(self):
+    return hash(type(self))
+
   _known_urns = {}
 
   @classmethod
@@ -309,11 +321,6 @@ class ToStringCoder(Coder):
   """A default string coder used if no sink coder is specified."""
 
   def encode(self, value):
-    try:               # Python 2
-      if isinstance(value, unicode):
-        return value.encode('utf-8')
-    except NameError:  # Python 3
-      pass
     return str(value)
 
   def decode(self, _):
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 705de8920d5..1a32b2df31c 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -14,11 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import base64
 import logging
 import unittest
+from builtins import object
 
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
@@ -99,6 +102,9 @@ def __eq__(self, other):
       return True
     return False
 
+  def __hash__(self):
+    return hash(type(self))
+
 
 class FallbackCoderTest(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 0ea7da2b6ad..7b709d3d645 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -17,10 +17,15 @@
 
 """Tests common to all coder implementations."""
 from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import logging
 import math
 import unittest
+from builtins import int
+from builtins import range
+from builtins import str
 
 import dill
 
@@ -40,10 +45,10 @@
 class CustomCoder(coders.Coder):
 
   def encode(self, x):
-    return str(x+1)
+    return str(x + 1).encode('latin-1')
 
   def decode(self, encoded):
-    return int(encoded) - 1
+    return int(encoded.decode('latin-1')) - 1
 
 
 class CodersTest(unittest.TestCase):
@@ -103,7 +108,7 @@ def test_custom_coder(self):
 
     self.check_coder(CustomCoder(), 1, -10, 5)
     self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())),
-                     (1, 'a'), (-10, 'b'), (5, 'c'))
+                     (1, b'a'), (-10, b'b'), (5, b'c'))
 
   def test_pickle_coder(self):
     self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
@@ -139,7 +144,7 @@ def test_fast_primitives_coder(self):
     self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
 
   def test_bytes_coder(self):
-    self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
+    self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000)
 
   def test_varint_coder(self):
     # Small ints.
@@ -190,7 +195,7 @@ def test_timestamp_coder(self):
                      timestamp.Timestamp(micros=1234567890123456789))
     self.check_coder(
         coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())),
-        (timestamp.Timestamp.of(27), 'abc'))
+        (timestamp.Timestamp.of(27), b'abc'))
 
   def test_tuple_coder(self):
     kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
@@ -206,22 +211,22 @@ def test_tuple_coder(self):
         kv_coder.as_cloud_object())
     # Test binary representation
     self.assertEqual(
-        '\x04abc',
-        kv_coder.encode((4, 'abc')))
+        b'\x04abc',
+        kv_coder.encode((4, b'abc')))
     # Test unnested
     self.check_coder(
         kv_coder,
-        (1, 'a'),
-        (-2, 'a' * 100),
-        (300, 'abc\0' * 5))
+        (1, b'a'),
+        (-2, b'a' * 100),
+        (300, b'abc\0' * 5))
     # Test nested
     self.check_coder(
         coders.TupleCoder(
             (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
              coders.StrUtf8Coder())),
-        ((1, 2), 'a'),
+        ((1, 2), u'a'),
         ((-2, 5), u'a\u0101' * 100),
-        ((300, 1), 'abc\0' * 5))
+        ((300, 1), u'abc\0' * 5))
 
   def test_tuple_sequence_coder(self):
     int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder())
@@ -234,7 +239,7 @@ def test_base64_pickle_coder(self):
     self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
 
   def test_utf8_coder(self):
-    self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
+    self.check_coder(coders.StrUtf8Coder(), u'a', u'ab\u00FF', u'\u0101\0')
 
   def test_iterable_coder(self):
     iterable_coder = coders.IterableCoder(coders.VarIntCoder())
@@ -322,12 +327,12 @@ def test_windowed_value_coder(self):
         },
         coder.as_cloud_object())
     # Test binary representation
-    self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
+    self.assertEqual(b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
 
     # Test decoding large timestamp
     self.assertEqual(
-        coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
+        coder.decode(b'\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
         windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
 
     # Test unnested
@@ -348,7 +353,7 @@ def test_windowed_value_coder(self):
             coders.WindowedValueCoder(coders.FloatCoder()),
             coders.WindowedValueCoder(coders.StrUtf8Coder()))),
         (windowed_value.WindowedValue(1.5, 0, ()),
-         windowed_value.WindowedValue("abc", 10, ('window',))))
+         windowed_value.WindowedValue(u"abc", 10, (u'window',))))
 
   def test_proto_coder(self):
     # For instructions on how these test proto message were generated,
@@ -364,7 +369,7 @@ def test_proto_coder(self):
     proto_coder = coders.ProtoCoder(ma.__class__)
     self.check_coder(proto_coder, ma)
     self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())),
-                     (ma, 'a'), (mb, 'b'))
+                     (ma, b'a'), (mb, b'b'))
 
   def test_global_window_coder(self):
     coder = coders.GlobalWindowCoder()
@@ -391,16 +396,16 @@ def test_length_prefix_coder(self):
         },
         coder.as_cloud_object())
     # Test binary representation
-    self.assertEqual('\x00', coder.encode(''))
-    self.assertEqual('\x01a', coder.encode('a'))
-    self.assertEqual('\x02bc', coder.encode('bc'))
-    self.assertEqual('\xff\x7f' + 'z' * 16383, coder.encode('z' * 16383))
+    self.assertEqual(b'\x00', coder.encode(b''))
+    self.assertEqual(b'\x01a', coder.encode(b'a'))
+    self.assertEqual(b'\x02bc', coder.encode(b'bc'))
+    self.assertEqual(b'\xff\x7f' + b'z' * 16383, coder.encode(b'z' * 16383))
     # Test unnested
-    self.check_coder(coder, '', 'a', 'bc', 'def')
+    self.check_coder(coder, b'', b'a', b'bc', b'def')
     # Test nested
     self.check_coder(coders.TupleCoder((coder, coder)),
-                     ('', 'a'),
-                     ('bc', 'def'))
+                     (b'', b'a'),
+                     (b'bc', b'def'))
 
   def test_nested_observables(self):
     class FakeObservableIterator(observable.ObservableMixin):
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
index a13334a2c26..8cb825769cf 100644
--- a/sdks/python/apache_beam/coders/fast_coders_test.py
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -16,6 +16,9 @@
 #
 
 """Unit tests for compiled implementation of coder impls."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index fc952cf4e55..e512e60b8da 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -20,6 +20,11 @@
 
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from builtins import object
 
 
 class ObservableMixin(object):
diff --git a/sdks/python/apache_beam/coders/observable_test.py b/sdks/python/apache_beam/coders/observable_test.py
index 09ca3041c29..a6aea633521 100644
--- a/sdks/python/apache_beam/coders/observable_test.py
+++ b/sdks/python/apache_beam/coders/observable_test.py
@@ -16,6 +16,9 @@
 #
 
 """Tests for the Observable mixin class."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py
index 97aa39ca094..b4fe0370a69 100644
--- a/sdks/python/apache_beam/coders/slow_coders_test.py
+++ b/sdks/python/apache_beam/coders/slow_coders_test.py
@@ -16,6 +16,9 @@
 #
 
 """Unit tests for uncompiled implementation of coder impls."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 1ab55d90f98..d497e3ff811 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -19,8 +19,14 @@
 
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import struct
+from builtins import bytes
+from builtins import chr
+from builtins import object
 
 
 class OutputStream(object):
@@ -32,13 +38,13 @@ def __init__(self):
     self.data = []
 
   def write(self, b, nested=False):
-    assert isinstance(b, str)
+    assert isinstance(b, bytes)
     if nested:
       self.write_var_int64(len(b))
     self.data.append(b)
 
   def write_byte(self, val):
-    self.data.append(chr(val))
+    self.data.append(chr(val).encode('latin-1'))
 
   def write_var_int64(self, v):
     if v < 0:
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index ca13b809379..66c297f6e38 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -17,6 +17,8 @@
 
 """Unit tests for coders that must be consistent across all Beam SDKs.
 """
+from __future__ import absolute_import
+from __future__ import division
 from __future__ import print_function
 
 import json
@@ -24,6 +26,7 @@
 import os.path
 import sys
 import unittest
+from builtins import map
 
 import yaml
 
@@ -74,7 +77,7 @@ class StandardCodersTest(unittest.TestCase):
           lambda x: IntervalWindow(
               start=Timestamp(micros=(x['end'] - x['span']) * 1000),
               end=Timestamp(micros=x['end'] * 1000)),
-      'urn:beam:coders:stream:0.1': lambda x, parser: map(parser, x),
+      'urn:beam:coders:stream:0.1': lambda x, parser: list(map(parser, x)),
       'urn:beam:coders:global_window:0.1': lambda x: window.GlobalWindow(),
       'urn:beam:coders:windowed_value:0.1':
           lambda x, value_parser, window_parser: windowed_value.create(
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
index ade9b722c6e..df15c308632 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -23,7 +23,7 @@ cdef class OutputStream(object):
   cdef size_t buffer_size
   cdef size_t pos
 
-  cpdef write(self, bytes b, bint nested=*)
+  cpdef write(self, const unsigned char[:] b, bint nested=*)
   cpdef write_byte(self, unsigned char val)
   cpdef write_var_int64(self, libc.stdint.int64_t v)
   cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
@@ -39,7 +39,7 @@ cdef class OutputStream(object):
 cdef class ByteCountingOutputStream(OutputStream):
   cdef size_t count
 
-  cpdef write(self, bytes b, bint nested=*)
+  cpdef write(self, const unsigned char[:] b, bint nested=*)
   cpdef write_byte(self, unsigned char val)
   cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
   cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val)
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 7c9521a8637..414c294c3b9 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -39,14 +39,15 @@ cdef class OutputStream(object):
     if self.data:
       libc.stdlib.free(self.data)
 
-  cpdef write(self, bytes b, bint nested=False):
+  cpdef write(self, const unsigned char[:] b, bint nested=False):
     cdef size_t blen = len(b)
     if nested:
       self.write_var_int64(blen)
     if self.buffer_size < self.pos + blen:
       self.extend(blen)
-    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
-    self.pos += blen
+    if blen > 0:
+      libc.string.memcpy(self.data + self.pos, <char*> &b[0], blen)
+      self.pos += blen
 
   cpdef write_byte(self, unsigned char val):
     if  self.buffer_size < self.pos + 1:
@@ -122,7 +123,7 @@ cdef class ByteCountingOutputStream(OutputStream):
   def __cinit__(self):
     self.count = 0
 
-  cpdef write(self, bytes b, bint nested=False):
+  cpdef write(self, const unsigned char[:] b, bint nested=False):
     cdef size_t blen = len(b)
     if nested:
       self.write_var_int64(blen)
diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py
index 15bc5eb9ba9..674c1730ecc 100644
--- a/sdks/python/apache_beam/coders/stream_test.py
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -16,10 +16,15 @@
 #
 
 """Tests for the stream implementations."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import logging
 import math
 import unittest
+from builtins import int
+from builtins import range
 
 from apache_beam.coders import slow_stream
 
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 355c6230f92..92c0c161c6f 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -63,8 +63,14 @@ def MakeXyzs(v):
 
 See apache_beam.typehints.decorators module for more details.
 """
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
-import six
+from builtins import bytes
+from builtins import int
+from builtins import object
+from builtins import str
 
 from apache_beam.coders import coders
 from apache_beam.typehints import typehints
@@ -84,9 +90,8 @@ def register_standard_coders(self, fallback_coder):
     """Register coders for all basic and composite types."""
     self._register_coder_internal(int, coders.VarIntCoder)
     self._register_coder_internal(float, coders.FloatCoder)
-    self._register_coder_internal(str, coders.BytesCoder)
     self._register_coder_internal(bytes, coders.BytesCoder)
-    self._register_coder_internal(six.text_type, coders.StrUtf8Coder)
+    self._register_coder_internal(str, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
     # Default fallback coders applied in that order until the first matching
     # coder found.
@@ -105,11 +110,21 @@ def register_coder(self, typehint_type, typehint_coder_class):
     self._register_coder_internal(typehint_type, typehint_coder_class)
 
   def get_coder(self, typehint):
-    coder = self._coders.get(
-        typehint.__class__ if isinstance(typehint, typehints.TypeConstraint)
-        else typehint, None)
-    if isinstance(typehint, typehints.TypeConstraint) and coder is not None:
-      return coder.from_type_hint(typehint, self)
+    if isinstance(typehint, typehints.TypeConstraint):
+      coder = self._coders.get(typehint.__class__)
+      if coder is not None:
+        return coder.from_type_hint(typehint, self)
+    else:
+      try:
+        t = typehint()
+        coder = self._coders.get(
+            str if isinstance(t, str)
+            else bytes if isinstance(t, bytes)
+            else int if isinstance(t, int) and not isinstance(t, bool)
+            else typehint, None)
+      except TypeError:
+        # typehint cannot be instantiated (without arguments)
+        coder = self._coders.get(typehint, None)
     if coder is None:
       # We use the fallback coder when there is no coder registered for a
       # typehint. For example a user defined class with no coder specified.
diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py
index 2b6aa7a5129..fc08eedf151 100644
--- a/sdks/python/apache_beam/coders/typecoders_test.py
+++ b/sdks/python/apache_beam/coders/typecoders_test.py
@@ -16,8 +16,14 @@
 #
 
 """Unit tests for the typecoders module."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
 
 import unittest
+from builtins import int
+from builtins import object
+from builtins import str
 
 from apache_beam.coders import coders
 from apache_beam.coders import typecoders
@@ -33,14 +39,17 @@ def __init__(self, n):
   def __eq__(self, other):
     return self.number == other.number
 
+  def __hash__(self):
+    return self.number
+
 
 class CustomCoder(coders.Coder):
 
   def encode(self, value):
-    return str(value.number)
+    return str(value.number).encode('latin-1')
 
   def decode(self, encoded):
-    return CustomClass(int(encoded))
+    return CustomClass(encoded.decode('latin-1'))
 
   def is_deterministic(self):
     # This coder is deterministic. Though we don't use need this coder to be
diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh
index 54795e2c90c..29cafb6911a 100755
--- a/sdks/python/generate_pydoc.sh
+++ b/sdks/python/generate_pydoc.sh
@@ -119,6 +119,9 @@ ignore_identifiers = [
   # Ignore broken built-in type references
   'tuple',
 
+  # Ignore future.builtin type references
+  'future.types.newobject.newobject',
+
   # Ignore private classes
   'apache_beam.coders.coders._PickleCoderBase',
   'apache_beam.coders.coders.FastCoder',
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 89c46ce8297..2505c20f2bb 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -90,25 +90,6 @@ pushd "$MODULE"
 isort -p apache_beam --line-width 120 --check-only --order-by-type --combine-star --force-single-line-imports --diff ${SKIP_PARAM}
 popd
 
-FUTURIZE_EXCLUDED=(
-  "typehints.py"
-  "pb2"
-  "trivial_infernce.py"
-)
-FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" )
-echo "Checking for files requiring stage 1 refactoring from futurize"
-futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
-futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" || echo "")
-count=${#futurize_filtered}
-if [ "$count" != "0" ]; then
-  echo "Some of the changes require futurize stage 1 changes."
-  echo "The files with required changes:"
-  echo "$futurize_filtered"
-  echo "You can run futurize apache_beam to see the proposed changes."
-  exit 1
-fi
-echo "No future changes needed"
-
 echo "Checking unittest.main for module ${MODULE}:"
 TESTS_MISSING_MAIN=$(find ${MODULE} | grep '\.py$' | xargs grep -l '^import unittest$' | xargs grep -L unittest.main)
 if [ -n "${TESTS_MISSING_MAIN}" ]; then
@@ -118,4 +99,4 @@ if [ -n "${TESTS_MISSING_MAIN}" ]; then
   done
   echo
   exit 1
-fi
+fi
\ No newline at end of file
diff --git a/sdks/python/run_pylint_2to3.sh b/sdks/python/run_pylint_2to3.sh
new file mode 100644
index 00000000000..63df49d40ec
--- /dev/null
+++ b/sdks/python/run_pylint_2to3.sh
@@ -0,0 +1,90 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+# This script will run pylint with the --py3k parameter to check for python
+# 3 compatibility. This script can run on a list of modules provided as
+# command line arguments.
+#
+# The exit-code of the script indicates success or a failure.
+
+set -o errexit
+set -o pipefail
+
+DEFAULT_MODULE=apache_beam
+
+usage(){ echo "Usage: $0 [MODULE|--help]
+# The default MODULE is $DEFAULT_MODULE"; }
+
+MODULES=${DEFAULT_MODULE}
+while [[ $# -gt 0 ]] ; do
+  key="$1"
+  case ${key} in
+    --help) usage; exit 1;;
+       *)
+       if [ ${MODULES} = ${DEFAULT_MODULE} ] ; then
+         MODULES=()
+       fi
+       MODULES+=("$1")
+       shift;;
+  esac
+done
+
+FUTURIZE_EXCLUDED=(
+  "typehints.py"
+  "pb2"
+  "trivial_infernce.py"
+)
+FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" )
+echo "Checking for files requiring stage 1 refactoring from futurize"
+futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
+futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" \
+  || echo "")
+count=${#futurize_filtered}
+if [ "$count" != "0" ]; then
+  echo "Some of the changes require futurize stage 1 changes."
+  echo "The files with required changes:"
+  echo "$futurize_filtered"
+  echo "You can run futurize apache_beam to see the proposed changes."
+  exit 1
+fi
+echo "No future changes needed"
+
+# Following generated files are excluded from lint checks.
+EXCLUDED_GENERATED_FILES=(
+"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py"
+"apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py"
+"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py"
+"apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py"
+"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
+"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
+"apache_beam/coders/proto2_coder_test_messages_pb2.py"
+apache_beam/portability/api/*pb2*.py
+)
+
+FILES_TO_IGNORE=""
+for file in "${EXCLUDED_GENERATED_FILES[@]}"; do
+  if test -z "$FILES_TO_IGNORE"
+    then FILES_TO_IGNORE="$(basename $file)"
+    else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)"
+  fi
+done
+echo "Skipping lint for generated files: $FILES_TO_IGNORE"
+
+echo "Running pylint --py3k for modules $( printf "%s " "${MODULES[@]}" ):"
+pylint -j8 $( printf "%s " "${MODULES[@]}" ) \
+  --ignore-patterns="$FILES_TO_IGNORE" --py3k
\ No newline at end of file
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index b7f400e739e..86d36621187 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -69,7 +69,7 @@ def get_version():
   )
 
 
-REQUIRED_CYTHON_VERSION = '0.26.1'
+REQUIRED_CYTHON_VERSION = '0.28.1'
 try:
   _CYTHON_VERSION = get_distribution('cython').version
   if StrictVersion(_CYTHON_VERSION) < StrictVersion(REQUIRED_CYTHON_VERSION):
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index ff88ac42fa8..aa7e4ca146e 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -17,7 +17,7 @@
 
 [tox]
 # new environments will be excluded by default unless explicitly added to envlist.
-envlist = py27,py27-{gcp,cython,lint},py3-lint,docs
+envlist = py27,py27-{gcp,cython,lint,lint3},py3-lint,docs
 toxworkdir = {toxinidir}/target/.tox
 
 [pycodestyle]
@@ -35,7 +35,8 @@ whitelist_externals =
   time
 deps =
   grpcio-tools==1.3.5
-  cython: cython==0.26.1
+  cython: cython==0.28.1
+  future==0.16.0
 
 # These 2 magic command overrides are required for Jenkins builds.
 # Otherwise we get "OSError: [Errno 2] No such file or directory" errors.
@@ -59,6 +60,9 @@ commands =
 # `platform = linux2|darwin|...`
 # See https://docs.python.org/2/library/sys.html#sys.platform for platform codes
 platform = linux2
+deps =
+  cython==0.28.1
+  future==0.16.0
 commands =
   python --version
   pip --version
@@ -89,6 +93,21 @@ commands =
   pip --version
   time {toxinidir}/run_pylint.sh
 
+[testenv:py27-lint3]
+deps =
+  pycodestyle==2.3.1
+  pylint==1.7.2
+  future==0.16.0
+  isort==4.2.15
+  flake8==3.5.0
+modules =
+  apache_beam/coders
+commands =
+  python --version
+  pip --version
+  time {toxinidir}/run_pylint_2to3.sh {[testenv:py27-lint3]modules}
+
+
 [testenv:py3-lint]
 deps =
   pycodestyle==2.3.1


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 88955)
    Time Spent: 6h 10m  (was: 6h)

> Futurize and fix python 2 compatibility for coders package
> ----------------------------------------------------------
>
>                 Key: BEAM-3981
>                 URL: https://issues.apache.org/jira/browse/BEAM-3981
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Ahmet Altay
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Run automatic conversion with futurize tool on coders subpackage and fix 
> python 2 compatibility. This prepares the subpackage for python 3 support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to