[ 
https://issues.apache.org/jira/browse/BEAM-4006?focusedWorklogId=131066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131066
 ]

ASF GitHub Bot logged work on BEAM-4006:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/18 20:47
            Start Date: 03/Aug/18 20:47
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5729: [BEAM-4006] Futurize transforms subpackage
URL: https://github.com/apache/beam/pull/5729
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 72791fc262b..9d4e79a9a9a 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -520,11 +520,11 @@ def test_dir(self):
     options = Breakfast()
     self.assertEquals(
         set(['from_dictionary', 'get_all_options', 'slices', 'style',
-             'view_as', 'display_data']),
+             'view_as', 'display_data', 'next']),
         set([attr for attr in dir(options) if not attr.startswith('_')]))
     self.assertEquals(
         set(['from_dictionary', 'get_all_options', 'style', 'view_as',
-             'display_data']),
+             'display_data', 'next']),
         set([attr for attr in dir(options.view_as(Eggs))
              if not attr.startswith('_')]))
 
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 3c04b370cfe..a207009f871 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -18,6 +18,8 @@
 """PTransform and descendants."""
 
 # pylint: disable=wildcard-import
+from __future__ import absolute_import
+
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 9b0c0e81e35..8db0fe5e14f 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -18,9 +18,12 @@
 """A library of basic combiner PTransform subclasses."""
 
 from __future__ import absolute_import
+from __future__ import division
 
 import operator
 import random
+from builtins import object
+from builtins import zip
 
 from past.builtins import long
 
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index f372e881024..a768231ec6e 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -16,12 +16,15 @@
 #
 
 """Unit tests for our libraries of combine PTransforms."""
+from __future__ import absolute_import
+from __future__ import division
 
 import itertools
 import random
 import unittest
 
 import hamcrest as hc
+from future.builtins import range
 
 import apache_beam as beam
 import apache_beam.transforms.combiners as combine
@@ -286,7 +289,7 @@ def match(actual):
     def matcher():
       def match(actual):
         equal_to([1])([len(actual)])
-        equal_to(pairs)(actual[0].iteritems())
+        equal_to(pairs)(actual[0].items())
       return match
     assert_that(result, matcher())
     pipeline.run()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index bbd78342a7f..fa867e5231d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -21,12 +21,15 @@
 
 import copy
 import inspect
-import itertools
 import random
 import re
 import types
+from builtins import map
+from builtins import object
+from builtins import range
 
-from six import string_types
+from future.builtins import filter
+from past.builtins import unicode
 
 from apache_beam import coders
 from apache_beam import pvalue
@@ -82,7 +85,6 @@
     'Impulse',
     ]
 
-
 # Type variables
 T = typehints.TypeVariable('T')
 K = typehints.TypeVariable('K')
@@ -291,6 +293,9 @@ def __eq__(self, other):
       return self.param_id == other.param_id
     return False
 
+  def __hash__(self):
+    return hash(self.param_id)
+
   def __repr__(self):
     return self.param_id
 
@@ -698,7 +703,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs):
 
     class ReiterableNonEmptyAccumulators(object):
       def __iter__(self):
-        return itertools.ifilter(filter_fn, accumulators)
+        return filter(filter_fn, accumulators)
 
     # It's (weakly) assumed that self._fn is associative.
     return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs)
@@ -902,7 +907,8 @@ def with_outputs(self, *tags, **main_kw):
     """
     main_tag = main_kw.pop('main', None)
     if main_kw:
-      raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
+      raise ValueError('Unexpected keyword arguments: %s' %
+                       list(main_kw))
     return _MultiParDo(self, tags, main_tag)
 
   def _pardo_fn_data(self):
@@ -1666,7 +1672,6 @@ def expand(self, pcoll):
 
 
 class Windowing(object):
-
   def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
                timestamp_combiner=None):
     global AccumulationMode, DefaultTrigger  # pylint: disable=global-variable-not-assigned
@@ -1712,6 +1717,10 @@ def __eq__(self, other):
           and self.timestamp_combiner == other.timestamp_combiner)
     return False
 
+  def __hash__(self):
+    return hash((self.windowfn, self.accumulation_mode,
+                 self.timestamp_combiner))
+
   def is_default(self):
     return self._is_default
 
@@ -1792,7 +1801,7 @@ def __init__(self, windowfn, **kwargs):
     accumulation_mode = kwargs.pop('accumulation_mode', None)
     timestamp_combiner = kwargs.pop('timestamp_combiner', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
     self.windowing = Windowing(
         windowfn, triggerfn, accumulation_mode, timestamp_combiner)
     super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
@@ -1861,7 +1870,7 @@ def __init__(self, **kwargs):
     super(Flatten, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
@@ -1906,7 +1915,7 @@ def __init__(self, value):
       value: An object of values for the PCollection
     """
     super(Create, self).__init__()
