[ 
https://issues.apache.org/jira/browse/BEAM-4003?focusedWorklogId=119886&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-119886
 ]

ASF GitHub Bot logged work on BEAM-4003:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jul/18 15:48
            Start Date: 06/Jul/18 15:48
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #5373: [BEAM-4003] 
Futurize runners subpackage
URL: https://github.com/apache/beam/pull/5373
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/__init__.py 
b/sdks/python/apache_beam/runners/__init__.py
index 863e67ed6e9..ad5c3f626fa 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -20,6 +20,8 @@
 This package defines runners, which are used to execute a pipeline.
 """
 
+from __future__ import absolute_import
+
 from apache_beam.runners.direct.direct_runner import DirectRunner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 43bbfcf7e68..03333a8df72 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+# cython: language_level=3
 # cython: profile=True
 
 """Worker operations executor.
@@ -22,10 +23,17 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import sys
 import traceback
+from builtins import next
+from builtins import object
+from builtins import zip
 
-import six
+from future.utils import raise_
+from past.builtins import basestring
+from past.builtins import unicode
 
 from apache_beam.internal import util
 from apache_beam.pvalue import TaggedOutput
@@ -615,7 +623,7 @@ def _reraise_augmented(self, exn):
           traceback.format_exception_only(type(exn), exn)[-1].strip()
           + step_annotation)
       new_exn._tagged_with_step = True
-    six.reraise(type(new_exn), new_exn, original_traceback)
+    raise_(type(new_exn), new_exn, original_traceback)
 
 
 class OutputProcessor(object):
@@ -652,7 +660,7 @@ def process_outputs(self, windowed_input_element, results):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, six.string_types):
+        if not isinstance(tag, basestring):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
       if isinstance(result, WindowedValue):
@@ -694,7 +702,7 @@ def finish_bundle_outputs(self, results):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, six.string_types):
+        if not isinstance(tag, (str, unicode)):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
 
diff --git a/sdks/python/apache_beam/runners/common_test.py 
b/sdks/python/apache_beam/runners/common_test.py
index e0f628c71ee..d4848e48abe 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import unittest
 
 from apache_beam.runners.common import DoFnSignature
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py 
b/sdks/python/apache_beam/runners/dataflow/__init__.py
index 6674ba5d9ff..2148f1691e9 100644
--- a/sdks/python/apache_beam/runners/dataflow/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/__init__.py
@@ -21,5 +21,7 @@
 with no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 from apache_beam.runners.dataflow.test_dataflow_runner import 
TestDataflowRunner
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 3f039f7d014..89cba3df60b 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -21,9 +21,13 @@
 service.
 """
 
+from __future__ import absolute_import
+
 import numbers
 from collections import defaultdict
 
+from future.utils import iteritems
+
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.metrics.cells import DistributionResult
 from apache_beam.metrics.execution import MetricKey
@@ -145,7 +149,7 @@ def _populate_metric_results(self, response):
 
     # Now we create the MetricResult elements.
     result = []
-    for metric_key, metric in metrics_by_name.iteritems():
+    for metric_key, metric in iteritems(metrics_by_name):
       attempted = self._get_metric_value(metric['tentative'])
       committed = self._get_metric_value(metric['committed'])
       if attempted is None or committed is None:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 67ef923e3d9..1823b59928f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -18,8 +18,12 @@
 Tests corresponding to the DataflowRunner implementation of MetricsResult,
 the DataflowMetrics class.
 """
+
+from __future__ import absolute_import
+
 import types
 import unittest
+from builtins import object
 
 import mock
 
@@ -34,7 +38,7 @@
 class DictToObject(object):
   """Translate from a dict(list()) structure to an object structure"""
   def __init__(self, data):
-    for name, value in data.iteritems():
+    for name, value in data.items():
       setattr(self, name, self._wrap(value))
 
   def _wrap(self, value):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 9c8520250e7..3d5dab9b57b 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -20,14 +20,19 @@
 The runner will create a JSON description of the job graph and then submit it
 to the Dataflow Service for remote execution by a worker.
 """
+from __future__ import absolute_import
+from __future__ import division
 
 import logging
 import threading
 import time
 import traceback
-import urllib
+from builtins import hex
 from collections import defaultdict
 
+from future.moves.urllib.parse import quote
+from future.moves.urllib.parse import unquote
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam import error
@@ -125,7 +130,7 @@ def rank_error(msg):
 
     if duration:
       start_secs = time.time()
-      duration_secs = duration / 1000
+      duration_secs = duration // 1000
 
     job_id = result.job_id()
     while True:
@@ -642,7 +647,7 @@ def run_ParDo(self, transform_node):
       if (label_renames and
           transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
         # Patch PTransform proto.
-        for old, new in label_renames.iteritems():
+        for old, new in iteritems(label_renames):
           transform_proto.inputs[new] = transform_proto.inputs[old]
           del transform_proto.inputs[old]
 
@@ -650,7 +655,7 @@ def run_ParDo(self, transform_node):
         proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
         proto = proto_utils.parse_Bytes(transform_proto.spec.payload,
                                         proto_type)
-        for old, new in label_renames.iteritems():
+        for old, new in iteritems(label_renames):
           proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
           del proto.side_inputs[old]
         transform_proto.spec.payload = proto.SerializeToString()
@@ -965,12 +970,12 @@ def deserialize_windowing_strategy(cls, serialized_data):
   @staticmethod
   def byte_array_to_json_string(raw_bytes):
     """Implements 
org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
-    return urllib.quote(raw_bytes)
+    return quote(raw_bytes)
 
   @staticmethod
   def json_string_to_byte_array(encoded_string):
     """Implements 
org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
-    return urllib.unquote(encoded_string)
+    return unquote(encoded_string)
 
 
 class _DataflowSideInput(beam.pvalue.AsSideInput):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c8790824bed..dd913584f68 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -17,8 +17,12 @@
 
 """Unit tests for the DataflowRunner class."""
 
+from __future__ import absolute_import
+
 import json
 import unittest
+from builtins import object
+from builtins import range
 from datetime import datetime
 
 import mock
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/__init__.py 
b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 784166cafda..f6c15a667a4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -19,6 +19,9 @@
 
 Dataflow client utility functions."""
 
+from __future__ import absolute_import
+
+from builtins import object
 import codecs
 import getpass
 import json
@@ -28,12 +31,13 @@
 import tempfile
 import time
 from datetime import datetime
