[ https://issues.apache.org/jira/browse/BEAM-4006?focusedWorklogId=131066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131066 ]
ASF GitHub Bot logged work on BEAM-4006: ---------------------------------------- Author: ASF GitHub Bot Created on: 03/Aug/18 20:47 Start Date: 03/Aug/18 20:47 Worklog Time Spent: 10m Work Description: aaltay closed pull request #5729: [BEAM-4006] Futurize transforms subpackage URL: https://github.com/apache/beam/pull/5729 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 72791fc262b..9d4e79a9a9a 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -520,11 +520,11 @@ def test_dir(self): options = Breakfast() self.assertEquals( set(['from_dictionary', 'get_all_options', 'slices', 'style', - 'view_as', 'display_data']), + 'view_as', 'display_data', 'next']), set([attr for attr in dir(options) if not attr.startswith('_')])) self.assertEquals( set(['from_dictionary', 'get_all_options', 'style', 'view_as', - 'display_data']), + 'display_data', 'next']), set([attr for attr in dir(options.view_as(Eggs)) if not attr.startswith('_')])) diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 3c04b370cfe..a207009f871 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -18,6 +18,8 @@ """PTransform and descendants.""" # pylint: disable=wildcard-import +from __future__ import absolute_import + from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 9b0c0e81e35..8db0fe5e14f 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -18,9 +18,12 @@ """A library of basic combiner PTransform subclasses.""" from __future__ import absolute_import +from __future__ import division import operator import random +from builtins import object +from builtins import zip from past.builtins import long diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index f372e881024..a768231ec6e 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -16,12 +16,15 @@ # """Unit tests for our libraries of combine PTransforms.""" +from __future__ import absolute_import +from __future__ import division import itertools import random import unittest import hamcrest as hc +from future.builtins import range import apache_beam as beam import apache_beam.transforms.combiners as combine @@ -286,7 +289,7 @@ def match(actual): def matcher(): def match(actual): equal_to([1])([len(actual)]) - equal_to(pairs)(actual[0].iteritems()) + equal_to(pairs)(actual[0].items()) return match assert_that(result, matcher()) pipeline.run() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bbd78342a7f..fa867e5231d 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -21,12 +21,15 @@ import copy import inspect -import itertools import random import re import types +from builtins 
import map +from builtins import object +from builtins import range -from six import string_types +from future.builtins import filter +from past.builtins import unicode from apache_beam import coders from apache_beam import pvalue @@ -82,7 +85,6 @@ 'Impulse', ] - # Type variables T = typehints.TypeVariable('T') K = typehints.TypeVariable('K') @@ -291,6 +293,9 @@ def __eq__(self, other): return self.param_id == other.param_id return False + def __hash__(self): + return hash(self.param_id) + def __repr__(self): return self.param_id @@ -698,7 +703,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs): class ReiterableNonEmptyAccumulators(object): def __iter__(self): - return itertools.ifilter(filter_fn, accumulators) + return filter(filter_fn, accumulators) # It's (weakly) assumed that self._fn is associative. return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs) @@ -902,7 +907,8 @@ def with_outputs(self, *tags, **main_kw): """ main_tag = main_kw.pop('main', None) if main_kw: - raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys()) + raise ValueError('Unexpected keyword arguments: %s' % + list(main_kw)) return _MultiParDo(self, tags, main_tag) def _pardo_fn_data(self): @@ -1666,7 +1672,6 @@ def expand(self, pcoll): class Windowing(object): - def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, timestamp_combiner=None): global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned @@ -1712,6 +1717,10 @@ def __eq__(self, other): and self.timestamp_combiner == other.timestamp_combiner) return False + def __hash__(self): + return hash((self.windowfn, self.accumulation_mode, + self.timestamp_combiner)) + def is_default(self): return self._is_default @@ -1792,7 +1801,7 @@ def __init__(self, windowfn, **kwargs): accumulation_mode = kwargs.pop('accumulation_mode', None) timestamp_combiner = kwargs.pop('timestamp_combiner', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) self.windowing = Windowing( windowfn, triggerfn, accumulation_mode, timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) @@ -1861,7 +1870,7 @@ def __init__(self, **kwargs): super(Flatten, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) def _extract_input_pvalues(self, pvalueish): try: @@ -1906,7 +1915,7 @@ def __init__(self, value): value: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(value, string_types): + if isinstance(value, (unicode, str, bytes)): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. 
(string=%r)' % value) elif isinstance(value, dict): @@ -1941,7 +1950,7 @@ def get_windowing(self, unused_inputs): @staticmethod def _create_source_from_iterable(values, coder): - return Create._create_source(map(coder.encode, values), coder) + return Create._create_source(list(map(coder.encode, values)), coder) @staticmethod def _create_source(serialized_values, coder): diff --git a/sdks/python/apache_beam/transforms/create_source.py b/sdks/python/apache_beam/transforms/create_source.py index 3d02d39463c..aa26cebc43f 100644 --- a/sdks/python/apache_beam/transforms/create_source.py +++ b/sdks/python/apache_beam/transforms/create_source.py @@ -15,6 +15,13 @@ # limitations under the License. # +from __future__ import absolute_import +from __future__ import division + +from builtins import map +from builtins import next +from builtins import range + from apache_beam.io import iobase from apache_beam.transforms.core import Create @@ -57,15 +64,15 @@ def split(self, desired_bundle_size, start_position=None, start_position = 0 if stop_position is None: stop_position = len(self._serialized_values) - avg_size_per_value = self._total_size / len(self._serialized_values) + avg_size_per_value = self._total_size // len(self._serialized_values) num_values_per_split = max( - int(desired_bundle_size / avg_size_per_value), 1) + int(desired_bundle_size // avg_size_per_value), 1) start = start_position while start < stop_position: end = min(start + num_values_per_split, stop_position) remaining = stop_position - end # Avoid having a too small bundle at the end. - if remaining < (num_values_per_split / 4): + if remaining < (num_values_per_split // 4): end = stop_position sub_source = Create._create_source( self._serialized_values[start:end], self._coder) diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index b5d02acc8b1..ada36725179 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -16,8 +16,12 @@ # """Unit tests for the Create and _CreateSource classes.""" +from __future__ import absolute_import +from __future__ import division + import logging import unittest +from builtins import range from apache_beam import Create from apache_beam.coders import FastPrimitivesCoder @@ -33,13 +37,13 @@ def setUp(self): def test_create_transform(self): with TestPipeline() as p: - assert_that(p | Create(range(10)), equal_to(range(10))) + assert_that(p | Create(list(range(10))), equal_to(list(range(10)))) def test_create_source_read(self): self.check_read([], self.coder) self.check_read([1], self.coder) # multiple values. - self.check_read(range(10), self.coder) + self.check_read(list(range(10)), self.coder) def check_read(self, values, coder): source = Create._create_source_from_iterable(values, coder) @@ -49,7 +53,7 @@ def check_read(self, values, coder): def test_create_source_read_with_initial_splits(self): self.check_read_with_initial_splits([], self.coder, num_splits=2) self.check_read_with_initial_splits([1], self.coder, num_splits=2) - values = range(8) + values = list(range(8)) # multiple values with a single split. self.check_read_with_initial_splits(values, self.coder, num_splits=1) # multiple values with a single split with a large desired bundle size @@ -70,7 +74,7 @@ def check_read_with_initial_splits(self, values, coder, num_splits): from the split sources. 
""" source = Create._create_source_from_iterable(values, coder) - desired_bundle_size = source._total_size / num_splits + desired_bundle_size = source._total_size // num_splits splits = source.split(desired_bundle_size) splits_info = [ (split.source, split.start_position, split.stop_position) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index 53a440e537e..2234ef98d87 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -15,12 +15,17 @@ # limitations under the License. # +# cython: language_level=3 + """A library of basic cythonized CombineFn subclasses. For internal use only; no backwards-compatibility guarantees. """ from __future__ import absolute_import +from __future__ import division + +from builtins import object from apache_beam.transforms import core @@ -162,7 +167,7 @@ def extract_output(self): self.sum %= 2**64 if self.sum >= INT64_MAX: self.sum -= 2**64 - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class CountCombineFn(AccumulatorCombineFn): @@ -258,7 +263,7 @@ def merge(self, accumulators): self.count += accumulator.count def extract_output(self): - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class SumFloatFn(AccumulatorCombineFn): diff --git a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py index e3d3c6e5a5a..91a888a0836 100644 --- a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py +++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py @@ -14,6 +14,8 @@ otherwise, test on pure python module """ +from __future__ import absolute_import + import unittest from mock import Mock diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 4206f2110b7..ce10174e00d 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -41,10 +41,11 @@ import calendar import inspect import json +from builtins import object from datetime import datetime from datetime import timedelta -import six +from past.builtins import unicode __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] @@ -169,7 +170,7 @@ class DisplayDataItem(object): display item belongs to. 
""" typeDict = {str:'STRING', - six.text_type:'STRING', + unicode:'STRING', int:'INTEGER', float:'FLOAT', bool: 'BOOLEAN', diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 90bde8caa8c..bdaade68fa0 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -24,8 +24,8 @@ # pylint: disable=ungrouped-imports import hamcrest as hc -import six from hamcrest.core.base_matcher import BaseMatcher +from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -165,7 +165,7 @@ def test_create_list_display_data(self): def test_unicode_type_display_data(self): class MyDoFn(beam.DoFn): def display_data(self): - return {'unicode_string': six.text_type('my string'), + return {'unicode_string': unicode('my string'), 'unicode_literal_string': u'my literal string'} fn = MyDoFn() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 889372f9266..7a53fbe25b0 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -43,6 +43,9 @@ class and wrapper class that allows lambda functions to be used as import os import sys import threading +from builtins import hex +from builtins import object +from builtins import zip from functools import reduce from google.protobuf import message @@ -622,7 +625,7 @@ def __init__(self, fn, *args, **kwargs): super(PTransformWithSideInputs, self).__init__() if (any([isinstance(v, pvalue.PCollection) for v in args]) or - any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])): + any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])): raise error.SideInputError( 'PCollection used directly as side input argument. 
Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 907ee04c079..c594e6ab28b 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -18,12 +18,16 @@ """Unit tests for the PTransform and descendants.""" from __future__ import absolute_import +from __future__ import division from __future__ import print_function import collections import operator import re import unittest +from builtins import map +from builtins import range +from builtins import zip from functools import reduce import hamcrest as hc @@ -382,7 +386,7 @@ def test_combine_with_combine_fn(self): pipeline = TestPipeline() pcoll = pipeline | 'Start' >> beam.Create(vals) result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn()) - assert_that(result, equal_to([sum(vals) / len(vals)])) + assert_that(result, equal_to([sum(vals) // len(vals)])) pipeline.run() def test_combine_with_callable(self): @@ -413,8 +417,8 @@ def test_combine_per_key_with_combine_fn(self): pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn()) - assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), - ('b', sum(vals_2) / len(vals_2))])) + assert_that(result, equal_to([('a', sum(vals_1) // len(vals_1)), + ('b', sum(vals_2) // len(vals_2))])) pipeline.run() def test_combine_per_key_with_callable(self): diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py index fc9b4d22a8c..980abab47c4 100644 --- a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py +++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py @@ -17,6 +17,10 @@ """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import + +from builtins import object +from builtins import range globals()['INT64_MAX'] = 2**63 - 1 globals()['INT64_MIN'] = -2**63 diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index f10cb92ed5e..21fc919b72d 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,6 +26,8 @@ from __future__ import absolute_import +from builtins import object + from apache_beam.transforms import window diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 6b93b8e9137..f9c9ae93d62 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -17,6 +17,8 @@ """Unit tests for side inputs.""" +from __future__ import absolute_import + import logging import unittest @@ -196,7 +198,7 @@ def match(actual): [[actual_elem, actual_list, actual_dict]] = actual equal_to([expected_elem])([actual_elem]) equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(actual_dict.iteritems()) + equal_to(expected_pairs)(actual_dict.items()) return match assert_that(results, matcher(1, a_list, some_pairs)) @@ -286,8 +288,8 @@ def matcher(expected_elem, expected_kvs): def match(actual): [[actual_elem, actual_dict1, actual_dict2]] = actual equal_to([expected_elem])([actual_elem]) - 
equal_to(expected_kvs)(actual_dict1.iteritems()) - equal_to(expected_kvs)(actual_dict2.iteritems()) + equal_to(expected_kvs)(actual_dict1.items()) + equal_to(expected_kvs)(actual_dict2.items()) return match assert_that(results, matcher(1, some_kvs)) diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 8d63d49baad..bf30a131392 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -21,6 +21,9 @@ from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.utils import with_metaclass __all__ = [ 'TimeDomain', @@ -43,11 +46,9 @@ def from_string(domain): raise ValueError('Unknown time domain: %s' % domain) -class TimestampCombinerImpl(object): +class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): """Implementation of TimestampCombiner.""" - __metaclass__ = ABCMeta - @abstractmethod def assign_output_time(self, window, input_timestamp): pass @@ -72,11 +73,9 @@ def merge(self, unused_result_window, merging_timestamps): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(TimestampCombinerImpl): +class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): """TimestampCombinerImpl that only depends on the window.""" - __metaclass__ = ABCMeta - def combine(self, output_timestamp, other_output_timestamp): return output_timestamp diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 159b21b2225..c185a522218 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -20,13 +20,19 @@ Triggers control when in processing time windows get emitted. """ +from __future__ import absolute_import + import collections import copy -import itertools import logging import numbers from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.moves.itertools import zip_longest +from future.utils import iteritems +from future.utils import with_metaclass from apache_beam.coders import observable from apache_beam.portability.api import beam_runner_api_pb2 @@ -68,14 +74,13 @@ class AccumulationMode(object): # RETRACTING = 3 -class _StateTag(object): +class _StateTag(with_metaclass(ABCMeta, object)): """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this stage. If CombineFn is None then all elements will be returned as a list, otherwise the given CombineFn will be applied (possibly incrementally and eagerly) when adding elements. """ - __metaclass__ = ABCMeta def __init__(self, tag): self.tag = tag @@ -136,12 +141,11 @@ def with_prefix(self, prefix): # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(object): +class TriggerFn(with_metaclass(ABCMeta, object)): """A TriggerFn determines when window (panes) are emitted. 
See https://beam.apache.org/documentation/programming-guide/#triggers """ - __metaclass__ = ABCMeta @abstractmethod def on_element(self, element, window, context): @@ -260,6 +264,9 @@ def reset(self, window, context): def __eq__(self, other): return type(self) == type(other) + def __hash__(self): + return hash(type(self)) + @staticmethod def from_runner_api(proto, context): return DefaultTrigger() @@ -446,6 +453,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.count == other.count + def __hash__(self): + return hash(self.count) + def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1) @@ -484,6 +494,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying + def __hash__(self): + return hash(self.underlying) + def on_element(self, element, window, context): self.underlying.on_element(element, window, context) @@ -512,9 +525,7 @@ def to_runner_api(self, context): subtrigger=self.underlying.to_runner_api(context))) -class _ParallelTriggerFn(TriggerFn): - - __metaclass__ = ABCMeta +class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): def __init__(self, *triggers): self.triggers = triggers @@ -526,6 +537,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + @abstractmethod def combine_op(self, trigger_results): pass @@ -620,6 +634,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): @@ -744,14 +761,12 @@ def clear_state(self, tag): # pylint: disable=unused-argument -class SimpleState(object): +class SimpleState(with_metaclass(ABCMeta, object)): """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). """ - __metaclass__ = ABCMeta - @abstractmethod def set_timer(self, window, name, time_domain, timestamp): pass @@ -863,7 +878,7 @@ def merge(self, to_be_merged, merge_result): self._persist_window_ids() def known_windows(self): - return self.window_ids.keys() + return list(self.window_ids) def get_window(self, window_id): for window, ids in self.window_ids.items(): @@ -922,11 +937,9 @@ def create_trigger_driver(windowing, return driver -class TriggerDriver(object): +class TriggerDriver(with_metaclass(ABCMeta, object)): """Breaks a series of bundle and timer firings into window (pane)s.""" - __metaclass__ = ABCMeta - @abstractmethod def process_elements(self, state, windowed_values, output_watermark): pass @@ -972,10 +985,13 @@ def __eq__(self, other): if isinstance(other, collections.Iterable): return all( a == b - for a, b in itertools.izip_longest(self, other, fillvalue=object())) + for a, b in zip_longest(self, other, fillvalue=object())) else: return NotImplemented + def __hash__(self): + return hash(tuple(self)) + def __ne__(self, other): return not self == other @@ -1250,7 +1266,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): def get_earliest_hold(self): earliest_hold = MAX_TIMESTAMP - for unused_window, tagged_states in self.state.iteritems(): + for unused_window, tagged_states in iteritems(self.state): # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is # named "watermark". 
This is currently only true because the only place # watermark holds are set is in the GeneralTriggerDriver, where we use diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 2e672bb0cf1..034abae65c8 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -17,10 +17,14 @@ """Unit tests for the triggering classes.""" +from __future__ import absolute_import + import collections import os.path import pickle import unittest +from builtins import range +from builtins import zip import yaml @@ -382,7 +386,7 @@ def test_picklable_output(self): pickle.dumps(unpicklable) for unwindowed in driver.process_elements(None, unpicklable, None): self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value, - range(10)) + list(range(10))) class RunnerApiTest(unittest.TestCase): @@ -426,7 +430,7 @@ def format_result(k_v): # A-10, A-11 never emitted due to AfterCount(3) never firing. 'B-4': {6, 7, 8, 9}, 'B-3': {10, 15, 16}, - }.iteritems())) + }.items())) class TranscriptTest(unittest.TestCase): @@ -556,7 +560,7 @@ def fire_timers(): for line in spec['transcript']: - action, params = line.items()[0] + action, params = list(line.items())[0] if action != 'expect': # Fail if we have output that was not expected in the transcript. diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 6a5fd581bb7..0f99da246a5 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ b/sdks/python/apache_beam/transforms/userstate.py @@ -23,6 +23,7 @@ from __future__ import absolute_import import types +from builtins import object from apache_beam.coders import Coder from apache_beam.transforms.timeutil import TimeDomain diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 8dbc9ce5e77..b891e628178 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -16,6 +16,7 @@ # """Unit tests for the Beam State and Timer API interfaces.""" +from __future__ import absolute_import import unittest diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 07cab545351..dbd0f709d6c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -19,11 +19,18 @@ """ from __future__ import absolute_import +from __future__ import division import collections import contextlib import random import time +from builtins import object +from builtins import range +from builtins import zip + +from future.utils import itervalues +from past.utils import old_div from apache_beam import typehints from apache_beam.metrics import Metrics @@ -114,12 +121,12 @@ def __init__(self, **kwargs): super(CoGroupByKey, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) def _extract_input_pvalues(self, pvalueish): try: # If this works, it's a dict. 
- return pvalueish, tuple(pvalueish.viewvalues()) + return pvalueish, tuple(itervalues(pvalueish)) except AttributeError: pcolls = tuple(pvalueish) return pcolls, pcolls @@ -268,12 +275,12 @@ def _thin_data(self): def div_keys(kv1_kv2): (x1, _), (x2, _) = kv1_kv2 - return x2 / x1 + return old_div(x2, x1) # TODO(BEAM-4858) pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), key=div_keys) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. - threshold = 2 * len(pairs) / 3 + threshold = 2 * len(pairs) // 3 self._data = ( list(sum(pairs[threshold:], ())) + [((x1 + x2) / 2.0, (t1 + t2) / 2.0) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d834a1c5efe..6cec4a5bf36 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -17,9 +17,13 @@ """Unit tests for the transform.util classes.""" +from __future__ import absolute_import + import logging import time import unittest +from builtins import object +from builtins import range import apache_beam as beam from apache_beam.coders import coders diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 5bc047b48c7..067227bb3f8 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,10 +50,13 @@ from __future__ import absolute_import import abc +from builtins import object +from builtins import range +from functools import total_ordering +from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 -from past.builtins import cmp from apache_beam.coders import coders from apache_beam.portability import common_urns @@ -109,11 +112,9 @@ def get_impl(timestamp_combiner, window_fn): raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) -class WindowFn(urns.RunnerApiFn): +class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): """An abstract windowing function defining a basic assign and merge.""" - __metaclass__ = abc.ABCMeta - class AssignContext(object): """Context passed to WindowFn.assign().""" @@ -191,15 +192,35 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def __cmp__(self, other): - # Order first by endpoint, then arbitrarily. 
- return cmp(self.end, other.end) or cmp(hash(self), hash(other)) - def __eq__(self, other): raise NotImplementedError + def __ne__(self, other): + # Order first by endpoint, then arbitrarily + return self.end != other.end or hash(self) != hash(other) + + def __lt__(self, other): + if self.end != other.end: + return self.end < other.end + return hash(self) < hash(other) + + def __le__(self, other): + if self.end != other.end: + return self.end <= other.end + return hash(self) <= hash(other) + + def __gt__(self, other): + if self.end != other.end: + return self.end > other.end + return hash(self) > hash(other) + + def __ge__(self, other): + if self.end != other.end: + return self.end >= other.end + return hash(self) >= hash(other) + def __hash__(self): - return hash(self.end) + raise NotImplementedError def __repr__(self): return '[?, %s)' % float(self.end) @@ -221,7 +242,12 @@ def __hash__(self): return hash((self.start, self.end)) def __eq__(self, other): - return self.start == other.start and self.end == other.end + return (self.start == other.start + and self.end == other.end + and type(self) == type(other)) + + def __ne__(self, other): + return not self == other def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -234,6 +260,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -246,10 +273,23 @@ def __init__(self, value, timestamp): self.value = value self.timestamp = Timestamp.of(timestamp) - def __cmp__(self, other): - if type(self) is not type(other): - return cmp(type(self), type(other)) - return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + def __eq__(self, other): + return (type(self) == type(other) + and self.value == other.value + and self.timestamp == other.timestamp) + + def __hash__(self): + return hash((self.value, self.timestamp)) + + def __ne__(self, other): + return not self == other + + def __lt__(self, other): + if type(self) != type(other): + return type(self).__name__ < type(other).__name__ + if self.value != other.value: + return self.value < other.value + return self.timestamp < other.timestamp class GlobalWindow(BoundedWindow): @@ -275,6 +315,9 @@ def __eq__(self, other): # Global windows are always and only equal to each other. 
return self is other or type(self) is type(other) + def __ne__(self, other): + return not self == other + class NonMergingWindowFn(WindowFn): @@ -348,6 +391,9 @@ def __eq__(self, other): if type(self) == type(other) == FixedWindows: return self.size == other.size and self.offset == other.offset + def __hash__(self): + return hash((self.size, self.offset)) + def __ne__(self, other): return not self == other @@ -407,6 +453,12 @@ def __eq__(self, other): and self.offset == other.offset and self.period == other.period) + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash((self.offset, self.period)) + def to_runner_api_parameter(self, context): return (common_urns.sliding_windows.urn, standard_window_fns_pb2.SlidingWindowsPayload( @@ -474,6 +526,12 @@ def __eq__(self, other): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(self.gap_size) + def to_runner_api_parameter(self, context): return (common_urns.session_windows.urn, standard_window_fns_pb2.SessionsPayload( diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 7c1d4e99f5e..77ab47e3dd8 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -16,8 +16,11 @@ # """Unit tests for the windowing classes.""" +from __future__ import absolute_import +from __future__ import division import unittest +from builtins import range from apache_beam.runners import pipeline_context from apache_beam.testing.test_pipeline import TestPipeline @@ -236,7 +239,7 @@ def test_timestamped_with_combiners(self): # We add a 'key' to each value representing the index of the # window. This is important since there is no guarantee of # order for the elements of a PCollection. - | Map(lambda v: (v / 5, v))) + | Map(lambda v: (v // 5, v))) # Sum all elements associated with a key and window. Although it # is called CombinePerKey it is really CombinePerKeyAndWindow the # same way GroupByKey is really GroupByKeyAndWindow. diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index bf4941a5d5d..a8f56fd103b 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -16,6 +16,8 @@ # """Unit tests for the write transform.""" +from __future__ import absolute_import + import logging import unittest diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index de2352aafe9..da4b00cafc1 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -116,6 +116,7 @@ modules = apache_beam/testing apache_beam/tools apache_beam/typehints + apache_beam/transforms commands = python --version pip --version ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 131066)
    Time Spent: 16h 10m (was: 16h)

> Futurize and fix python 2 compatibility for transforms subpackage
> -----------------------------------------------------------------
>
>                 Key: BEAM-4006
>                 URL: https://issues.apache.org/jira/browse/BEAM-4006
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Matthias Feys
>            Priority: Major
>          Time Spent: 16h 10m
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
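
The diff above applies the same small set of Python 2/3 compatibility idioms across the transforms subpackage: `from __future__ import absolute_import/division/print_function`, imports from `builtins` and the `future` package, floor division (`//`) wherever the old truncating integer division is intended, `dict.items()` in place of `iteritems()`, `with_metaclass()` instead of a `__metaclass__` attribute, and an explicit `__hash__` wherever `__eq__` is defined. The sketch below gathers those idioms in one place; it assumes the `future` package is installed (as the Beam Python SDK requires), and the `Interval` class and `mean` function are hypothetical illustrations, not code taken from the repository.

# Minimal sketch of the Python 2/3 compatibility idioms used in this PR.
# Assumes the `future` package is installed; `Interval` and `mean` are
# hypothetical illustrations, not code from the Beam repository.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from abc import ABCMeta
from abc import abstractmethod
from builtins import object

from future.utils import with_metaclass


class Combiner(with_metaclass(ABCMeta, object)):
  """ABC declared via with_metaclass() rather than __metaclass__ = ABCMeta."""

  @abstractmethod
  def extract_output(self):
    pass


class Interval(object):
  """Value type that stays hashable on Python 3.

  Python 3 sets __hash__ to None on any class that defines __eq__ without
  also defining __hash__, which is why the two are added together
  throughout the diff.
  """

  def __init__(self, start, end):
    self.start = start
    self.end = end

  def __eq__(self, other):
    return (type(self) == type(other)
            and self.start == other.start
            and self.end == other.end)

  def __ne__(self, other):
    return not self == other

  def __hash__(self):
    return hash((self.start, self.end))


def mean(values):
  # With `division` imported, `/` is true division even on Python 2, so `//`
  # is used wherever the old truncating integer behaviour is intended.
  return sum(values) // len(values)


counts = {'a': 1, 'b': 2}
# iteritems()/itervalues() no longer exist on Python 3; items() works on both.
print(sorted(counts.items()), mean([1, 2, 3, 4]), hash(Interval(0, 5)))

Defining `__ne__` explicitly alongside `__eq__`, as the window.py changes do, matters on Python 2, where `__ne__` is not derived automatically from `__eq__`.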