-    if isinstance(value, string_types):
+    if isinstance(value, (unicode, str, bytes)):
       raise TypeError('PTransform Create: Refusing to treat string as '
                       'an iterable. (string=%r)' % value)
     elif isinstance(value, dict):
@@ -1941,7 +1950,7 @@ def get_windowing(self, unused_inputs):
 
   @staticmethod
   def _create_source_from_iterable(values, coder):
-    return Create._create_source(map(coder.encode, values), coder)
+    return Create._create_source(list(map(coder.encode, values)), coder)
 
   @staticmethod
   def _create_source(serialized_values, coder):
diff --git a/sdks/python/apache_beam/transforms/create_source.py b/sdks/python/apache_beam/transforms/create_source.py
index 3d02d39463c..aa26cebc43f 100644
--- a/sdks/python/apache_beam/transforms/create_source.py
+++ b/sdks/python/apache_beam/transforms/create_source.py
@@ -15,6 +15,13 @@
 # limitations under the License.
 #
 
+from __future__ import absolute_import
+from __future__ import division
+
+from builtins import map
+from builtins import next
+from builtins import range
+
 from apache_beam.io import iobase
 from apache_beam.transforms.core import Create
 
@@ -57,15 +64,15 @@ def split(self, desired_bundle_size, start_position=None,
         start_position = 0
       if stop_position is None:
         stop_position = len(self._serialized_values)
-      avg_size_per_value = self._total_size / len(self._serialized_values)
+      avg_size_per_value = self._total_size // len(self._serialized_values)
       num_values_per_split = max(
-          int(desired_bundle_size / avg_size_per_value), 1)
+          int(desired_bundle_size // avg_size_per_value), 1)
       start = start_position
       while start < stop_position:
         end = min(start + num_values_per_split, stop_position)
         remaining = stop_position - end
         # Avoid having a too small bundle at the end.
-        if remaining < (num_values_per_split / 4):
+        if remaining < (num_values_per_split // 4):
           end = stop_position
         sub_source = Create._create_source(
             self._serialized_values[start:end], self._coder)
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index b5d02acc8b1..ada36725179 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -16,8 +16,12 @@
 #
 
 """Unit tests for the Create and _CreateSource classes."""
+from __future__ import absolute_import
+from __future__ import division
+
 import logging
 import unittest
+from builtins import range
 
 from apache_beam import Create
 from apache_beam.coders import FastPrimitivesCoder
@@ -33,13 +37,13 @@ def setUp(self):
 
   def test_create_transform(self):
     with TestPipeline() as p:
-      assert_that(p | Create(range(10)), equal_to(range(10)))
+      assert_that(p | Create(list(range(10))), equal_to(list(range(10))))
 
   def test_create_source_read(self):
     self.check_read([], self.coder)
     self.check_read([1], self.coder)
     # multiple values.
-    self.check_read(range(10), self.coder)
+    self.check_read(list(range(10)), self.coder)
 
   def check_read(self, values, coder):
     source = Create._create_source_from_iterable(values, coder)
@@ -49,7 +53,7 @@ def check_read(self, values, coder):
   def test_create_source_read_with_initial_splits(self):
     self.check_read_with_initial_splits([], self.coder, num_splits=2)
     self.check_read_with_initial_splits([1], self.coder, num_splits=2)
-    values = range(8)
+    values = list(range(8))
     # multiple values with a single split.
     self.check_read_with_initial_splits(values, self.coder, num_splits=1)
     # multiple values with a single split with a large desired bundle size
@@ -70,7 +74,7 @@ def check_read_with_initial_splits(self, values, coder, num_splits):
     from the split sources.
     """
     source = Create._create_source_from_iterable(values, coder)
-    desired_bundle_size = source._total_size / num_splits
+    desired_bundle_size = source._total_size // num_splits
     splits = source.split(desired_bundle_size)
     splits_info = [
         (split.source, split.start_position, split.stop_position)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index 53a440e537e..2234ef98d87 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -15,12 +15,17 @@
 # limitations under the License.
 #
 
+# cython: language_level=3
+
 """A library of basic cythonized CombineFn subclasses.
 
 For internal use only; no backwards-compatibility guarantees.
 """
 
 from __future__ import absolute_import
+from __future__ import division
+
+from builtins import object
 
 from apache_beam.transforms import core
 
@@ -162,7 +167,7 @@ def extract_output(self):
       self.sum %= 2**64
       if self.sum >= INT64_MAX:
         self.sum -= 2**64
-    return self.sum / self.count if self.count else _NAN
+    return self.sum // self.count if self.count else _NAN
 
 
 class CountCombineFn(AccumulatorCombineFn):
@@ -258,7 +263,7 @@ def merge(self, accumulators):
       self.count += accumulator.count
 
   def extract_output(self):
-    return self.sum / self.count if self.count else _NAN
+    return self.sum // self.count if self.count else _NAN
 
 
 class SumFloatFn(AccumulatorCombineFn):
diff --git a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
index e3d3c6e5a5a..91a888a0836 100644
--- a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
+++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
@@ -14,6 +14,8 @@
 otherwise, test on pure python module
 """
 
+from __future__ import absolute_import
+
 import unittest
 
 from mock import Mock
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 4206f2110b7..ce10174e00d 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -41,10 +41,11 @@
 import calendar
 import inspect
 import json
+from builtins import object
 from datetime import datetime
 from datetime import timedelta
 
-import six
+from past.builtins import unicode
 
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
@@ -169,7 +170,7 @@ class DisplayDataItem(object):
   display item belongs to.
   """
   typeDict = {str:'STRING',
-              six.text_type:'STRING',
+              unicode:'STRING',
               int:'INTEGER',
               float:'FLOAT',
               bool: 'BOOLEAN',
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 90bde8caa8c..bdaade68fa0 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -24,8 +24,8 @@
 
 # pylint: disable=ungrouped-imports
 import hamcrest as hc
-import six
 from hamcrest.core.base_matcher import BaseMatcher
+from past.builtins import unicode
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -165,7 +165,7 @@ def test_create_list_display_data(self):
   def test_unicode_type_display_data(self):
     class MyDoFn(beam.DoFn):
       def display_data(self):
-        return {'unicode_string': six.text_type('my string'),
+        return {'unicode_string': unicode('my string'),
                 'unicode_literal_string': u'my literal string'}
 
     fn = MyDoFn()
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 889372f9266..7a53fbe25b0 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -43,6 +43,9 @@ class and wrapper class that allows lambda functions to be used as
 import os
 import sys
 import threading
+from builtins import hex
+from builtins import object
+from builtins import zip
 from functools import reduce
 
 from google.protobuf import message
@@ -622,7 +625,7 @@ def __init__(self, fn, *args, **kwargs):
     super(PTransformWithSideInputs, self).__init__()
 
     if (any([isinstance(v, pvalue.PCollection) for v in args]) or
-        any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
+        any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])):
       raise error.SideInputError(
           'PCollection used directly as side input argument. Specify '
           'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 907ee04c079..c594e6ab28b 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -18,12 +18,16 @@
 """Unit tests for the PTransform and descendants."""
 
 from __future__ import absolute_import
+from __future__ import division
 from __future__ import print_function
 
 import collections
 import operator
 import re
 import unittest
+from builtins import map
+from builtins import range
+from builtins import zip
 from functools import reduce
 
 import hamcrest as hc
@@ -382,7 +386,7 @@ def test_combine_with_combine_fn(self):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create(vals)
     result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn())
-    assert_that(result, equal_to([sum(vals) / len(vals)]))
+    assert_that(result, equal_to([sum(vals) // len(vals)]))
     pipeline.run()
 
   def test_combine_with_callable(self):
@@ -413,8 +417,8 @@ def test_combine_per_key_with_combine_fn(self):
     pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn())
-    assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
-                                  ('b', sum(vals_2) / len(vals_2))]))
+    assert_that(result, equal_to([('a', sum(vals_1) // len(vals_1)),
+                                  ('b', sum(vals_2) // len(vals_2))]))
     pipeline.run()
 
   def test_combine_per_key_with_callable(self):
diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
index fc9b4d22a8c..980abab47c4 100644
--- a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
+++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
@@ -17,6 +17,10 @@
 
 """For internal use only; no backwards-compatibility guarantees."""
 
+from __future__ import absolute_import
+
+from builtins import object
+from builtins import range
 
 globals()['INT64_MAX'] = 2**63 - 1
 globals()['INT64_MIN'] = -2**63
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index f10cb92ed5e..21fc919b72d 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -26,6 +26,8 @@
 
 from __future__ import absolute_import
 
+from builtins import object
+
 from apache_beam.transforms import window
 
 
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 6b93b8e9137..f9c9ae93d62 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -17,6 +17,8 @@
 
 """Unit tests for side inputs."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
 
@@ -196,7 +198,7 @@ def match(actual):
         [[actual_elem, actual_list, actual_dict]] = actual
         equal_to([expected_elem])([actual_elem])
         equal_to(expected_list)(actual_list)
-        equal_to(expected_pairs)(actual_dict.iteritems())
+        equal_to(expected_pairs)(actual_dict.items())
       return match
 
     assert_that(results, matcher(1, a_list, some_pairs))
@@ -286,8 +288,8 @@ def  matcher(expected_elem, expected_kvs):
       def match(actual):
         [[actual_elem, actual_dict1, actual_dict2]] = actual
         equal_to([expected_elem])([actual_elem])
-        equal_to(expected_kvs)(actual_dict1.iteritems())
-        equal_to(expected_kvs)(actual_dict2.iteritems())
+        equal_to(expected_kvs)(actual_dict1.items())
+        equal_to(expected_kvs)(actual_dict2.items())
       return match
 
     assert_that(results, matcher(1, some_kvs))
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 8d63d49baad..bf30a131392 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -21,6 +21,9 @@
 
 from abc import ABCMeta
 from abc import abstractmethod
+from builtins import object
+
+from future.utils import with_metaclass
 
 __all__ = [
     'TimeDomain',
@@ -43,11 +46,9 @@ def from_string(domain):
     raise ValueError('Unknown time domain: %s' % domain)
 
 
-class TimestampCombinerImpl(object):
+class TimestampCombinerImpl(with_metaclass(ABCMeta, object)):
   """Implementation of TimestampCombiner."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
     pass
@@ -72,11 +73,9 @@ def merge(self, unused_result_window, merging_timestamps):
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(TimestampCombinerImpl):
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)):
   """TimestampCombinerImpl that only depends on the window."""
 
-  __metaclass__ = ABCMeta
-
   def combine(self, output_timestamp, other_output_timestamp):
     return output_timestamp
 
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 159b21b2225..c185a522218 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -20,13 +20,19 @@
 Triggers control when in processing time windows get emitted.
 """
 
+from __future__ import absolute_import
+
 import collections
 import copy
-import itertools
 import logging
 import numbers
 from abc import ABCMeta
 from abc import abstractmethod
+from builtins import object
+
+from future.moves.itertools import zip_longest
+from future.utils import iteritems
+from future.utils import with_metaclass
 
 from apache_beam.coders import observable
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -68,14 +74,13 @@ class AccumulationMode(object):
   # RETRACTING = 3
 
 
-class _StateTag(object):
+class _StateTag(with_metaclass(ABCMeta, object)):
   """An identifier used to store and retrieve typed, combinable state.
 
   The given tag must be unique for this stage.  If CombineFn is None then
   all elements will be returned as a list, otherwise the given CombineFn
   will be applied (possibly incrementally and eagerly) when adding elements.
   """
-  __metaclass__ = ABCMeta
 
   def __init__(self, tag):
     self.tag = tag
@@ -136,12 +141,11 @@ def with_prefix(self, prefix):
 
 # pylint: disable=unused-argument
 # TODO(robertwb): Provisional API, Java likely to change as well.
-class TriggerFn(object):
+class TriggerFn(with_metaclass(ABCMeta, object)):
   """A TriggerFn determines when window (panes) are emitted.
 
   See https://beam.apache.org/documentation/programming-guide/#triggers
   """
-  __metaclass__ = ABCMeta
 
   @abstractmethod
   def on_element(self, element, window, context):
@@ -260,6 +264,9 @@ def reset(self, window, context):
   def __eq__(self, other):
     return type(self) == type(other)
 
+  def __hash__(self):
+    return hash(type(self))
+
   @staticmethod
   def from_runner_api(proto, context):
     return DefaultTrigger()
@@ -446,6 +453,9 @@ def __repr__(self):
   def __eq__(self, other):
     return type(self) == type(other) and self.count == other.count
 
+  def __hash__(self):
+    return hash(self.count)
+
   def on_element(self, element, window, context):
     context.add_state(self.COUNT_TAG, 1)
 
@@ -484,6 +494,9 @@ def __repr__(self):
   def __eq__(self, other):
     return type(self) == type(other) and self.underlying == other.underlying
 
+  def __hash__(self):
+    return hash(self.underlying)
+
   def on_element(self, element, window, context):
     self.underlying.on_element(element, window, context)
 
@@ -512,9 +525,7 @@ def to_runner_api(self, context):
             subtrigger=self.underlying.to_runner_api(context)))
 
 
-class _ParallelTriggerFn(TriggerFn):
-
-  __metaclass__ = ABCMeta
+class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)):
 
   def __init__(self, *triggers):
     self.triggers = triggers