-from StringIO import StringIO
+import io
+
+from past.builtins import unicode
 
 import pkg_resources
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
-import six
 
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp.auth import get_service_credentials
@@ -262,7 +266,7 @@ def __init__(self, packages, options, environment_version, 
pipeline_url):
           dataflow.Environment.SdkPipelineOptionsValue())
 
       options_dict = {k: v
-                      for k, v in sdk_pipeline_options.iteritems()
+                      for k, v in sdk_pipeline_options.items()
                       if v is not None}
       options_dict["pipelineUrl"] = pipeline_url
       self.proto.sdkPipelineOptions.additionalProperties.append(
@@ -298,7 +302,7 @@ def encode_shortstrings(input_buffer, errors='strict'):
     def decode_shortstrings(input_buffer, errors='strict'):
       """Decoder (to Unicode) that suppresses long base64 strings."""
       shortened, length = encode_shortstrings(input_buffer, errors)
-      return six.text_type(shortened), length
+      return unicode(shortened), length
 
     def shortstrings_registerer(encoding_name):
       if encoding_name == 'shortstrings':
@@ -493,7 +497,7 @@ def create_job(self, job):
     if job_location:
       gcs_or_local_path = os.path.dirname(job_location)
       file_name = os.path.basename(job_location)
-      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+      self.stage_file(gcs_or_local_path, file_name, io.BytesIO(job.json()))
 
     if not template_location:
       return self.submit_job_description(job)
@@ -508,7 +512,7 @@ def create_job_description(self, job):
     # Stage the pipeline for the runner harness
     self.stage_file(job.google_cloud_options.staging_location,
                     names.STAGED_PIPELINE_FILENAME,
-                    StringIO(job.proto_pipeline.SerializeToString()))
+                    io.BytesIO(job.proto_pipeline.SerializeToString()))
 
     # Stage other resources for the SDK harness
     resources = self._stage_resources(job.options)
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 2ba4e840cd3..aa0983e6fdb 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -15,6 +15,9 @@
 # limitations under the License.
 #
 """Unit tests for the apiclient module."""
+
+from __future__ import absolute_import
+
 import unittest
 
 import mock
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
index c0d20c3ec8f..ce260c515f0 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
@@ -18,6 +18,8 @@
 """Common imports for generated dataflow client library."""
 # pylint:disable=wildcard-import
 
+from __future__ import absolute_import
+
 import pkgutil
 
 # Protect against environments where apitools library is not available.
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
index 61d02730ab5..02b424eb994 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py
@@ -17,6 +17,9 @@
 
 """Generated client library for dataflow version v1b3."""
 # NOTE: This file is autogenerated and should not be edited by hand.
+
+from __future__ import absolute_import
+
 from apitools.base.py import base_api
 
 from apache_beam.runners.dataflow.internal.clients.dataflow import 
dataflow_v1b3_messages as messages
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index fdc1681f33e..bdb5c6d2662 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -22,6 +22,8 @@
 """
 # NOTE: This file is autogenerated and should not be edited by hand.
 
+from __future__ import absolute_import
+
 from apitools.base.protorpclite import messages as _messages
 from apitools.base.py import encoding
 from apitools.base.py import extra_types
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
index 805473a8838..8389e627f40 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers.py
@@ -14,7 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
 
+from future.utils import iteritems
 from hamcrest.core.base_matcher import BaseMatcher
 
 IGNORED = object()
@@ -49,7 +51,7 @@ def _matches(self, item):
     if self.origin != IGNORED and item.origin != self.origin:
       return False
     if self.context != IGNORED:
-      for key, name in self.context.iteritems():
+      for key, name in iteritems(self.context):
         if key not in item.context:
           return False
         if name != IGNORED and item.context[key] != name:
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py
index 15bb9eff083..3e6b6d71b57 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/message_matchers_test.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
+
 import unittest
 
 import hamcrest as hc
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py 
b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 7e0b81e2130..6f2b8c29735 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -20,8 +20,12 @@
 # All constants are for internal use only; no backwards-compatibility
 # guarantees.
 
+from __future__ import absolute_import
+
 # TODO (altay): Move shared names to a common location.
 # Standard file names used for staging files.
+from builtins import object
+
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
 DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
 STAGED_PIPELINE_FILENAME = "pipeline.pb"
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py 
b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py 
b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 2f2316f6f1d..2096dc99833 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -20,7 +20,10 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
 import logging
+from builtins import object
 
 from apache_beam import pvalue
 from apache_beam.io import iobase
@@ -31,7 +34,7 @@
 def _dict_printable_fields(dict_object, skip_fields):
   """Returns a list of strings for the interesting fields of a dict."""
   return ['%s=%r' % (name, value)
-          for name, value in dict_object.iteritems()
+          for name, value in dict_object.items()
           # want to output value 0 but not None nor []
           if (value or value == 0)
           and name not in skip_fields]
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py 
b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
index 01fd35f9cf9..828455b6d85 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
@@ -17,6 +17,7 @@
 
 """Tests corresponding to Dataflow's iobase module."""
 
+from __future__ import absolute_import
 
 import unittest
 
diff --git 
a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py 
b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
index a54ee7767c5..980ad24c796 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
@@ -17,6 +17,10 @@
 
 """Create transform for streaming."""
 
+from __future__ import absolute_import
+
+from builtins import map
+
 from apache_beam import DoFn
 from apache_beam import ParDo
 from apache_beam import PTransform
@@ -34,7 +38,7 @@ class StreamingCreate(PTransform):
 
   def __init__(self, values, coder):
     self.coder = coder
-    self.encoded_values = map(coder.encode, values)
+    self.encoded_values = list(map(coder.encode, values))
 
   class DecodeAndEmitDoFn(DoFn):
     """A DoFn which stores encoded versions of elements.
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py 
b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 0ce212fa31b..5095e48d802 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -17,6 +17,8 @@
 
 """Ptransform overrides for DataflowRunner."""
 
+from __future__ import absolute_import
+
 from apache_beam.coders import typecoders
 from apache_beam.pipeline import PTransformOverride
 
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index eedfa60f9fd..8c61f104e39 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -16,6 +16,8 @@
 #
 
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
+
+from __future__ import absolute_import
 from __future__ import print_function
 
 import logging
diff --git a/sdks/python/apache_beam/runners/direct/__init__.py 
b/sdks/python/apache_beam/runners/direct/__init__.py
index 0f8275674bb..0b647fd6f7c 100644
--- a/sdks/python/apache_beam/runners/direct/__init__.py
+++ b/sdks/python/apache_beam/runners/direct/__init__.py
@@ -20,4 +20,6 @@
 Anything in this package not imported here is an internal implementation detail
 with no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
+
 from apache_beam.runners.direct.direct_runner import DirectRunner
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py 
b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 942d2824dbf..558e925df3e 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -19,6 +19,8 @@
 
 from __future__ import absolute_import
 
+from builtins import object
+
 from apache_beam import pvalue
 from apache_beam.runners import common
 from apache_beam.utils.windowed_value import WindowedValue
diff --git a/sdks/python/apache_beam/runners/direct/clock.py 
b/sdks/python/apache_beam/runners/direct/clock.py
index ad079941466..6dbf8b2a7c0 100644
--- a/sdks/python/apache_beam/runners/direct/clock.py
+++ b/sdks/python/apache_beam/runners/direct/clock.py
@@ -22,6 +22,7 @@
 from __future__ import absolute_import
 
 import time
+from builtins import object
 
 
 class Clock(object):
diff --git 
a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
 
b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 4efaa27f095..a1044e7a33b 100644
--- 
a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ 
b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -16,6 +16,7 @@
 #
 
 """Tests for consumer_tracking_pipeline_visitor."""
+from __future__ import absolute_import
 
 import logging
 import unittest
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py 
b/sdks/python/apache_beam/runners/direct/direct_metrics.py
index 67f5780005f..4d9f83f3a5f 100644
--- a/sdks/python/apache_beam/runners/direct/direct_metrics.py
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py
@@ -20,7 +20,10 @@
 responding to queries of current metrics, but also of keeping the common
 state consistent.
 """
