Repository: beam
Updated Branches:
  refs/heads/master cf9ac454d -> 89ff0b145
[BEAM-1964] Fix lint issues for linter upgrade -3 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd79f4d8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd79f4d8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd79f4d8 Branch: refs/heads/master Commit: bd79f4d8bacba116a4c7f188cad0cdbf507d36d8 Parents: bf474a0 Author: Sourabh Bajaj <[email protected]> Authored: Fri Apr 14 11:22:25 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Apr 14 13:06:14 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/concat_source.py | 74 +++++++++----------- .../apache_beam/io/filebasedsource_test.py | 2 +- sdks/python/apache_beam/io/fileio.py | 6 +- sdks/python/apache_beam/io/filesystems_util.py | 3 +- sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++-- sdks/python/apache_beam/io/iobase.py | 7 +- sdks/python/apache_beam/io/localfilesystem.py | 3 +- sdks/python/apache_beam/io/range_trackers.py | 19 +++-- sdks/python/apache_beam/io/source_test_utils.py | 7 +- sdks/python/apache_beam/io/textio.py | 13 ++-- sdks/python/apache_beam/transforms/combiners.py | 29 ++++---- 11 files changed, 81 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/concat_source.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py index 1656180..de51f0f 100644 --- a/sdks/python/apache_beam/io/concat_source.py +++ b/sdks/python/apache_beam/io/concat_source.py @@ -84,8 +84,7 @@ class ConcatSource(iobase.BoundedSource): # Getting coder from the first sub-sources. This assumes all sub-sources # to produce the same coder. 
return self._source_bundles[0].source.default_output_coder() - else: - return super(ConcatSource, self).default_output_coder() + return super(ConcatSource, self).default_output_coder() class ConcatRangeTracker(iobase.RangeTracker): @@ -165,13 +164,12 @@ class ConcatRangeTracker(iobase.RangeTracker): return False elif source_ix == self._end[0] and self._end[1] is None: return False - else: - assert source_ix >= self._claimed_source_ix - self._claimed_source_ix = source_ix - if source_pos is None: - return True - else: - return self.sub_range_tracker(source_ix).try_claim(source_pos) + + assert source_ix >= self._claimed_source_ix + self._claimed_source_ix = source_ix + if source_pos is None: + return True + return self.sub_range_tracker(source_ix).try_claim(source_pos) def try_split(self, pos): source_ix, source_pos = pos @@ -185,24 +183,24 @@ class ConcatRangeTracker(iobase.RangeTracker): elif source_ix == self._end[0] and self._end[1] is None: # At/after end. return None + + if source_ix > self._claimed_source_ix: + # Prefer to split on even boundary. + split_pos = None + ratio = self._cumulative_weights[source_ix] else: - if source_ix > self._claimed_source_ix: - # Prefer to split on even boundary. - split_pos = None - ratio = self._cumulative_weights[source_ix] - else: - # Split the current subsource. - split = self.sub_range_tracker(source_ix).try_split( - source_pos) - if not split: - return None - split_pos, frac = split - ratio = self.local_to_global(source_ix, frac) - - self._end = source_ix, split_pos - self._cumulative_weights = [min(w / ratio, 1) - for w in self._cumulative_weights] - return (source_ix, split_pos), ratio + # Split the current subsource. 
+ split = self.sub_range_tracker(source_ix).try_split( + source_pos) + if not split: + return None + split_pos, frac = split + ratio = self.local_to_global(source_ix, frac) + + self._end = source_ix, split_pos + self._cumulative_weights = [min(w / ratio, 1) + for w in self._cumulative_weights] + return (source_ix, split_pos), ratio def set_current_position(self, pos): raise NotImplementedError('Should only be called on sub-trackers') @@ -212,10 +210,9 @@ class ConcatRangeTracker(iobase.RangeTracker): last = self._end[0] if self._end[1] is None else self._end[0] + 1 if source_ix == last: return (source_ix, None) - else: - return (source_ix, - self.sub_range_tracker(source_ix).position_at_fraction( - source_frac)) + return (source_ix, + self.sub_range_tracker(source_ix).position_at_fraction( + source_frac)) def fraction_consumed(self): with self._lock: @@ -234,15 +231,14 @@ class ConcatRangeTracker(iobase.RangeTracker): if frac == 1: last = self._end[0] if self._end[1] is None else self._end[0] + 1 return (last, None) - else: - cw = self._cumulative_weights - # Find the last source that starts at or before frac. - source_ix = bisect.bisect(cw, frac) - 1 - # Return this source, converting what's left of frac after starting - # this source into a value in [0.0, 1.0) representing how far we are - # towards the next source. - return (source_ix, - (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix])) + cw = self._cumulative_weights + # Find the last source that starts at or before frac. + source_ix = bisect.bisect(cw, frac) - 1 + # Return this source, converting what's left of frac after starting + # this source into a value in [0.0, 1.0) representing how far we are + # towards the next source. 
+ return (source_ix, + (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix])) def sub_range_tracker(self, source_ix): assert self._start[0] <= source_ix <= self._end[0] http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 7b7ec8a..24a31b1 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -115,10 +115,10 @@ def _write_prepared_data(data, directory=None, def write_prepared_pattern(data, suffixes=None): + assert data, 'Data (%s) seems to be empty' % data if suffixes is None: suffixes = [''] * len(data) temp_dir = tempfile.mkdtemp() - assert len(data) > 0 for i, d in enumerate(data): file_name = _write_prepared_data(d, temp_dir, prefix='mytemp', suffix=suffixes[i]) http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index b128dc5..dc8957e 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -65,7 +65,7 @@ class ChannelFactory(object): def rename_batch(src_dest_pairs): sources = [s for s, _ in src_dest_pairs] destinations = [d for _, d in src_dest_pairs] - if len(sources) == 0: + if not sources: return [] bfs = get_filesystem(sources[0]) try: @@ -165,7 +165,7 @@ class FileSink(iobase.Sink): if shard_name_template is None: shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE - elif shard_name_template is '': + elif shard_name_template == '': num_shards = 1 self.file_path_prefix = file_path_prefix self.file_name_suffix = file_name_suffix @@ -275,7 +275,7 @@ class FileSink(iobase.Sink): return exceptions 
except BeamIOError as exp: if exp.exception_details is None: - raise exp + raise for (src, dest), exception in exp.exception_details.iteritems(): if exception: logging.warning('Rename not successful: %s -> %s, %s', src, dest, http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filesystems_util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py index 6d21298..5034068 100644 --- a/sdks/python/apache_beam/io/filesystems_util.py +++ b/sdks/python/apache_beam/io/filesystems_util.py @@ -32,5 +32,4 @@ def get_filesystem(path): 'Google Cloud Platform IO not available, ' 'please install apache_beam[gcp]') return GCSFileSystem() - else: - return LocalFileSystem() + return LocalFileSystem() http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 9a8174a..25f544d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -967,13 +967,12 @@ class BigQueryWrapper(object): % (project_id, dataset_id, table_id)) if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE: return found_table - else: - # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete - # the table before this point. - return self._create_table(project_id=project_id, - dataset_id=dataset_id, - table_id=table_id, - schema=schema or found_table.schema) + # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete + # the table before this point. 
+ return self._create_table(project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + schema=schema or found_table.schema) def run_query(self, project_id, query, use_legacy_sql, flatten_results, dry_run=False): http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 512824b..d9df5c4 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -805,8 +805,7 @@ class Read(ptransform.PTransform): def _infer_output_coder(self, input_type=None, input_coder=None): if isinstance(self.source, BoundedSource): return self.source.default_output_coder() - else: - return self.source.coder + return self.source.coder def display_data(self): return {'source': DisplayDataItem(self.source.__class__, @@ -945,8 +944,8 @@ class _WriteKeyedBundleDoFn(core.DoFn): def process(self, element, init_result): bundle = element writer = self.sink.open_writer(init_result, str(uuid.uuid4())) - for element in bundle[1]: # values - writer.write(element) + for e in bundle[1]: # values + writer.write(e) return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)] http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/localfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 46589b0..7637f2a 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -93,8 +93,7 @@ class LocalFileSystem(FileSystem): raw_file = open(path, mode) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file - else: - return CompressedFile(raw_file, compression_type=compression_type) + return CompressedFile(raw_file, 
compression_type=compression_type) def create(self, path, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/range_trackers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 8627f76..6e7b84f 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -42,9 +42,9 @@ class OffsetRangeTracker(iobase.RangeTracker): raise ValueError('Start offset must not be \'None\'') if end is None: raise ValueError('End offset must not be \'None\'') - assert isinstance(start, int) or isinstance(start, long) + assert isinstance(start, (int, long)) if end != self.OFFSET_INFINITY: - assert isinstance(end, int) or isinstance(end, long) + assert isinstance(end, (int, long)) assert start <= end @@ -91,8 +91,8 @@ class OffsetRangeTracker(iobase.RangeTracker): 'The first record [starting at %d] must be at a split point' % record_start) - if (split_point and self._offset_of_last_split_point is not -1 and - record_start is self._offset_of_last_split_point): + if (split_point and self._offset_of_last_split_point != -1 and + record_start == self._offset_of_last_split_point): raise ValueError( 'Record at a split point has same offset as the previous split ' 'point: %d' % record_start) @@ -354,8 +354,7 @@ class OrderedPositionRangeTracker(iobase.RangeTracker): if self._stop_position is None or position < self._stop_position: self._last_claim = position return True - else: - return False + return False def position_at_fraction(self, fraction): return self.fraction_to_position( @@ -373,15 +372,13 @@ class OrderedPositionRangeTracker(iobase.RangeTracker): position, start=self._start_position, end=self._stop_position) self._stop_position = position return position, fraction - else: - return None + return 
None def fraction_consumed(self): if self._last_claim is self.UNSTARTED: return 0 - else: - return self.position_to_fraction( - self._last_claim, self._start_position, self._stop_position) + return self.position_to_fraction( + self._last_claim, self._start_position, self._stop_position) def position_to_fraction(self, pos, start, end): """ http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/source_test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py index 07de738..542e9f6 100644 --- a/sdks/python/apache_beam/io/source_test_utils.py +++ b/sdks/python/apache_beam/io/source_test_utils.py @@ -611,10 +611,9 @@ def _assertSplitAtFractionConcurrent( def read_or_split(test_params): if test_params[0]: return [val for val in test_params[1]] - else: - position = test_params[1].position_at_fraction(test_params[2]) - result = test_params[1].try_split(position) - return result + position = test_params[1].position_at_fraction(test_params[2]) + result = test_params[1].try_split(position) + return result inputs = [] pool = thread_pool if thread_pool else _ThreadPool(2) http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 9217e74..b6a24b0 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource): if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r': # Found a '\r\n'. Accepting that as the next separator. return (next_lf - 1, next_lf + 1) - else: - # Found a '\n'. Accepting that as the next separator. - return (next_lf, next_lf + 1) + + # Found a '\n'. Accepting that as the next separator. 
+ return (next_lf, next_lf + 1) current_pos = len(read_buffer.data) @@ -256,10 +256,9 @@ class _TextSource(filebasedsource.FileBasedSource): # Current record should not contain the separator. return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]], sep_bounds[1] - record_start_position_in_buffer) - else: - # Current record should contain the separator. - return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], - sep_bounds[1] - record_start_position_in_buffer) + # Current record should contain the separator. + return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], + sep_bounds[1] - record_start_position_in_buffer) class _TextSink(fileio.FileSink): http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index a4cd462..f812832 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -531,27 +531,26 @@ def curry_combine_fn(fn, args, kwargs): if not args and not kwargs: return fn - else: + # Create CurriedFn class for the combiner + class CurriedFn(core.CombineFn): + """CombineFn that applies extra arguments.""" - class CurriedFn(core.CombineFn): - """CombineFn that applies extra arguments.""" + def create_accumulator(self): + return fn.create_accumulator(*args, **kwargs) - def create_accumulator(self): - return fn.create_accumulator(*args, **kwargs) + def add_input(self, accumulator, element): + return fn.add_input(accumulator, element, *args, **kwargs) - def add_input(self, accumulator, element): - return fn.add_input(accumulator, element, *args, **kwargs) + def merge_accumulators(self, accumulators): + return fn.merge_accumulators(accumulators, *args, **kwargs) - def merge_accumulators(self, accumulators): - return 
fn.merge_accumulators(accumulators, *args, **kwargs) + def extract_output(self, accumulator): + return fn.extract_output(accumulator, *args, **kwargs) - def extract_output(self, accumulator): - return fn.extract_output(accumulator, *args, **kwargs) + def apply(self, elements): + return fn.apply(elements, *args, **kwargs) - def apply(self, elements): - return fn.apply(elements, *args, **kwargs) - - return CurriedFn() + return CurriedFn() class PhasedCombineFnExecutor(object):