@@ -526,6 +537,9 @@ def __repr__(self):
   def __eq__(self, other):
     return type(self) == type(other) and self.triggers == other.triggers
 
+  def __hash__(self):
+    return hash(self.triggers)
+
   @abstractmethod
   def combine_op(self, trigger_results):
     pass
@@ -620,6 +634,9 @@ def __repr__(self):
   def __eq__(self, other):
     return type(self) == type(other) and self.triggers == other.triggers
 
+  def __hash__(self):
+    return hash(self.triggers)
+
   def on_element(self, element, window, context):
     ix = context.get_state(self.INDEX_TAG)
     if ix < len(self.triggers):
@@ -744,14 +761,12 @@ def clear_state(self, tag):
 
 
 # pylint: disable=unused-argument
-class SimpleState(object):
+class SimpleState(with_metaclass(ABCMeta, object)):
   """Basic state storage interface used for triggering.
 
   Only timers must hold the watermark (by their timestamp).
   """
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def set_timer(self, window, name, time_domain, timestamp):
     pass
@@ -863,7 +878,7 @@ def merge(self, to_be_merged, merge_result):
           self._persist_window_ids()
 
   def known_windows(self):
-    return self.window_ids.keys()
+    return list(self.window_ids)
 
   def get_window(self, window_id):
     for window, ids in self.window_ids.items():