+from __future__ import absolute_import
+
 import threading
+from builtins import object
 from collections import defaultdict
 
 from apache_beam.metrics.cells import CounterAggregator
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py 
b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
index f36178601ff..3ce42c1bb01 100644
--- a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import unittest
 
 import hamcrest as hc
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py 
b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
index 231cca72476..f258168dd60 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import threading
 import unittest
 
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py 
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 893e32e3357..01d0631c5ee 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -21,6 +21,7 @@
 
 import collections
 import threading
+from builtins import object
 
 from apache_beam.runners.direct.direct_metrics import DirectMetrics
 from apache_beam.runners.direct.executor import TransformExecutor
@@ -299,7 +300,7 @@ def handle_result(
 
       # Commit partial GBK states
       existing_keyed_state = self._transform_keyed_states[result.transform]
-      for k, v in result.partial_keyed_state.iteritems():
+      for k, v in result.partial_keyed_state.items():
         existing_keyed_state[k] = v
       return committed_bundles
 
diff --git a/sdks/python/apache_beam/runners/direct/executor.py 
b/sdks/python/apache_beam/runners/direct/executor.py
index ef6469644c2..6fe3795df4e 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -22,13 +22,15 @@
 import collections
 import itertools
 import logging
-import Queue
 import sys
 import threading
 import traceback
+from builtins import object
+from builtins import range
 from weakref import WeakValueDictionary
 
-import six
+from future.moves import queue
+from future.utils import raise_
 
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.runners.worker import statesampler
@@ -80,7 +82,7 @@ def _get_task_or_none(self):
         # shutdown.
         return self.queue.get(
             timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT)
-      except Queue.Empty:
+      except queue.Empty:
         return None
 
     def run(self):
@@ -101,7 +103,7 @@ def shutdown(self):
       self.shutdown_requested = True
 
   def __init__(self, num_workers):
-    self.queue = Queue.Queue()
+    self.queue = queue.Queue()
     self.workers = [_ExecutorService._ExecutorServiceWorker(
         self.queue, i) for i in range(num_workers)]
     self.shutdown_requested = False
@@ -126,7 +128,7 @@ def shutdown(self):
       try:
         self.queue.get_nowait()
         self.queue.task_done()
-      except Queue.Empty:
+      except queue.Empty:
         continue
     # All existing threads will eventually terminate (after they complete their
     # last task).
@@ -441,7 +443,7 @@ def await_completion(self):
     try:
       if update.exception:
         t, v, tb = update.exc_info
-        six.reraise(t, v, tb)
+        raise_(t, v, tb)
     finally:
       self.executor_service.shutdown()
       self.executor_service.await_completion()
@@ -481,14 +483,14 @@ class _TypedUpdateQueue(object):
 
     def __init__(self, item_type):
       self._item_type = item_type
-      self._queue = Queue.Queue()
+      self._queue = queue.Queue()
 
     def poll(self):
       try:
         item = self._queue.get_nowait()
         self._queue.task_done()
         return  item
-      except Queue.Empty:
+      except queue.Empty:
         return None
 
     def take(self):
@@ -501,7 +503,7 @@ def take(self):
           item = self._queue.get(timeout=1)
           self._queue.task_done()
           return item
-        except Queue.Empty:
+        except queue.Empty:
           pass
 
     def offer(self, item):
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py 
b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 0c1da035126..6d894fb4ba0 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import collections
 import itertools
 
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py 
b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index 610664be923..641863243d6 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -18,6 +18,9 @@
 """This module contains Splittable DoFn logic that is specific to DirectRunner.
 """
 
+from __future__ import absolute_import
+
+from builtins import object
 from threading import Lock
 from threading import Timer
 
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py 
b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
index e8ef9b6b968..d5924cb180b 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
@@ -17,9 +17,13 @@
 
 """Unit tests for SDF implementation for DirectRunner."""
 
+from __future__ import absolute_import
+from __future__ import division
+
 import logging
 import os
 import unittest
+from builtins import range
 
 import apache_beam as beam
 from apache_beam import Create
@@ -192,7 +196,7 @@ def test_sdf_multiple_checkpoints_multiple_element(self):
         int(self._default_max_num_outputs * 3))
 
   def test_sdf_with_resume_single_element(self):
-    resume_count = self._default_max_num_outputs / 10
+    resume_count = self._default_max_num_outputs // 10
     # Makes sure that resume_count is not trivial.
     assert resume_count > 0
 
@@ -202,7 +206,7 @@ def test_sdf_with_resume_single_element(self):
         resume_count)
 
   def test_sdf_with_resume_multiple_elements(self):
-    resume_count = self._default_max_num_outputs / 10
+    resume_count = self._default_max_num_outputs // 10
     assert resume_count > 0
 
     self.run_sdf_read_pipeline(
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py 
b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 4b392142afb..38381fa5fd2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -22,6 +22,9 @@
 import collections
 import random
 import time
+from builtins import object
+
+from future.utils import iteritems
 
 import apache_beam.io as io
 from apache_beam import coders
@@ -445,7 +448,7 @@ def _get_element(message):
         return timestamp, parsed_message
 
       return [_get_element(message)
-              for unused_ack_id, message in results.items()]
+              for unused_ack_id, message in iteritems(results)]
 
   def finish_bundle(self):
     data = self._read_from_pubsub(self.source.timestamp_attribute)
@@ -575,7 +578,7 @@ def process_element(self, element):
 
   def finish_bundle(self):
     self.runner.finish()
-    bundles = self._tagged_receivers.values()
+    bundles = list(self._tagged_receivers.values())
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
         self, bundles, [], result_counters, None)
@@ -716,7 +719,7 @@ def process_element(self, element):
   def finish_bundle(self):
     bundles = []
     bundle = None
-    for encoded_k, vs in self.gbk_items.iteritems():
+    for encoded_k, vs in iteritems(self.gbk_items):
       if not bundle:
         bundle = self._evaluation_context.create_bundle(
             self.output_pcollection)
diff --git a/sdks/python/apache_beam/runners/direct/util.py 
b/sdks/python/apache_beam/runners/direct/util.py
index 797a7432644..407ea39a21a 100644
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ b/sdks/python/apache_beam/runners/direct/util.py
@@ -22,6 +22,8 @@
 
 from __future__ import absolute_import
 
