Repository: beam
Updated Branches:
  refs/heads/master 3fb75d3c2 -> 08da94df4
[BEAM-1964] Fix lint issues for linter upgrade

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d2a7d1e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d2a7d1e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d2a7d1e8

Branch: refs/heads/master
Commit: d2a7d1e8af904d432f0a5613ed1ff0b87a79d3ec
Parents: 3fb75d3
Author: Sourabh Bajaj <[email protected]>
Authored: Thu Apr 13 16:18:32 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Thu Apr 13 17:17:24 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py | 50 +++++----
 sdks/python/apache_beam/coders/coders.py | 6 +-
 .../examples/complete/game/user_score.py | 3 -
 .../examples/complete/top_wikipedia_sessions.py | 8 --
 .../examples/snippets/snippets_test.py | 3 +-
 .../apache_beam/examples/wordcount_debugging.py | 4 -
 sdks/python/apache_beam/pvalue.py | 16 +--
 sdks/python/apache_beam/transforms/core.py | 102 ++++++++-----
 sdks/python/apache_beam/transforms/display.py | 3 +-
 .../python/apache_beam/transforms/ptransform.py | 36 +++----
 .../apache_beam/transforms/ptransform_test.py | 3 +-
 .../python/apache_beam/transforms/sideinputs.py | 11 +-
 sdks/python/apache_beam/transforms/trigger.py | 24 ++---
 .../apache_beam/transforms/trigger_test.py | 6 +-
 sdks/python/apache_beam/typehints/decorators.py | 23 ++---
 .../apache_beam/typehints/trivial_inference.py | 26 ++---
 .../typehints/trivial_inference_test.py | 3 +-
 sdks/python/apache_beam/typehints/typecheck.py | 7 +-
 sdks/python/apache_beam/typehints/typehints.py | 63 +++++-------
 .../apache_beam/typehints/typehints_test.py | 5 +-
 .../apache_beam/utils/annotations_test.py | 2 +-
 sdks/python/apache_beam/utils/path.py | 3 +-
 sdks/python/apache_beam/utils/proto_utils.py | 15 ++-
 sdks/python/apache_beam/utils/retry.py | 12 +--
 sdks/python/apache_beam/utils/timestamp.py | 6 +-
 sdks/python/apache_beam/utils/windowed_value.py | 17 ++--
 sdks/python/run_pylint.sh | 2 +-
 27 files changed, 177 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 137d1be..d56606d 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -161,8 +161,8 @@ class CallbackCoderImpl(CoderImpl): if isinstance(value, observable.ObservableMixin): # CallbackCoderImpl can presumably encode the elements too. return 1, [(value, self)] - else: - return self.estimate_size(value, nested), [] + + return self.estimate_size(value, nested), [] class DeterministicFastPrimitivesCoderImpl(CoderImpl): @@ -243,10 +243,10 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): if isinstance(value, observable.ObservableMixin): # FastPrimitivesCoderImpl can presumably encode the elements too.
return 1, [(value, self)] - else: - out = ByteCountingOutputStream() - self.encode_to_stream(value, out, nested) - return out.get_count(), [] + + out = ByteCountingOutputStream() + self.encode_to_stream(value, out, nested) + return out.get_count(), [] def encode_to_stream(self, value, stream, nested): t = type(value) @@ -304,8 +304,7 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): return vlist elif t == TUPLE_TYPE: return tuple(vlist) - else: - return set(vlist) + return set(vlist) elif t == DICT_TYPE: vlen = stream.read_var_int64() v = {} @@ -315,8 +314,8 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): return v elif t == BOOL_TYPE: return not not stream.read_byte() - else: - return self.fallback_coder_impl.decode_from_stream(stream, nested) + + return self.fallback_coder_impl.decode_from_stream(stream, nested) class BytesCoderImpl(CoderImpl): @@ -408,8 +407,7 @@ class VarIntCoderImpl(StreamCoderImpl): ivalue = value # type cast if 0 <= ivalue < len(small_ints): return small_ints[ivalue] - else: - return StreamCoderImpl.encode(self, value) + return StreamCoderImpl.encode(self, value) def decode(self, encoded): if len(encoded) == 1: @@ -596,20 +594,20 @@ class SequenceCoderImpl(StreamCoderImpl): estimated_size += 4 if isinstance(value, observable.ObservableMixin): return estimated_size, [(value, self._elem_coder)] - else: - observables = [] - for elem in value: - child_size, child_observables = ( - self._elem_coder.get_estimated_size_and_observables( - elem, nested=True)) - estimated_size += child_size - observables += child_observables - # TODO: (BEAM-1537) Update to use an accurate count depending on size and - # count, currently we are underestimating the size by up to 10 bytes - # per block of data since we are not including the count prefix which - # occurs at most once per 64k of data and is upto 10 bytes long. The upper - # bound of the underestimate is 10 / 65536 ~= 0.0153% of the actual size. - return estimated_size, observables + + observables = [] + for elem in value: + child_size, child_observables = ( + self._elem_coder.get_estimated_size_and_observables( + elem, nested=True)) + estimated_size += child_size + observables += child_observables + # TODO: (BEAM-1537) Update to use an accurate count depending on size and + # count, currently we are underestimating the size by up to 10 bytes + # per block of data since we are not including the count prefix which + # occurs at most once per 64k of data and is upto 10 bytes long. The upper + # bound of the underestimate is 10 / 65536 ~= 0.0153% of the actual size. 
+ return estimated_size, observables class TupleSequenceCoderImpl(SequenceCoderImpl): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 5955317..8ef0a46 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -129,8 +129,7 @@ class Coder(object): d = dict(self.__dict__) del d['_impl'] return d - else: - return self.__dict__ + return self.__dict__ @classmethod def from_type_hint(cls, unused_typehint, unused_registry): @@ -224,8 +223,7 @@ class ToStringCoder(Coder): return value.encode('utf-8') elif isinstance(value, str): return value - else: - return str(value) + return str(value) def decode(self, _): raise NotImplementedError('ToStringCoder cannot be used for decoding.') http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/examples/complete/game/user_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py index 27b60cb..b53d5e9 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score.py @@ -172,9 +172,6 @@ class WriteToBigQuery(beam.PTransform): class UserScore(beam.PTransform): - def __init__(self): - super(UserScore, self).__init__() - def expand(self, pcoll): return (pcoll | 'ParseGameEvent' >> beam.ParDo(ParseEventFn()) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index e6cab18..c24eb75 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -78,10 +78,6 @@ class ComputeSessions(beam.PTransform): A session is defined as a string of edits where each is separated from the next by less than an hour. 
""" - - def __init__(self): - super(ComputeSessions, self).__init__() - def expand(self, pcoll): return (pcoll | 'ComputeSessionsWindow' >> beam.WindowInto( @@ -91,10 +87,6 @@ class ComputeSessions(beam.PTransform): class TopPerMonth(beam.PTransform): """Computes the longest session ending in each month.""" - - def __init__(self): - super(TopPerMonth, self).__init__() - def expand(self, pcoll): return (pcoll | 'TopPerMonthWindow' >> beam.WindowInto( http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 64f3dfb..c3984bb 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -469,8 +469,7 @@ class SnippetsTest(unittest.TestCase): if sorted_output: return sorted(s.rstrip('\n') for s in all_lines) - else: - return all_lines + return all_lines def test_model_pipelines(self): temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc') http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 3d9cda4..9779b82 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -90,10 +90,6 @@ class CountWords(beam.PTransform): A PTransform that converts a PCollection containing lines of text into a PCollection of (word, count) tuples. """ - - def __init__(self): - super(CountWords, self).__init__() - def expand(self, pcoll): return (pcoll | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index bfe1745..5709b38 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -96,10 +96,6 @@ class PCollection(PValue): pipelines. """ - def __init__(self, pipeline, **kwargs): - """Initializes a PCollection. 
Do not call directly.""" - super(PCollection, self).__init__(pipeline, **kwargs) - def __eq__(self, other): if isinstance(other, PCollection): return self.tag == other.tag and self.producer == other.producer @@ -312,20 +308,18 @@ class AsSingleton(AsSideInput): base = super(AsSingleton, self)._view_options() if self.default_value != AsSingleton._NO_DEFAULT: return dict(base, default=self.default_value) - else: - return base + return base @staticmethod def _from_runtime_iterable(it, options): head = list(itertools.islice(it, 2)) - if len(head) == 0: + if not head: return options.get('default', EmptySideInput()) elif len(head) == 1: return head[0] - else: - raise ValueError( - 'PCollection with more than one element accessed as ' - 'a singleton view.') + raise ValueError( + 'PCollection with more than one element accessed as ' + 'a singleton view.') @property def element_type(self): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index cf313d1..b1a33ea 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -172,9 +172,8 @@ class DoFn(WithTypeHints, HasDisplayData): if hasattr(self, func_name): f = getattr(self, func_name) return f() - else: - f = getattr(self, func) - return inspect.getargspec(f) + f = getattr(self, func) + return inspect.getargspec(f) # TODO(sourabhbajaj): Do we want to remove the responsiblity of these from # the DoFn or maybe the runner @@ -191,8 +190,7 @@ class DoFn(WithTypeHints, HasDisplayData): if (type_hint in annotations or trivial_inference.element_type(type_hint) in annotations): return Any - else: - return type_hint + return type_hint def process_argspec_fn(self): """Returns the Python callable that will eventually be invoked. @@ -446,20 +444,19 @@ class CallableWrapperCombineFn(CombineFn): def add_input(self, accumulator, element, *args, **kwargs): if accumulator is self._EMPTY: return element - else: - return self._fn([accumulator, element], *args, **kwargs) + return self._fn([accumulator, element], *args, **kwargs) def add_inputs(self, accumulator, elements, *args, **kwargs): if accumulator is self._EMPTY: return self._fn(elements, *args, **kwargs) elif isinstance(elements, (list, tuple)): return self._fn([accumulator] + list(elements), *args, **kwargs) - else: - def union(): - yield accumulator - for e in elements: - yield e - return self._fn(union(), *args, **kwargs) + + def union(): + yield accumulator + for e in elements: + yield e + return self._fn(union(), *args, **kwargs) def merge_accumulators(self, accumulators, *args, **kwargs): # It's (weakly) assumed that self._fn is associative. @@ -859,8 +856,7 @@ class CombineGlobally(PTransform): type_hints = self.get_type_hints() if type_hints.input_types: return transform.with_input_types(type_hints.input_types[0][0]) - else: - return transform + return transform combined = (pcoll | 'KeyWithVoid' >> add_input_types( @@ -897,8 +893,7 @@ class CombineGlobally(PTransform): # TODO(robertwb): We should infer this. 
if combined.element_type: return transform.with_output_types(combined.element_type) - else: - return transform + return transform return (pcoll.pipeline | 'DoOnce' >> Create([None]) | 'InjectDefault' >> typed(Map(lambda _, s: s, view))) @@ -987,24 +982,24 @@ class CombineValuesDoFn(DoFn): return [ (element[0], self.combinefn.apply(element[1], *args, **kwargs))] - else: - # Add the elements into three accumulators (for testing of merge). - elements = element[1] - accumulators = [] - for k in range(3): - if len(elements) <= k: - break - accumulators.append( - self.combinefn.add_inputs( - self.combinefn.create_accumulator(*args, **kwargs), - elements[k::3], - *args, **kwargs)) - # Merge the accumulators. - accumulator = self.combinefn.merge_accumulators( - accumulators, *args, **kwargs) - # Convert accumulator to the final result. - return [(element[0], - self.combinefn.extract_output(accumulator, *args, **kwargs))] + + # Add the elements into three accumulators (for testing of merge). + elements = element[1] + accumulators = [] + for k in range(3): + if len(elements) <= k: + break + accumulators.append( + self.combinefn.add_inputs( + self.combinefn.create_accumulator(*args, **kwargs), + elements[k::3], + *args, **kwargs)) + # Merge the accumulators. + accumulator = self.combinefn.merge_accumulators( + accumulators, *args, **kwargs) + # Convert accumulator to the final result. + return [(element[0], + self.combinefn.extract_output(accumulator, *args, **kwargs))] def default_type_hints(self): hints = self.combinefn.get_type_hints().copy() @@ -1112,22 +1107,18 @@ class GroupByKey(PTransform): self.GroupAlsoByWindow(pcoll.windowing)) .with_input_types(gbk_input_type) .with_output_types(gbk_output_type))) - else: - return (pcoll - | 'reify_windows' >> ParDo(self.ReifyWindows()) - | 'group_by_key' >> GroupByKeyOnly() - | 'group_by_window' >> ParDo( - self.GroupAlsoByWindow(pcoll.windowing))) + # If the input_type is None, run the default + return (pcoll + | 'reify_windows' >> ParDo(self.ReifyWindows()) + | 'group_by_key' >> GroupByKeyOnly() + | 'group_by_window' >> ParDo( + self.GroupAlsoByWindow(pcoll.windowing))) @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) class GroupByKeyOnly(PTransform): """A group by key transform, ignoring windows.""" - - def __init__(self): - super(GroupByKeyOnly, self).__init__() - def infer_output_type(self, input_type): key_type, value_type = trivial_inference.key_value_types(input_type) return KV[key_type, Iterable[value_type]] @@ -1212,12 +1203,12 @@ class Windowing(object): if type(self) == type(other): if self._is_default and other._is_default: return True - else: - return ( - self.windowfn == other.windowfn - and self.triggerfn == other.triggerfn - and self.accumulation_mode == other.accumulation_mode - and self.output_time_fn == other.output_time_fn) + return ( + self.windowfn == other.windowfn + and self.triggerfn == other.triggerfn + and self.accumulation_mode == other.accumulation_mode + and self.output_time_fn == other.output_time_fn) + return False def is_default(self): return self._is_default @@ -1342,8 +1333,7 @@ class Flatten(PTransform): if not inputs: # TODO(robertwb): Return something compatible with every windowing? 
return Windowing(GlobalWindows()) - else: - return super(Flatten, self).get_windowing(inputs) + return super(Flatten, self).get_windowing(inputs) class Create(PTransform): @@ -1366,8 +1356,7 @@ class Create(PTransform): def infer_output_type(self, unused_input_type): if not self.value: return Any - else: - return Union[[trivial_inference.instance_to_type(v) for v in self.value]] + return Union[[trivial_inference.instance_to_type(v) for v in self.value]] def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) @@ -1404,8 +1393,7 @@ class Create(PTransform): def split_points_unclaimed(stop_position): if current_position >= stop_position: return 0 - else: - return stop_position - current_position - 1 + return stop_position - current_position - 1 range_tracker.set_split_points_unclaimed_callback( split_points_unclaimed) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/display.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 2ced1af..e4dddbf 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -305,8 +305,7 @@ class DisplayDataItem(object): """ if type_ == 'CLASS': return value.__name__ - else: - return None + return None @classmethod def _get_value_type(cls, value): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 93d751d..0ac8b5b 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -83,8 +83,7 @@ class _SetInputPValues(_PValueishTransform): def visit(self, node, replacements): if id(node) in replacements: return replacements[id(node)] - else: - return super(_SetInputPValues, self).visit(node, replacements) + return super(_SetInputPValues, self).visit(node, replacements) class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple): @@ -107,8 +106,7 @@ class _MaterializePValues(_PValueishTransform): return self._pvalue_cache.get_unwindowed_pvalue(node) elif isinstance(node, pvalue.DoOutputsTuple): return _MaterializedDoOutputsTuple(node, self._pvalue_cache) - else: - return super(_MaterializePValues, self).visit(node) + return super(_MaterializePValues, self).visit(node) class GetPValues(_PValueishTransform): @@ -340,8 +338,7 @@ class PTransform(WithTypeHints, HasDisplayData): """Used to compose PTransforms, e.g., ptransform1 | ptransform2.""" if isinstance(right, PTransform): return ChainedPTransform(self, right) - else: - return NotImplemented + return NotImplemented def __ror__(self, left, label=None): """Used to apply this PTransform to non-PValues, e.g., a tuple.""" @@ -380,12 +377,11 @@ class PTransform(WithTypeHints, HasDisplayData): result = p.apply(self, pvalueish, label) if deferred: return result - else: - # Get a reference to the runners internal cache, otherwise runner may - # clean it after run. - cache = p.runner.cache - p.run().wait_until_finish() - return _MaterializePValues(cache).visit(result) + # Get a reference to the runners internal cache, otherwise runner may + # clean it after run. 
+ cache = p.runner.cache + p.run().wait_until_finish() + return _MaterializePValues(cache).visit(result) def _extract_input_pvalues(self, pvalueish): """Extract all the pvalues contained in the input pvalueish. @@ -431,8 +427,7 @@ class ChainedPTransform(PTransform): # Create a flat list rather than a nested tree of composite # transforms for better monitoring, etc. return ChainedPTransform(*(self._parts + (right,))) - else: - return NotImplemented + return NotImplemented def expand(self, pval): return reduce(operator.or_, self._parts, pval) @@ -521,8 +516,8 @@ class PTransformWithSideInputs(PTransform): def element_type(side_input): if isinstance(side_input, pvalue.AsSideInput): return side_input.element_type - else: - return instance_to_type(side_input) + return instance_to_type(side_input) + arg_types = [pvalueish.element_type] + [element_type(v) for v in args] kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()} argspec_fn = self.process_argspec_fn() @@ -598,8 +593,7 @@ class CallablePTransform(PTransform): if self._args: return '%s(%s)' % ( label_from_callable(self.fn), label_from_callable(self._args[0])) - else: - return label_from_callable(self.fn) + return label_from_callable(self.fn) def ptransform_fn(fn): @@ -652,10 +646,8 @@ def label_from_callable(fn): return '<lambda at %s:%s>' % ( os.path.basename(fn.func_code.co_filename), fn.func_code.co_firstlineno) - else: - return fn.__name__ - else: - return str(fn) + return fn.__name__ + return str(fn) class _NamedPTransform(PTransform): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index cb1dd77..4da5443 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -194,8 +194,7 @@ class PTransformTest(unittest.TestCase): def some_fn(v): if v % 2 == 0: return [v, pvalue.SideOutputValue('even', v)] - else: - return [v, pvalue.SideOutputValue('odd', v)] + return [v, pvalue.SideOutputValue('odd', v)] pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 1de7bac..6ba5311 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -35,11 +35,12 @@ def _global_window_mapping_fn(w, global_window=window.GlobalWindow()): def default_window_mapping_fn(target_window_fn): if target_window_fn == window.GlobalWindows(): return _global_window_mapping_fn - else: - def map_via_end(source_window): - return list(target_window_fn.assign( - window.WindowFn.AssignContext(source_window.max_timestamp())))[-1] - return map_via_end + + def map_via_end(source_window): + return list(target_window_fn.assign( + window.WindowFn.AssignContext(source_window.max_timestamp())))[-1] + + return map_via_end class SideInputMap(object): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 5976af4..a527c82 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -91,10 +91,6 @@ class CombiningValueStateTag(StateTag): class ListStateTag(StateTag): """StateTag pointing to a list of elements.""" - - def __init__(self, tag): - super(ListStateTag, self).__init__(tag) - def __repr__(self): return 'ListStateTag(%s)' % self.tag @@ -304,8 +300,7 @@ class AfterWatermark(TriggerFn): elif self.early: return self.early.should_fire( watermark, window, NestedContext(context, 'early')) - else: - return False + return False def on_fire(self, watermark, window, context): if self.is_late(context): @@ -493,8 +488,7 @@ class ParallelTriggerFn(TriggerFn): in proto.after_all.subtriggers or proto.after_any.subtriggers] if proto.after_all.subtriggers: return AfterAll(*subtriggers) - else: - return AfterFirst(*subtriggers) + return AfterFirst(*subtriggers) def to_runner_api(self, context): subtriggers = [ @@ -596,10 +590,6 @@ class AfterEach(TriggerFn): class OrFinally(AfterFirst): - - def __init__(self, body_trigger, exit_trigger): - super(OrFinally, self).__init__(body_trigger, exit_trigger) - @staticmethod def from_runner_api(proto, context): return OrFinally( @@ -792,11 +782,11 @@ class MergeableStateAdapter(SimpleState): def _get_id(self, window): if window in self.window_ids: return self.window_ids[window][0] - else: - window_id = self._get_next_counter() - self.window_ids[window] = [window_id] - self._persist_window_ids() - return window_id + + window_id = self._get_next_counter() + self.window_ids[window] = [window_id] + self._persist_window_ids() + return window_id def _get_ids(self, window): return self.window_ids.get(window, []) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 827aa33..9f2046a 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -456,8 +456,7 @@ class TranscriptTest(unittest.TestCase): assert s[0] == '[' and s[-1] == ']', s if not s[1:-1].strip(): return [] - else: - return [int(x) for x in s[1:-1].split(',')] + return [int(x) for x in s[1:-1].split(',')] def split_args(s): """Splits 'a, b, [c, d]' into ['a', 'b', '[c, d]'].""" @@ -507,8 +506,7 @@ class TranscriptTest(unittest.TestCase): fn = parse(s, names) if isinstance(fn, type): return fn() - else: - return fn + return fn # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms import window as window_module http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/decorators.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py index 9ac9168..d8f0b1b 100644 --- a/sdks/python/apache_beam/typehints/decorators.py +++ b/sdks/python/apache_beam/typehints/decorators.py @@ -162,9 +162,8 @@ class IOTypeHints(object): return self elif not self: return hints - else: - return IOTypeHints(self.input_types or hints.input_types, - self.output_types or hints.output_types) + return IOTypeHints(self.input_types or hints.input_types, + self.output_types or 
hints.output_types) def __nonzero__(self): return bool(self.input_types or self.output_types) @@ -220,8 +219,7 @@ def _positional_arg_hints(arg, hints): """ if isinstance(arg, list): return typehints.Tuple[[_positional_arg_hints(a, hints) for a in arg]] - else: - return hints.get(arg, typehints.Any) + return hints.get(arg, typehints.Any) def _unpack_positional_arg_hints(arg, hint): @@ -241,8 +239,7 @@ def _unpack_positional_arg_hints(arg, hint): for a, t in zip(arg, hint.tuple_types)) else: return (typehints.Any,) * len(arg) - else: - return hint + return hint def getcallargs_forhints(func, *typeargs, **typekwargs): @@ -483,11 +480,10 @@ def _interleave_type_check(type_constraint, var_name=None): def wrapper(gen): if isinstance(gen, GeneratorWrapper): return gen - else: - return GeneratorWrapper( - gen, - lambda x: _check_instance_type(type_constraint, x, var_name) - ) + return GeneratorWrapper( + gen, + lambda x: _check_instance_type(type_constraint, x, var_name) + ) return wrapper @@ -517,8 +513,7 @@ class GeneratorWrapper(object): return self.__next__() elif attr == '__iter__': return self.__iter__() - else: - return getattr(self.internal_gen, attr) + return getattr(self.internal_gen, attr) def next(self): next_val = next(self.internal_gen) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/trivial_inference.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py index b611103..4581aa1 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference.py +++ b/sdks/python/apache_beam/typehints/trivial_inference.py @@ -45,8 +45,7 @@ def instance_to_type(o): return o.__class__ elif t == BoundMethod: return types.MethodType - else: - return t + return t elif t == tuple: return typehints.Tuple[[instance_to_type(item) for item in o]] elif t == list: @@ -90,8 +89,7 @@ class Const(object): def unwrap(x): if isinstance(x, Const): return x.type - else: - return x + return x @staticmethod def unwrap_all(xs): @@ -121,8 +119,7 @@ class FrameState(object): ncellvars = len(self.co.co_cellvars) if i < ncellvars: return Any - else: - return Const(self.f.func_closure[i - ncellvars].cell_contents) + return Const(self.f.func_closure[i - ncellvars].cell_contents) def get_global(self, i): name = self.get_name(i) @@ -130,8 +127,7 @@ class FrameState(object): return Const(self.f.func_globals[name]) if name in __builtin__.__dict__: return Const(__builtin__.__dict__[name]) - else: - return Any + return Any def get_name(self, i): return self.co.co_names[i] @@ -144,9 +140,8 @@ class FrameState(object): return other.copy() elif other is None: return self.copy() - else: - return FrameState(self.f, union_list(self.vars, other.vars), union_list( - self.stack, other.stack)) + return FrameState(self.f, union_list(self.vars, other.vars), union_list( + self.stack, other.stack)) def __ror__(self, left): return self | left @@ -168,8 +163,7 @@ def union(a, b): return b elif type(a) == type(b) and element_type(b) == typehints.Union[()]: return a - else: - return typehints.Union[a, b] + return typehints.Union[a, b] def element_type(hint): @@ -180,8 +174,7 @@ def element_type(hint): return hint.inner_type elif isinstance(hint, typehints.TupleHint.TupleConstraint): return typehints.Union[hint.tuple_types] - else: - return Any + return Any def key_value_types(kv_type): @@ -248,8 +241,7 @@ def infer_return_type(c, input_types, debug=False, depth=5): 
tuple: typehints.Tuple[Any, ...], dict: typehints.Dict[Any, Any] }[c] - else: - return c + return c else: return Any except TypeInferenceError: http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/trivial_inference_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index a94acf5..ac00baa 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -130,8 +130,7 @@ class TrivialInferenceTest(unittest.TestCase): def some_fn(v): if v: return 1 - else: - return 2 + return 2 self.assertReturnType(int, some_fn) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index defa71e..2e7176e 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -144,10 +144,9 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper): # generator initially just by type-checking its yielded contents. if isinstance(transform_results, types.GeneratorType): return GeneratorWrapper(transform_results, type_check_output) - else: - for o in transform_results: - type_check_output(o) - return transform_results + for o in transform_results: + type_check_output(o) + return transform_results def _type_check(self, type_constraint, datum, is_input): """Typecheck a PTransform related datum according to a type constraint. http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/typehints.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 20f842a..1557d85 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -156,15 +156,13 @@ class TypeConstraint(object): def match_type_variables(type_constraint, concrete_type): if isinstance(type_constraint, TypeConstraint): return type_constraint.match_type_variables(concrete_type) - else: - return {} + return {} def bind_type_variables(type_constraint, bindings): if isinstance(type_constraint, TypeConstraint): return type_constraint.bind_type_variables(bindings) - else: - return type_constraint + return type_constraint class SequenceTypeConstraint(TypeConstraint): @@ -230,17 +228,15 @@ class SequenceTypeConstraint(TypeConstraint): def match_type_variables(self, concrete_type): if isinstance(concrete_type, SequenceTypeConstraint): return match_type_variables(self.inner_type, concrete_type.inner_type) - else: - return {} + return {} def bind_type_variables(self, bindings): bound_inner_type = bind_type_variables(self.inner_type, bindings) if bound_inner_type == self.inner_type: return self - else: - bound_self = copy.copy(self) - bound_self.inner_type = bound_inner_type - return bound_self + bound_self = copy.copy(self) + bound_self.inner_type = bound_inner_type + return bound_self class CompositeTypeHint(object): @@ -433,11 +429,10 @@ class UnionHint(CompositeTypeHint): # E.g. Union[A, B, C] > Union[A, B]. 
return all(is_consistent_with(elem, self) for elem in sub.union_types) - else: - # Other must be compatible with at least one of this union's subtypes. - # E.g. Union[A, B, C] > T if T > A or T > B or T > C. - return any(is_consistent_with(sub, elem) - for elem in self.union_types) + # Other must be compatible with at least one of this union's subtypes. + # E.g. Union[A, B, C] > T if T > A or T > B or T > C. + return any(is_consistent_with(sub, elem) + for elem in self.union_types) def type_check(self, instance): error_msg = '' @@ -476,8 +471,7 @@ class UnionHint(CompositeTypeHint): return Any elif len(params) == 1: return iter(params).next() - else: - return self.UnionConstraint(params) + return self.UnionConstraint(params) UnionConstraint = UnionHint.UnionConstraint @@ -529,8 +523,7 @@ class TupleHint(CompositeTypeHint): # E.g. Tuple[A, B] < Tuple[C, ...] iff A < C and B < C. return all(is_consistent_with(elem, self.inner_type) for elem in sub.tuple_types) - else: - return super(TupleSequenceConstraint, self)._consistent_with_check_(sub) + return super(TupleSequenceConstraint, self)._consistent_with_check_(sub) class TupleConstraint(TypeConstraint): @@ -603,8 +596,7 @@ class TupleHint(CompositeTypeHint): bind_type_variables(t, bindings) for t in self.tuple_types) if bound_tuple_types == self.tuple_types: return self - else: - return Tuple[bound_tuple_types] + return Tuple[bound_tuple_types] def __getitem__(self, type_params): ellipsis = False @@ -630,8 +622,7 @@ class TupleHint(CompositeTypeHint): if ellipsis: return self.TupleSequenceConstraint(type_params[0]) - else: - return self.TupleConstraint(type_params) + return self.TupleConstraint(type_params) TupleConstraint = TupleHint.TupleConstraint @@ -787,16 +778,14 @@ class DictHint(CompositeTypeHint): bindings.update( match_type_variables(self.value_type, concrete_type.value_type)) return bindings - else: - return {} + return {} def bind_type_variables(self, bindings): bound_key_type = bind_type_variables(self.key_type, bindings) bound_value_type = bind_type_variables(self.value_type, bindings) if (bound_key_type, self.key_type) == (bound_value_type, self.value_type): return self - else: - return Dict[bound_key_type, bound_value_type] + return Dict[bound_key_type, bound_value_type] def __getitem__(self, type_params): # Type param must be a (k, v) pair. @@ -879,14 +868,12 @@ class IterableHint(CompositeTypeHint): if not sub.tuple_types: # The empty tuple is consistent with Iterator[T] for any T. return True - else: - # Each element in the hetrogenious tuple must be consistent with - # the iterator type. - # E.g. Tuple[A, B] < Iterable[C] if A < C and B < C. - return all(is_consistent_with(elem, self.inner_type) - for elem in sub.tuple_types) - else: - return False + # Each element in the hetrogenious tuple must be consistent with + # the iterator type. + # E.g. Tuple[A, B] < Iterable[C] if A < C and B < C. + return all(is_consistent_with(elem, self.inner_type) + for elem in sub.tuple_types) + return False def __getitem__(self, type_param): validate_composite_type_param( @@ -1030,8 +1017,7 @@ _KNOWN_PRIMITIVE_TYPES = { def normalize(x): if x in _KNOWN_PRIMITIVE_TYPES: return _KNOWN_PRIMITIVE_TYPES[x] - else: - return x + return x def is_consistent_with(sub, base): @@ -1058,5 +1044,4 @@ def is_consistent_with(sub, base): elif isinstance(sub, TypeConstraint): # Nothing but object lives above any type constraints. 
return base == object - else: - return issubclass(sub, base) + return issubclass(sub, base) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/typehints/typehints_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index 8df844c..4e82fbc 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -41,9 +41,8 @@ def check_or_interleave(hint, value, var): return value elif isinstance(hint, typehints.IteratorHint.IteratorTypeConstraint): return _interleave_type_check(hint, var)(value) - else: - _check_instance_type(hint, value, var) - return value + _check_instance_type(hint, value, var) + return value def check_type_hints(f): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/annotations_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/annotations_test.py b/sdks/python/apache_beam/utils/annotations_test.py index 90f3b41..64a24ee 100644 --- a/sdks/python/apache_beam/utils/annotations_test.py +++ b/sdks/python/apache_beam/utils/annotations_test.py @@ -58,7 +58,7 @@ class AnnotationTests(unittest.TestCase): def fnc_test_deprecated_without_since_should_fail(): return 'lol' fnc_test_deprecated_without_since_should_fail() - assert len(w) == 0 + assert not w def test_experimental_with_current(self): with warnings.catch_warnings(record=True) as w: http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/path.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/path.py b/sdks/python/apache_beam/utils/path.py index 6b3b978..86dc4db 100644 --- a/sdks/python/apache_beam/utils/path.py +++ b/sdks/python/apache_beam/utils/path.py @@ -43,5 +43,4 @@ def join(path, *paths): # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead # of the slightly less surprising result 'gs://bucket/path//to/file'. 
return '/'.join((path,) + paths) - else: - return os.path.join(path, *paths) + return os.path.join(path, *paths) http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/proto_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index b4bfdca..0243495 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -26,10 +26,10 @@ def pack_Any(msg): """ if msg is None: return None - else: - result = any_pb2.Any() - result.Pack(msg) - return result + + result = any_pb2.Any() + result.Pack(msg) + return result def unpack_Any(any_msg, msg_class): @@ -39,10 +39,9 @@ def unpack_Any(any_msg, msg_class): """ if msg_class is None: return None - else: - msg = msg_class() - any_msg.Unpack(msg) - return msg + msg = msg_class() + any_msg.Unpack(msg) + return msg def pack_Struct(**kwargs): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/retry.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 8f7152a..4b137e2 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -86,16 +86,8 @@ class FuzzedExponentialIntervals(object): def retry_on_server_errors_filter(exception): """Filter allowing retries on server errors and non-HttpErrors.""" if (HttpError is not None) and isinstance(exception, HttpError): - if exception.status_code >= 500: - return True - else: - return False - elif isinstance(exception, PermanentException): - return False - else: - # We may get here for non HttpErrors such as socket timeouts, SSL - # exceptions, etc. - return True + return exception.status_code >= 500 + return not isinstance(exception, PermanentException) def retry_on_server_errors_and_timeout_filter(exception): http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/timestamp.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index cfabb77..647f4bd 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -69,8 +69,7 @@ class Timestamp(object): frac_part = micros % 1000000 if frac_part: return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part) - else: - return 'Timestamp(%s%d)' % (sign, int_part) + return 'Timestamp(%s%d)' % (sign, int_part) def to_utc_datetime(self): epoch = datetime.datetime.utcfromtimestamp(0) @@ -162,8 +161,7 @@ class Duration(object): frac_part = micros % 1000000 if frac_part: return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part) - else: - return 'Duration(%s%d)' % (sign, int_part) + return 'Duration(%s%d)' % (sign, int_part) def __float__(self): # Note that the returned value may have lost precision. 
http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/apache_beam/utils/windowed_value.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 35cc52f..87c26d1 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -79,19 +79,16 @@ class WindowedValue(object): """ if type(left) is not type(right): return cmp(type(left), type(right)) - else: - # TODO(robertwb): Avoid the type checks? - # Returns False (0) if equal, and True (1) if not. - return not WindowedValue._typed_eq(left, right) + + # TODO(robertwb): Avoid the type checks? + # Returns False (0) if equal, and True (1) if not. + return not WindowedValue._typed_eq(left, right) @staticmethod def _typed_eq(left, right): - if (left.timestamp_micros == right.timestamp_micros - and left.value == right.value - and left.windows == right.windows): - return True - else: - return False + return (left.timestamp_micros == right.timestamp_micros + and left.value == right.value + and left.windows == right.windows) def with_value(self, new_value): """Creates a new WindowedValue with the same timestamps and windows as this. http://git-wip-us.apache.org/repos/asf/beam/blob/d2a7d1e8/sdks/python/run_pylint.sh ---------------------------------------------------------------------- diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 5e63856..80cbe6e 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -42,7 +42,7 @@ for file in "${EXCLUDED_GENERATED_FILES[@]}"; do if [[ $FILES_TO_IGNORE ]]; then FILES_TO_IGNORE="$FILES_TO_IGNORE, " fi - FILES_TO_IGNORE="$FILES_TO_IGNORE$(basename $file)" + FILES_TO_IGNORE="$FILES_TO_IGNORE$(basename $file)" done echo "Skipping lint for generated files: $FILES_TO_IGNORE"
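----------------------------------------------------------------------

Most of the hunks above appear to be instances of a small set of pylint refactoring fixes: dropping the `else` after a branch that already returns (no-else-return), deleting `__init__` methods that only delegate to `super()` (useless-super-delegation), replacing `len(x) == 0` tests with truthiness checks (len-as-condition), and collapsing `if cond: return True / else: return False` pairs into a single boolean `return` (simplifiable-if-statement). The sketch below is a minimal, hypothetical before/after illustration of those rewrites; the `QueueBefore`/`QueueAfter` classes and their methods are made up for this note and are not part of the Beam codebase.

# Hypothetical illustration of the lint fixes applied throughout this commit.

class _Base(object):
  def __init__(self):
    self.items = []


class QueueBefore(_Base):
  """Style the upgraded linter complains about."""

  def __init__(self):
    # Useless super delegation: adds nothing beyond the base __init__.
    super(QueueBefore, self).__init__()

  def first_or(self, default):
    if len(self.items) == 0:   # len-as-condition
      return default
    else:                      # no-else-return: else is redundant
      return self.items[0]

  def is_empty(self):
    if len(self.items) == 0:   # simplifiable-if-statement
      return True
    else:
      return False


class QueueAfter(_Base):
  """Equivalent code in the style this commit converges on."""

  def first_or(self, default):
    if not self.items:
      return default
    return self.items[0]

  def is_empty(self):
    return not self.items


if __name__ == '__main__':
  q = QueueAfter()
  assert q.is_empty() and q.first_or('fallback') == 'fallback'
  q.items.append(42)
  assert not q.is_empty() and q.first_or('fallback') == 42

The rewrites are behavior-preserving: once every branch of an `if` returns, the trailing `else` only adds indentation, so dropping it keeps the fall-through path at the outermost level, which is presumably what the upgraded linter's refactoring checks now flag.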