@@ -922,11 +937,9 @@ def create_trigger_driver(windowing,
   return driver
 
 
-class TriggerDriver(object):
+class TriggerDriver(with_metaclass(ABCMeta, object)):
   """Breaks a series of bundle and timer firings into window (pane)s."""
 
-  __metaclass__ = ABCMeta
-
   @abstractmethod
   def process_elements(self, state, windowed_values, output_watermark):
     pass
@@ -972,10 +985,13 @@ def __eq__(self, other):
     if isinstance(other, collections.Iterable):
       return all(
           a == b
-          for a, b in itertools.izip_longest(self, other, fillvalue=object()))
+          for a, b in zip_longest(self, other, fillvalue=object()))
     else:
       return NotImplemented
 
+  def __hash__(self):
+    return hash(tuple(self))
+
   def __ne__(self, other):
     return not self == other
 
@@ -1250,7 +1266,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
 
   def get_earliest_hold(self):
     earliest_hold = MAX_TIMESTAMP
-    for unused_window, tagged_states in self.state.iteritems():
+    for unused_window, tagged_states in iteritems(self.state):
       # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is
       # named "watermark".  This is currently only true because the only place
       # watermark holds are set is in the GeneralTriggerDriver, where we use
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 2e672bb0cf1..034abae65c8 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -17,10 +17,14 @@
 
 """Unit tests for the triggering classes."""
 