+from builtins import object
+
 
 class TransformResult(object):
   """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py 
b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 3cdc24830a5..8b50919aaf6 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import threading
+from builtins import object
 
 from apache_beam import pipeline
 from apache_beam import pvalue
@@ -156,7 +157,7 @@ def extract_all_timers(self):
     and reports if there are any timers set."""
     all_timers = []
     has_realtime_timer = False
-    for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
+    for applied_ptransform, tw in self._transform_to_watermarks.items():
       fired_timers, had_realtime_timer = tw.extract_transform_timers()
       if fired_timers:
         all_timers.append((applied_ptransform, fired_timers))
@@ -203,7 +204,7 @@ def output_watermark(self):
 
   def hold(self, keyed_earliest_holds):
     with self._lock:
-      for key, hold_value in keyed_earliest_holds.iteritems():
+      for key, hold_value in keyed_earliest_holds.items():
         self._keyed_earliest_holds[key] = hold_value
         if (hold_value is None or
             hold_value == WatermarkManager.WATERMARK_POS_INF):
@@ -265,7 +266,7 @@ def extract_transform_timers(self):
     with self._lock:
       fired_timers = []
       has_realtime_timer = False
-      for encoded_key, state in self._keyed_states.iteritems():
+      for encoded_key, state in self._keyed_states.items():
         timers, had_realtime_timer = state.get_timers(
             watermark=self._input_watermark,
             processing_time=self._clock.time())
diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py 
b/sdks/python/apache_beam/runners/experimental/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/runners/experimental/__init__.py
+++ b/sdks/python/apache_beam/runners/experimental/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
index 5d140301393..0b416aa8e52 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
@@ -19,4 +19,6 @@
 sends a runner API proto over the API and then runs it on the other side.
 """
 
+from __future__ import absolute_import
+
 from 
apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner 
import PythonRPCDirectRunner
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
index 84bed4270bc..57056abf8b6 100644
--- 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -18,9 +18,12 @@
 """A runner implementation that submits a job for remote execution.
 """
 
+from __future__ import absolute_import
+
 import logging
 import random
 import string
+from builtins import range
 
 import grpc
 
@@ -59,7 +62,7 @@ def run(self, pipeline):
     # Submit the job to the RPC co-process
     jobName = ('Job-' +
                ''.join(random.choice(string.ascii_uppercase) for _ in 
range(6)))
-    options = {k: v for k, v in pipeline._options.get_all_options().iteritems()
+    options = {k: v for k, v in pipeline._options.get_all_options().items()
                if v is not None}
 
     try:
diff --git 
a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py 
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
index 4986dc40abc..a0397c601a2 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -17,6 +17,8 @@
 
 """A runner implementation that submits a job for remote execution.
 """
+from __future__ import absolute_import
+
 import time
 import uuid
 from concurrent import futures
diff --git a/sdks/python/apache_beam/runners/job/__init__.py 
b/sdks/python/apache_beam/runners/job/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/runners/job/__init__.py
+++ b/sdks/python/apache_beam/runners/job/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/job/manager.py 
b/sdks/python/apache_beam/runners/job/manager.py
index 4d88a1189f2..57901668127 100644
--- a/sdks/python/apache_beam/runners/job/manager.py
+++ b/sdks/python/apache_beam/runners/job/manager.py
@@ -18,9 +18,12 @@
 """A object to control to the Job API Co-Process
 """
 
+from __future__ import absolute_import
+
 import logging
 import subprocess
 import time
+from builtins import object
 
 import grpc
 
diff --git a/sdks/python/apache_beam/runners/job/utils.py 
b/sdks/python/apache_beam/runners/job/utils.py
index 84c727fb4eb..5a247cd2729 100644
--- a/sdks/python/apache_beam/runners/job/utils.py
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -18,6 +18,8 @@
 """Utility functions for efficiently processing with the job API
 """
 
+from __future__ import absolute_import
+
 import json
 
 from google.protobuf import json_format
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py
index c2ea4ccb334..511dbd915f5 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -20,6 +20,9 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from __future__ import absolute_import
+
+from builtins import object
 
 from apache_beam import coders
 from apache_beam import pipeline
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py 
b/sdks/python/apache_beam/runners/pipeline_context_test.py
index 6091ed8963e..1e9456a2d3f 100644
--- a/sdks/python/apache_beam/runners/pipeline_context_test.py
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -17,6 +17,8 @@
 
 """Unit tests for the windowing classes."""
 
+from __future__ import absolute_import
+
 import unittest
 
 from apache_beam import coders
diff --git a/sdks/python/apache_beam/runners/portability/__init__.py 
b/sdks/python/apache_beam/runners/portability/__init__.py
index 7af93ed945f..d247cadd0da 100644
--- a/sdks/python/apache_beam/runners/portability/__init__.py
+++ b/sdks/python/apache_beam/runners/portability/__init__.py
@@ -16,3 +16,5 @@
 #
 
 """This runner is experimental; no backwards-compatibility guarantees."""
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index b807eae9d1c..448c8793d66 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,16 +17,20 @@
 
 """A PipelineRunner using the SDK harness.
 """
+from __future__ import absolute_import
+
 import collections
 import contextlib
 import copy
 import logging
-import Queue as queue
+import queue
 import threading
 import time
+from builtins import object
 from concurrent import futures
 
 import grpc
+from future import standard_library
 
 import apache_beam as beam  # pylint: disable=ungrouped-imports
 from apache_beam import coders
@@ -52,6 +56,7 @@
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.utils import proto_utils
 
+standard_library.install_aliases()
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -297,7 +302,7 @@ def deduplicate_read(self):
         new_transforms = []
         for transform in self.transforms:
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
-            pcoll = only_element(transform.outputs.items())[1]
+            pcoll = only_element(list(transform.outputs.items()))[1]
             if pcoll in seen_pcolls:
               continue
             seen_pcolls.add(pcoll)
@@ -408,9 +413,9 @@ def windowed_coder_id(coder_id):
               transform.spec.payload, beam_runner_api_pb2.CombinePayload)
 
           input_pcoll = pipeline_components.pcollections[only_element(
-              transform.inputs.values())]
+              list(transform.inputs.values()))]
           output_pcoll = pipeline_components.pcollections[only_element(
-              transform.outputs.values())]
+              list(transform.outputs.values()))]
 
           windowed_input_coder = pipeline_components.coders[
               input_pcoll.coder_id]
@@ -579,7 +584,7 @@ def sink_flattens(stages):
         if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
           # This is used later to correlate the read and writes.
           param = str("materialize:%s" % transform.unique_name)
-          output_pcoll_id, = transform.outputs.values()
+          output_pcoll_id, = list(transform.outputs.values())
           output_coder_id = pcollections[output_pcoll_id].coder_id
           flatten_writes = []
           for local_in, pcoll_in in transform.inputs.items():
@@ -766,7 +771,7 @@ def fuse(producer, consumer):
       # Everything that was originally a stage or a replacement, but wasn't
       # replaced, should be in the final graph.
       final_stages = frozenset(stages).union(replacements.values()).difference(
-          replacements.keys())
+          list(replacements.keys()))
 
       for stage in final_stages:
         # Update all references to their final values before throwing
@@ -948,8 +953,8 @@ def get_buffer(pcoll_id):
           original_gbk_transform = pcoll_id.split(':', 1)[1]
           transform_proto = pipeline_components.transforms[
               original_gbk_transform]
-          input_pcoll = only_element(transform_proto.inputs.values())
-          output_pcoll = only_element(transform_proto.outputs.values())
+          input_pcoll = only_element(list(transform_proto.inputs.values()))
+          output_pcoll = only_element(list(transform_proto.outputs.values()))
           pre_gbk_coder = context.coders[safe_coders[
               pipeline_components.pcollections[input_pcoll].coder_id]]
           post_gbk_coder = context.coders[safe_coders[
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 63af2847b56..6544c9d477a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
 from __future__ import print_function
 
 import functools
@@ -23,6 +24,7 @@
 import time
 import traceback
 import unittest
+from builtins import range
 
 import apache_beam as beam
 from apache_beam.metrics.execution import MetricKey
@@ -152,7 +154,7 @@ def cross_product(elem, sides):
   def test_pardo_windowed_side_inputs(self):
     with self.create_pipeline() as p:
       # Now with some windowing.
-      pcoll = p | beam.Create(range(10)) | beam.Map(
+      pcoll = p | beam.Create(list(range(10))) | beam.Map(
           lambda t: window.TimestampedValue(t, t))
       # Intentionally choosing non-aligned windows to highlight the transition.
       main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(5))
@@ -163,17 +165,17 @@ def test_pardo_windowed_side_inputs(self):
           res,
           equal_to([
               # The window [0, 5) maps to the window [0, 7).
-              (0, range(7)),
-              (1, range(7)),
-              (2, range(7)),
-              (3, range(7)),
-              (4, range(7)),
+              (0, list(range(7))),
+              (1, list(range(7))),
+              (2, list(range(7))),
+              (3, list(range(7))),
+              (4, list(range(7))),
               # The window [5, 10) maps to the window [7, 14).
-              (5, range(7, 10)),
-              (6, range(7, 10)),
-              (7, range(7, 10)),
-              (8, range(7, 10)),
-              (9, range(7, 10))]),
+              (5, list(range(7, 10))),
+              (6, list(range(7, 10))),
+              (7, list(range(7, 10))),
+              (8, list(range(7, 10))),
+              (9, list(range(7, 10)))]),
           label='windowed')
 
   def test_flattened_side_input(self):
@@ -374,7 +376,7 @@ def test_progress_metrics(self):
     res.wait_until_finish()
     try:
       self.assertEqual(2, len(res._metrics_by_stage))
-      pregbk_metrics, postgbk_metrics = res._metrics_by_stage.values()
+      pregbk_metrics, postgbk_metrics = list(res._metrics_by_stage.values())
       if 'Create/Read' not in pregbk_metrics.ptransforms:
         # The metrics above are actually unordered. Swap.
         pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
@@ -398,7 +400,7 @@ def test_progress_metrics(self):
 
       # The actual stage name ends up being something like 'm_out/lamdbda...'
       m_out, = [
-          metrics for name, metrics in postgbk_metrics.ptransforms.items()
+          metrics for name, metrics in 
list(postgbk_metrics.ptransforms.items())
           if name.startswith('m_out')]
       self.assertEqual(
           5,
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py 
b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 962c2b466e5..7b07768987d 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -14,18 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
+
 import functools
 import logging
 import os
-import Queue as queue
+import queue as queue
 import subprocess
 import threading
 import time
 import traceback
 import uuid
+from builtins import object
 from concurrent import futures
 
 import grpc
+from future import standard_library
 from google.protobuf import text_format
 
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -34,6 +38,8 @@
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import fn_api_runner
 
+standard_library.install_aliases()
+
 TERMINAL_STATES = [
     beam_job_api_pb2.JobState.DONE,
     beam_job_api_pb2.JobState.STOPPED,
diff --git 
a/sdks/python/apache_beam/runners/portability/local_job_service_main.py 
b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
index b6f2ef9710c..dc70f455688 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import argparse
 import logging
 import sys
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 295b985bfe0..a22c6069da3 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+
 import logging
 import os
 import threading
diff --git 
a/sdks/python/apache_beam/runners/portability/portable_runner_test.py 
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 3e680a35b01..88aa0e18805 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from __future__ import absolute_import
 from __future__ import print_function
 
 import logging
diff --git a/sdks/python/apache_beam/runners/portability/stager.py 
b/sdks/python/apache_beam/runners/portability/stager.py
index a6bfa497bd5..4b10de47e22 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -44,6 +44,8 @@
 TODO(silviuc): Should we allow several setup packages?
 TODO(silviuc): We should allow customizing the exact command for setup build.
 """