+from __future__ import absolute_import
+
 import collections
 import os.path
 import pickle
 import unittest
+from builtins import range
+from builtins import zip
 
 import yaml
 
@@ -382,7 +386,7 @@ def test_picklable_output(self):
       pickle.dumps(unpicklable)
     for unwindowed in driver.process_elements(None, unpicklable, None):
       self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value,
-                       range(10))
+                       list(range(10)))
 
 
 class RunnerApiTest(unittest.TestCase):
@@ -426,7 +430,7 @@ def format_result(k_v):
               # A-10, A-11 never emitted due to AfterCount(3) never firing.
               'B-4': {6, 7, 8, 9},
               'B-3': {10, 15, 16},
-          }.iteritems()))
+          }.items()))
 
 
 class TranscriptTest(unittest.TestCase):
@@ -556,7 +560,7 @@ def fire_timers():
 
     for line in spec['transcript']:
 
-      action, params = line.items()[0]
+      action, params = list(line.items())[0]
 
       if action != 'expect':
         # Fail if we have output that was not expected in the transcript.
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 6a5fd581bb7..0f99da246a5 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -23,6 +23,7 @@
 from __future__ import absolute_import
 
 import types
+from builtins import object
 
 from apache_beam.coders import Coder
 from apache_beam.transforms.timeutil import TimeDomain
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 8dbc9ce5e77..b891e628178 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -16,6 +16,7 @@
 #
 
 """Unit tests for the Beam State and Timer API interfaces."""
+from __future__ import absolute_import
 
 import unittest
 
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 07cab545351..dbd0f709d6c 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -19,11 +19,18 @@
 """
 
 from __future__ import absolute_import
+from __future__ import division
 
 import collections
 import contextlib
 import random
 import time
+from builtins import object
+from builtins import range
+from builtins import zip
+
+from future.utils import itervalues
+from past.utils import old_div
 
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
@@ -114,12 +121,12 @@ def __init__(self, **kwargs):
     super(CoGroupByKey, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+      raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys()))
 
   def _extract_input_pvalues(self, pvalueish):
     try:
       # If this works, it's a dict.
-      return pvalueish, tuple(pvalueish.viewvalues())
+      return pvalueish, tuple(itervalues(pvalueish))
     except AttributeError:
       pcolls = tuple(pvalueish)
       return pcolls, pcolls
@@ -268,12 +275,12 @@ def _thin_data(self):
 
     def div_keys(kv1_kv2):
       (x1, _), (x2, _) = kv1_kv2
-      return x2 / x1
+      return old_div(x2, x1) # TODO(BEAM-4858)
 
     pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
                    key=div_keys)
     # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
-    threshold = 2 * len(pairs) / 3
+    threshold = 2 * len(pairs) // 3
     self._data = (
         list(sum(pairs[threshold:], ()))
         + [((x1 + x2) / 2.0, (t1 + t2) / 2.0)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index d834a1c5efe..6cec4a5bf36 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -17,9 +17,13 @@
 
 """Unit tests for the transform.util classes."""
 
+from __future__ import absolute_import
+
 import logging
 import time
 import unittest
+from builtins import object
+from builtins import range
 
 import apache_beam as beam
 from apache_beam.coders import coders
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 5bc047b48c7..067227bb3f8 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -50,10 +50,13 @@
 from __future__ import absolute_import
 
 import abc
+from builtins import object
+from builtins import range
+from functools import total_ordering
 
+from future.utils import with_metaclass
 from google.protobuf import duration_pb2
 from google.protobuf import timestamp_pb2
-from past.builtins import cmp
 
 from apache_beam.coders import coders
 from apache_beam.portability import common_urns
@@ -109,11 +112,9 @@ def get_impl(timestamp_combiner, window_fn):
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(urns.RunnerApiFn):
+class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):
   """An abstract windowing function defining a basic assign and merge."""
 
-  __metaclass__ = abc.ABCMeta
-
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
 
@@ -191,15 +192,35 @@ def __init__(self, end):
   def max_timestamp(self):
     return self.end.predecessor()
 
-  def __cmp__(self, other):
-    # Order first by endpoint, then arbitrarily.
-    return cmp(self.end, other.end) or cmp(hash(self), hash(other))
-
   def __eq__(self, other):
     raise NotImplementedError
 
+  def __ne__(self, other):
+    #  Order first by endpoint, then arbitrarily
+    return self.end != other.end or hash(self) != hash(other)
+
+  def __lt__(self, other):
+    if self.end != other.end:
+      return self.end < other.end
+    return hash(self) < hash(other)
+
+  def __le__(self, other):
+    if self.end != other.end:
+      return self.end <= other.end
+    return hash(self) <= hash(other)
+
+  def __gt__(self, other):
+    if self.end != other.end:
+      return self.end > other.end
+    return hash(self) > hash(other)
+
+  def __ge__(self, other):
+    if self.end != other.end:
+      return self.end >= other.end
+    return hash(self) >= hash(other)
+
   def __hash__(self):
-    return hash(self.end)
+    raise NotImplementedError
 
   def __repr__(self):
     return '[?, %s)' % float(self.end)
@@ -221,7 +242,12 @@ def __hash__(self):
     return hash((self.start, self.end))
 
   def __eq__(self, other):
-    return self.start == other.start and self.end == other.end
+    return (self.start == other.start
+            and self.end == other.end
+            and type(self) == type(other))
+
+  def __ne__(self, other):
+    return not self == other
 
   def __repr__(self):
     return '[%s, %s)' % (float(self.start), float(self.end))
@@ -234,6 +260,7 @@ def union(self, other):
         min(self.start, other.start), max(self.end, other.end))
 
 
+@total_ordering
 class TimestampedValue(object):
   """A timestamped value having a value and a timestamp.
 