+from __future__ import absolute_import
+
 import glob
 import logging
 import os
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py 
b/sdks/python/apache_beam/runners/portability/stager_test.py
index 56b57d16796..17318f504d4 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -16,6 +16,8 @@
 #
 """Unit tests for the stager module."""
 
+from __future__ import absolute_import
+
 import logging
 import os
 import shutil
diff --git a/sdks/python/apache_beam/runners/runner.py 
b/sdks/python/apache_beam/runners/runner.py
index b3691722540..15cc1de5779 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -24,6 +24,7 @@
 import shelve
 import shutil
 import tempfile
+from builtins import object
 
 __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
 
diff --git a/sdks/python/apache_beam/runners/runner_test.py 
b/sdks/python/apache_beam/runners/runner_test.py
index e3962f89b48..196a6d91858 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -22,6 +22,8 @@
 caching and clearing values that are not tested elsewhere.
 """
 
+from __future__ import absolute_import
+
 import unittest
 
 import hamcrest as hc
diff --git a/sdks/python/apache_beam/runners/sdf_common.py 
b/sdks/python/apache_beam/runners/sdf_common.py
index 5b3554460d2..e0573289f3b 100644
--- a/sdks/python/apache_beam/runners/sdf_common.py
+++ b/sdks/python/apache_beam/runners/sdf_common.py
@@ -17,7 +17,10 @@
 
 """This module contains Splittable DoFn logic that's common to all runners."""
 
+from __future__ import absolute_import
+
 import uuid
+from builtins import object
 
 import apache_beam as beam
 from apache_beam import pvalue
diff --git a/sdks/python/apache_beam/runners/test/__init__.py 
b/sdks/python/apache_beam/runners/test/__init__.py
index 6cad4d88550..a52ea6e84df 100644
--- a/sdks/python/apache_beam/runners/test/__init__.py
+++ b/sdks/python/apache_beam/runners/test/__init__.py
@@ -23,6 +23,8 @@
 
 # Protect against environments where dataflow runner is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
+from __future__ import absolute_import
+
 try:
   from apache_beam.runners.dataflow.test_dataflow_runner import 
TestDataflowRunner
 except ImportError:
diff --git a/sdks/python/apache_beam/runners/worker/__init__.py 
b/sdks/python/apache_beam/runners/worker/__init__.py
index 0bce5d68f72..9fbf21557df 100644
--- a/sdks/python/apache_beam/runners/worker/__init__.py
+++ b/sdks/python/apache_beam/runners/worker/__init__.py
@@ -16,3 +16,4 @@
 #
 
 """For internal use only; no backwards-compatibility guarantees."""
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 7727c87c4c7..47eb4e0f968 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -26,6 +26,10 @@
 import json
 import logging
 import re
+from builtins import next
+from builtins import object
+
+from future.utils import itervalues
 
 import apache_beam as beam
 from apache_beam.coders import WindowedValueCoder
@@ -103,7 +107,7 @@ def __init__(self, operation_name, step_name, consumers, 
counter_factory,
     # We must do this manually as we don't have a spec or spec.output_coders.
     self.receivers = [
         operations.ConsumerSet(self.counter_factory, self.step_name, 0,
-                               next(consumers.itervalues()),
+                               next(itervalues(consumers)),
                                self.windowed_coder)]
 
   def process(self, windowed_value):
@@ -315,7 +319,7 @@ def _fix_output_tags(self, transform_id, metrics):
     # However, if there is exactly one output, we can fix up the name here.
     def fix_only_output_tag(actual_output_tag, mapping):
       if len(mapping) == 1:
-        fake_output_tag, count = only_element(mapping.items())
+        fake_output_tag, count = only_element(list(mapping.items()))
         if fake_output_tag != actual_output_tag:
           del mapping[fake_output_tag]
           mapping[actual_output_tag] = count
@@ -394,7 +398,7 @@ def get_input_coders(self, transform_proto):
     }
 
   def get_only_input_coder(self, transform_proto):
-    return only_element(self.get_input_coders(transform_proto).values())
+    return only_element(list(self.get_input_coders(transform_proto).values()))
 
   # TODO(robertwb): Update all operations to take these in the constructor.
   @staticmethod