@@ -246,10 +273,23 @@ def __init__(self, value, timestamp):
     self.value = value
     self.timestamp = Timestamp.of(timestamp)
 
-  def __cmp__(self, other):
-    if type(self) is not type(other):
-      return cmp(type(self), type(other))
-    return cmp((self.value, self.timestamp), (other.value, other.timestamp))
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.value == other.value
+            and self.timestamp == other.timestamp)
+
+  def __hash__(self):
+    return hash((self.value, self.timestamp))
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __lt__(self, other):
+    if type(self) != type(other):
+      return type(self).__name__ < type(other).__name__
+    if self.value != other.value:
+      return self.value < other.value
+    return self.timestamp < other.timestamp
 
 
 class GlobalWindow(BoundedWindow):
@@ -275,6 +315,9 @@ def __eq__(self, other):
     # Global windows are always and only equal to each other.
     return self is other or type(self) is type(other)
 
+  def __ne__(self, other):
+    return not self == other
+
 
 class NonMergingWindowFn(WindowFn):
 
@@ -348,6 +391,9 @@ def __eq__(self, other):
     if type(self) == type(other) == FixedWindows:
       return self.size == other.size and self.offset == other.offset
 
+  def __hash__(self):
+    return hash((self.size, self.offset))
+
   def __ne__(self, other):
     return not self == other
 
@@ -407,6 +453,12 @@ def __eq__(self, other):
               and self.offset == other.offset
               and self.period == other.period)
 
+  def __ne__(self, other):
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.offset, self.period))
+
   def to_runner_api_parameter(self, context):
     return (common_urns.sliding_windows.urn,
             standard_window_fns_pb2.SlidingWindowsPayload(
@@ -474,6 +526,12 @@ def __eq__(self, other):
     if type(self) == type(other) == Sessions:
       return self.gap_size == other.gap_size
 
+  def __ne__(self, other):
+    return not self == other
+
+  def __hash__(self):
+    return hash(self.gap_size)
+
   def to_runner_api_parameter(self, context):
     return (common_urns.session_windows.urn,
             standard_window_fns_pb2.SessionsPayload(
diff --git a/sdks/python/apache_beam/transforms/window_test.py 
b/sdks/python/apache_beam/transforms/window_test.py
index 7c1d4e99f5e..77ab47e3dd8 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -16,8 +16,11 @@
 #
 
 """Unit tests for the windowing classes."""
+from __future__ import absolute_import
+from __future__ import division
 
 import unittest
+from builtins import range
 
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -236,7 +239,7 @@ def test_timestamped_with_combiners(self):
                 # We add a 'key' to each value representing the index of the
                 # window. This is important since there is no guarantee of
                 # order for the elements of a PCollection.
-                | Map(lambda v: (v / 5, v)))
+                | Map(lambda v: (v // 5, v)))
       # Sum all elements associated with a key and window. Although it
       # is called CombinePerKey it is really CombinePerKeyAndWindow the
       # same way GroupByKey is really GroupByKeyAndWindow.
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py 
b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index bf4941a5d5d..a8f56fd103b 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -16,6 +16,8 @@
 #
 """Unit tests for the write transform."""
 
+from __future__ import absolute_import
+
 import logging
 import unittest
 
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index de2352aafe9..da4b00cafc1 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -116,6 +116,7 @@ modules =
   apache_beam/testing
   apache_beam/tools
   apache_beam/typehints
+  apache_beam/transforms
 commands =
   python --version
   pip --version


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 131066)
    Time Spent: 16h 10m  (was: 16h)

> Futurize and fix python 2 compatibility for transforms subpackage
> -----------------------------------------------------------------
>
>                 Key: BEAM-4006
>                 URL: https://issues.apache.org/jira/browse/BEAM-4006
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Matthias Feys
>            Priority: Major
>          Time Spent: 16h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to