@@ -411,7 +415,7 @@ def augment_oldstyle_op(op, step_name, consumers, 
tag_list=None):
 def create(factory, transform_id, transform_proto, grpc_port, consumers):
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
-      name=only_element(transform_proto.outputs.keys()))
+      name=only_element(list(transform_proto.outputs.keys())))
   return DataInputOperation(
       transform_proto.unique_name,
       transform_proto.unique_name,
@@ -428,7 +432,7 @@ def create(factory, transform_id, transform_proto, 
grpc_port, consumers):
 def create(factory, transform_id, transform_proto, grpc_port, consumers):
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
-      name=only_element(transform_proto.inputs.keys()))
+      name=only_element(list(transform_proto.inputs.keys())))
   return DataOutputOperation(
       transform_proto.unique_name,
       transform_proto.unique_name,
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py 
b/sdks/python/apache_beam/runners/worker/data_plane.py
index 1ff60aacb9a..bb3cc2a85bb 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -24,18 +24,24 @@
 import abc
 import collections
 import logging
-import Queue as queue
+import queue
 import sys
 import threading
+from builtins import object
+from builtins import range
 
 import grpc
-import six
+from future import standard_library
+from future.utils import raise_
+from future.utils import with_metaclass
 
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
+standard_library.install_aliases()
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -51,7 +57,7 @@ def close(self):
       self._close_callback(self.get())
 
 
-class DataChannel(object):
+class DataChannel(with_metaclass(abc.ABCMeta, object)):
   """Represents a channel for reading and writing data over the data plane.
 
   Read from this channel with the input_elements method::
@@ -70,8 +76,6 @@ class DataChannel(object):
     data_channel.close()
   """
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def input_elements(self, instruction_id, expected_targets):
     """Returns an iterable of all Element.Data bundles for instruction_id.
@@ -185,7 +189,7 @@ def input_elements(self, instruction_id, expected_targets):
         except queue.Empty:
           if self._exc_info:
             t, v, tb = self._exc_info
-            six.reraise(t, v, tb)
+            raise_(t, v, tb)
         else:
           if not data.data and data.target in expected_targets:
             done_targets.append(data.target)
@@ -273,11 +277,9 @@ def Data(self, elements_iterator, context):
       yield elements
 
 
-class DataChannelFactory(object):
+class DataChannelFactory(with_metaclass(abc.ABCMeta, object)):
   """An abstract factory for creating ``DataChannel``."""
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def create_data_channel(self, remote_grpc_port):
     """Returns a ``DataChannel`` from the given RemoteGrpcPort."""
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py 
b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index b2cefbe1469..4e9a79ff1ed 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -28,12 +28,15 @@
 from concurrent import futures
 
 import grpc
-import six
+from future import standard_library
+from future.utils import raise_
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import data_plane
 
+standard_library.install_aliases()
+
 
 def timeout(timeout_secs):
   def decorate(fn):
@@ -51,7 +54,7 @@ def call_fn():
       thread.join(timeout_secs)
       if exc_info:
         t, v, tb = exc_info  # pylint: disable=unbalanced-tuple-unpacking
-        six.reraise(t, v, tb)
+        raise_(t, v, tb)
       assert not thread.is_alive(), 'timed out after %s seconds' % timeout_secs
     return wrapper
   return decorate
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py 
b/sdks/python/apache_beam/runners/worker/log_handler.py
index 152659e0a3f..02bea3e2cdb 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -16,17 +16,23 @@
 #
 """Beam fn API log handler."""
 
+from __future__ import absolute_import
+
 import logging
 import math
-import Queue as queue
+import queue
 import threading
+from builtins import range
 
 import grpc
+from future import standard_library
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
+standard_library.install_aliases()
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py 
b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index eb5045aeeb5..ab042aa4ba2 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -15,9 +15,11 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
 
 import logging
 import unittest
+from builtins import range
 from concurrent import futures
 
 import grpc
@@ -101,7 +103,7 @@ def _create_test(name, num_logs):
           lambda self: self._verify_fn_log_handler(num_logs))
 
 
-for test_name, num_logs_entries in data.iteritems():
+for test_name, num_logs_entries in data.items():
   _create_test(test_name, num_logs_entries)
 
 
diff --git a/sdks/python/apache_beam/runners/worker/logger.py 
b/sdks/python/apache_beam/runners/worker/logger.py
index 043353807a3..07cd320ff82 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -15,8 +15,12 @@
 # limitations under the License.
 #
 
+# cython: language_level=3
+
 """Python worker logging."""
 
+from __future__ import absolute_import
+
 import json
 import logging
 import threading
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py 
b/sdks/python/apache_beam/runners/worker/logger_test.py
index cf3f6929282..73ec1aa3ad9 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -17,11 +17,14 @@
 
 """Tests for worker logging utilities."""
 
+from __future__ import absolute_import
+
 import json
 import logging
 import sys
 import threading
 import unittest
+from builtins import object
 
 from apache_beam.runners.worker import logger
 
@@ -83,7 +86,7 @@ def create_log_record(self, **kwargs):
     class Record(object):
 
       def __init__(self, **kwargs):
-        for k, v in kwargs.iteritems():
+        for k, v in kwargs.items():
           setattr(self, k, v)
 
     return Record(**kwargs)
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py 
b/sdks/python/apache_beam/runners/worker/opcounters.py
index 0e4ee0a05dc..cdbb27a0a77 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -15,14 +15,18 @@
 # limitations under the License.
 #
 
+# cython: language_level=3
 # cython: profile=True
 
 """Counters collect the progress of the Worker for reporting to the service."""
 
 from __future__ import absolute_import
+from __future__ import division
 
 import math
 import random
+from builtins import hex
+from builtins import object
 
 from apache_beam.utils import counters
 from apache_beam.utils.counters import Counter
@@ -229,7 +233,7 @@ def update_collect(self):
 
   def _compute_next_sample(self, i):
     # https://en.wikipedia.org/wiki/Reservoir_sampling#Fast_Approximation
-    gap = math.log(1.0 - random.random()) / math.log(1.0 - 10.0/i)
+    gap = math.log(1.0 - random.random()) / math.log(1.0 - (10.0 / i))
     return i + math.floor(gap)
 
   def _should_sample(self):
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py 
b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index 41c80e87c4f..3987311112c 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -15,10 +15,15 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+from __future__ import division
+
 import logging
 import math
 import random
 import unittest
+from builtins import object
+from builtins import range
 
 from apache_beam import coders
 from apache_beam.runners.worker import opcounters
@@ -32,7 +37,7 @@
 # These have to be at top level so the pickler can find them.
 
 
-class OldClassThatDoesNotImplementLen:  # pylint: disable=old-style-class
+class OldClassThatDoesNotImplementLen(object):  # pylint: 
disable=old-style-class
 
   def __init__(self):
     pass
@@ -149,11 +154,11 @@ def test_update_multiple(self):
     value = GlobalWindows.windowed_value('defghij')
     opcounts.update_from(value)
     total_size += coder.estimate_size(value)
-    self.verify_counters(opcounts, 2, float(total_size) / 2)
+    self.verify_counters(opcounts, 2, (float(total_size) / 2))
     value = GlobalWindows.windowed_value('klmnop')
     opcounts.update_from(value)
     total_size += coder.estimate_size(value)
-    self.verify_counters(opcounts, 3, float(total_size) / 3)
+    self.verify_counters(opcounts, 3, (float(total_size) / 3))
 
   def test_should_sample(self):
     # Order of magnitude more buckets than highest constant in code under test.
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py 
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index d38bc7788fa..62c5bbd0117 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -21,6 +21,8 @@
 source, write to a sink, parallel do, etc.
 """
 
+from __future__ import absolute_import
+
 import collections
 
 from apache_beam import coders
diff --git a/sdks/python/apache_beam/runners/worker/operations.py 
b/sdks/python/apache_beam/runners/worker/operations.py
index 9aa29b87aa0..f34bcb7db7d 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -15,13 +15,18 @@
 # limitations under the License.
 #
 
+# cython: language_level=3
 # cython: profile=True
 
 """Worker operations executor."""
 
+from __future__ import absolute_import
+
 import collections
-import itertools
 import logging
+from builtins import filter
+from builtins import object
+from builtins import zip
 
 from apache_beam import pvalue
 from apache_beam.internal import pickler
@@ -318,7 +323,7 @@ def _read_side_inputs(self, tags_and_types):
       # while the variable has the value assigned by the current iteration of
       # the for loop.
       # pylint: disable=cell-var-from-loop
-      for si in itertools.ifilter(
+      for si in filter(
           lambda o: o.tag == side_tag, self.spec.side_inputs):
         if not isinstance(si, operation_specs.WorkerSideInputSource):
           raise NotImplementedError('Unknown side input type: %r' % si)
@@ -542,7 +547,7 @@ def process(self, wkv):
           target = self.key_count * 9 // 10
           old_wkeys = []
           # TODO(robertwb): Use an LRU cache?
-          for old_wkey, old_wvalue in self.table.iteritems():
+          for old_wkey, old_wvalue in self.table.items():
             old_wkeys.append(old_wkey)  # Can't mutate while iterating.
             self.output_key(old_wkey, old_wvalue[0])
             self.key_count -= 1
@@ -557,7 +562,7 @@ def process(self, wkv):
       entry[0] = self.combine_fn_add_input(entry[0], value)
 
   def finish(self):
-    for wkey, value in self.table.iteritems():
+    for wkey, value in self.table.items():
       self.output_key(wkey, value[0])
     self.table = {}
     self.key_count = 0
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index b8fa422536b..8e94e952d68 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -23,14 +23,18 @@
 import abc
 import contextlib
 import logging
-import Queue as queue
+import queue
 import sys
 import threading
 import traceback
+from builtins import object
+from builtins import range
 from concurrent import futures
 
 import grpc
-import six
+from future import standard_library
+from future.utils import raise_
+from future.utils import with_metaclass
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -38,6 +42,8 @@
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
+standard_library.install_aliases()
+
 
 class SdkHarness(object):
   REQUEST_METHOD_PREFIX = '_request_'
@@ -176,7 +182,7 @@ def _request_process_bundle_progress(self, request):
     def task():
       instruction_reference = getattr(
           request, request.WhichOneof('request')).instruction_reference
-      if self._instruction_id_vs_worker.has_key(instruction_reference):
+      if instruction_reference in self._instruction_id_vs_worker:
         self._execute(
             lambda: self._instruction_id_vs_worker[
                 instruction_reference
@@ -245,11 +251,9 @@ def process_bundle_progress(self, request, instruction_id):
             metrics=processor.metrics() if processor else None))
 
 
-class StateHandlerFactory(object):
+class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)):
   """An abstract factory for creating ``DataChannel``."""
 
-  __metaclass__ = abc.ABCMeta
-
   @abc.abstractmethod
   def create_state_handler(self, api_service_descriptor):
     """Returns a ``StateHandler`` from the given ApiServiceDescriptor."""
@@ -407,7 +411,7 @@ def _blocking_request(self, request):
     while not future.wait(timeout=1):
       if self._exc_info:
         t, v, tb = self._exc_info
-        six.reraise(t, v, tb)
+        raise_(t, v, tb)
       elif self._done:
         raise RuntimeError()
     del self._responses_by_id[request.id]
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 46670e88964..d05217ef9b8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -16,7 +16,9 @@
 #
 """SDK Fn Harness entry point."""
 
-import BaseHTTPServer
+from __future__ import absolute_import
+
+import http.server
 import json
 import logging
 import os
@@ -24,7 +26,9 @@
 import sys
 import threading
 import traceback
+from builtins import object
 
+from future import standard_library
 from google.protobuf import text_format
 
 from apache_beam.internal import pickler
@@ -33,6 +37,8 @@
 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
 from apache_beam.runners.worker.sdk_worker import SdkHarness
 
+standard_library.install_aliases()
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -57,7 +63,7 @@ def start(self, status_http_port=0):
         Default is 0 which means any free unsecured port
     """
 
-    class StatusHttpHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    class StatusHttpHandler(http.server.BaseHTTPRequestHandler):
       """HTTP handler for serving stacktraces of all threads."""
 
       def do_GET(self):  # pylint: disable=invalid-name
@@ -73,7 +79,7 @@ def log_message(self, f, *args):
         """Do not log any messages."""
         pass
 
-    self.httpd = httpd = BaseHTTPServer.HTTPServer(
+    self.httpd = httpd = http.server.HTTPServer(
         ('localhost', status_http_port), StatusHttpHandler)
     logging.info('Status HTTP server running at %s:%s', httpd.server_name,
                  httpd.server_port)
@@ -157,10 +163,10 @@ def _get_worker_count(pipeline_options):
     an int containing the worker_threads to use. Default is 1
   """
   pipeline_options = pipeline_options.get(
-      'options') if pipeline_options.has_key('options') else {}
+      'options') if 'options' in pipeline_options else {}
   experiments = pipeline_options.get(
       'experiments'
-  ) if pipeline_options and pipeline_options.has_key('experiments') else []
+  ) if pipeline_options and 'experiments' in pipeline_options else []
 
   experiments = experiments if experiments else []
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c229d6450ef..785064a172c 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -22,6 +22,7 @@
 
 import logging
 import unittest
+from builtins import range
 from concurrent import futures
 
 import grpc
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py 
b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 980a2088c7b..d806f9e2025 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -17,11 +17,17 @@
 
 """Utilities for handling side inputs."""
 
+from __future__ import absolute_import
+
 import collections
 import logging
-import Queue
+import queue
 import threading
 import traceback
+from builtins import object
+from builtins import range
+
+from future import standard_library
 
 from apache_beam.coders import observable
 from apache_beam.io import iobase
@@ -29,6 +35,8 @@
 from apache_beam.runners.worker import opcounters
 from apache_beam.transforms import window
 
+standard_library.install_aliases()
+
 # This module is experimental. No backwards-compatibility guarantees.
 
 
@@ -61,13 +69,13 @@ def __init__(self,
     self.num_reader_threads = min(max_reader_threads, len(self.sources))
 
     # Queue for sources that are to be read.
-    self.sources_queue = Queue.Queue()
+    self.sources_queue = queue.Queue()
     for source in sources:
       self.sources_queue.put(source)
     # Queue for elements that have been read.
-    self.element_queue = Queue.Queue(ELEMENT_QUEUE_SIZE)
+    self.element_queue = queue.Queue(ELEMENT_QUEUE_SIZE)
     # Queue for exceptions encountered in reader threads; to be rethrown.
-    self.reader_exceptions = Queue.Queue()
+    self.reader_exceptions = queue.Queue()
     # Whether we have already iterated; this iterable can only be used once.
     self.already_iterated = False
     # Whether an error was encountered in any source reader.
@@ -134,7 +142,7 @@ def _reader_thread(self):
                   self.element_queue.put(value)
                 else:
                   self.element_queue.put(_globally_windowed(value))
-        except Queue.Empty:
+        except queue.Empty:
           return
     except Exception as e:  # pylint: disable=broad-except
       logging.error('Encountered exception in PrefetchingSourceSetIterable '
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py 
b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index c4240dd028c..3a5ff381950 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -17,9 +17,13 @@
 
 """Tests for side input utilities."""
 
+from __future__ import absolute_import
+
 import logging
 import time
 import unittest
+from builtins import object
+from builtins import range
 
 import mock
 
@@ -77,7 +81,7 @@ def test_single_source_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=2)
-    assert list(strip_windows(iterator_fn())) == range(6)
+    assert list(strip_windows(iterator_fn())) == list(range(6))
 
   def test_bytes_read_behind_experiment(self):
     mock_read_counter = mock.MagicMock()
@@ -115,7 +119,7 @@ def test_multiple_sources_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=3)
-    assert sorted(strip_windows(iterator_fn())) == range(11)
+    assert sorted(strip_windows(iterator_fn())) == list(range(11))
 
   def test_multiple_sources_single_reader_iterator_fn(self):
     sources = [
@@ -126,7 +130,7 @@ def test_multiple_sources_single_reader_iterator_fn(self):
     ]
     iterator_fn = sideinputs.get_iterator_fn_for_sources(
         sources, max_reader_threads=1)
-    assert list(strip_windows(iterator_fn())) == range(11)
+    assert list(strip_windows(iterator_fn())) == list(range(11))
 
   def test_source_iterator_single_source_exception(self):
     class MyException(Exception):
@@ -172,7 +176,7 @@ def perpetual_generator(value):
     with self.assertRaises(MyException):
       for value in iterator_fn():
         seen.add(value.value)
-    self.assertEqual(sorted(seen), range(5))
+    self.assertEqual(sorted(seen), list(range(5)))
 
 
 class EmulatedCollectionsTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py 
b/sdks/python/apache_beam/runners/worker/statesampler.py
index f3916a230de..b0c2b67f9ff 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -16,6 +16,9 @@
 #
 
 # This module is experimental. No backwards-compatibility guarantees.
+
+from __future__ import absolute_import
+
 import threading
 from collections import namedtuple
 
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py 
b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index 59f84f7891f..2f09d0e8bb2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -17,6 +17,10 @@
 
 # This module is experimental. No backwards-compatibility guarantees.
 
+from __future__ import absolute_import
+
+from builtins import object
+
 
 class StateSampler(object):
 
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py 
b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 8b2216951dd..dfb4bc10a91 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -17,10 +17,12 @@
 
 """Tests for state sampler."""
 from __future__ import absolute_import
+from __future__ import division
 
 import logging
 import time
 import unittest
+from builtins import range
 
 from apache_beam.runners.worker import statesampler
 from apache_beam.utils.counters import CounterFactory
diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py 
b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
index 0a71292f773..f2ca4e79f72 100644
--- a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
@@ -39,8 +39,7 @@ class WorkerIdInterceptor(grpc.StreamStreamClientInterceptor):
   # and throw exception in worker_id_interceptor.py after we have rolled out
   # the corresponding container changes.
   # Unique worker Id for this worker.
-  _worker_id = os.environ['WORKER_ID'] if os.environ.has_key(
-      'WORKER_ID') else str(uuid.uuid4())
+  _worker_id = os.environ.get('WORKER_ID', str(uuid.uuid4()))
 
   def __init__(self):
     pass
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index c80cefd24cb..01d5f03eaa7 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -100,6 +100,7 @@ deps =
   flake8==3.5.0
 modules =
   apache_beam/coders
+  apache_beam/runners
   apache_beam/examples
   apache_beam/portability
   apache_beam/internal


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 119886)
    Time Spent: 8h 50m  (was: 8h 40m)

> Futurize and fix python 2 compatibility for runners subpackage
> --------------------------------------------------------------
>
>                 Key: BEAM-4003
>                 URL: https://issues.apache.org/jira/browse/BEAM-4003
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Matthias Feys
>            Priority: Major
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to