Repository: beam Updated Branches: refs/heads/jstorm-runner f6a89b0fc -> 58d4b97c0
[BEAM-1964] Fix lint issues for linter upgrade -2 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf474a0b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf474a0b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf474a0b Branch: refs/heads/jstorm-runner Commit: bf474a0b72beb2e946be39ce04e3f07800a3b307 Parents: cf9ac45 Author: Sourabh Bajaj <[email protected]> Authored: Thu Apr 13 17:19:56 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Apr 14 13:06:14 2017 -0700 ---------------------------------------------------------------------- .../io/gcp/datastore/v1/datastoreio.py | 4 +-- .../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++------- .../io/gcp/datastore/v1/query_splitter.py | 2 +- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 +- .../io/gcp/tests/bigquery_matcher.py | 3 +- sdks/python/apache_beam/metrics/cells.py | 28 +++++++++---------- sdks/python/apache_beam/metrics/execution.py | 3 +- sdks/python/apache_beam/metrics/metric.py | 9 ++---- sdks/python/apache_beam/runners/common.py | 9 ++---- .../runners/dataflow/dataflow_metrics_test.py | 3 +- .../runners/dataflow/dataflow_runner.py | 6 ++-- .../runners/dataflow/internal/apiclient.py | 8 +++--- .../runners/dataflow/internal/dependency.py | 6 ++-- .../runners/dataflow/test_dataflow_runner.py | 4 --- .../runners/direct/bundle_factory.py | 14 ++++------ .../runners/direct/evaluation_context.py | 10 +++---- .../apache_beam/runners/direct/executor.py | 9 +----- .../runners/direct/transform_evaluator.py | 7 ----- sdks/python/apache_beam/runners/runner.py | 3 +- .../apache_beam/tests/pipeline_verifiers.py | 7 ++--- sdks/python/apache_beam/transforms/combiners.py | 29 +++++++------------- .../apache_beam/transforms/combiners_test.py | 4 +-- sdks/python/apache_beam/typehints/decorators.py | 3 +- sdks/python/apache_beam/typehints/typehints.py | 3 +- 24 files changed, 68 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index e8ca05d..d9b3598 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -253,7 +253,7 @@ class ReadFromDatastore(PTransform): query = helper.make_latest_timestamp_query(namespace) req = helper.make_request(project, namespace, query) resp = datastore.run_query(req) - if len(resp.batch.entity_results) == 0: + if not resp.batch.entity_results: raise RuntimeError("Datastore total statistics unavailable.") entity = resp.batch.entity_results[0].entity @@ -281,7 +281,7 @@ class ReadFromDatastore(PTransform): req = helper.make_request(project, namespace, kind_stats_query) resp = datastore.run_query(req) - if len(resp.batch.entity_results) == 0: + if not resp.batch.entity_results: raise RuntimeError("Datastore statistics for kind %s unavailable" % kind) entity = resp.batch.entity_results[0].entity http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index b1ef9af..d544226 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -62,8 +62,7 @@ def key_comparator(k1, k2): k2_path = next(k2_iter, None) if k2_path: return -1 - else: - return 0 + return 0 def compare_path(p1, p2): @@ -99,8 +98,7 @@ def str_compare(s1, s2): return 0 elif s1 < s2: return -1 - else: - return 1 + return 1 def get_datastore(project): @@ -131,13 +129,9 @@ def make_partition(project, namespace): def retry_on_rpc_error(exception): """A retry filter for Cloud Datastore RPCErrors.""" if isinstance(exception, RPCError): - if exception.code >= 500: - return True - else: - return False - else: - # TODO(vikasrk): Figure out what other errors should be retried. - return False + return exception.code >= 500 + # TODO(vikasrk): Figure out what other errors should be retried. + return False def fetch_entities(project, namespace, query, datastore): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py index 8ced170..d5674f9 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py @@ -97,7 +97,7 @@ def _validate_query(query): if len(query.kind) != 1: raise ValueError('Query must have exactly one kind.') - if len(query.order) != 0: + if query.order: raise ValueError('Query cannot have any sort orders.') if query.HasField('limit'): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index b2bc809..a10a3d2 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -93,8 +93,7 @@ class GCSFileSystem(FileSystem): raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file - else: - return CompressedFile(raw_file, compression_type=compression_type) + return CompressedFile(raw_file, compression_type=compression_type) def create(self, path, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py index cc26689..66d99b3 100644 --- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py @@ -38,8 +38,7 @@ MAX_RETRIES = 4 def retry_on_http_and_value_error(exception): """Filter allowing retries on Bigquery errors and value error.""" - return isinstance(exception, GoogleCloudError) or \ - isinstance(exception, ValueError) + return isinstance(exception, (GoogleCloudError, ValueError)) class BigqueryMatcher(BaseMatcher): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/cells.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 5a571f5..c421949 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -97,9 +97,8 @@ class CellCommitState(object): with self._lock: if self._state == CellCommitState.CLEAN: return False - else: - self._state = CellCommitState.COMMITTING - return True + self._state = CellCommitState.COMMITTING + return True class MetricCell(object): @@ -218,8 +217,7 @@ class DistributionResult(object): """ if self.data.count == 0: return None - else: - return float(self.data.sum)/self.data.count + return float(self.data.sum)/self.data.count class DistributionData(object): @@ -257,16 +255,16 @@ class DistributionData(object): def combine(self, other): if other is None: return self - else: - new_min = (None if self.min is None and other.min is None else - min(x for x in (self.min, other.min) if x is not None)) - new_max = (None if self.max is None and other.max is None else - max(x for x in (self.max, other.max) if x is not None)) - return DistributionData( - self.sum + other.sum, - self.count + other.count, - new_min, - new_max) + + new_min = (None if self.min is None and other.min is None else + min(x for x in (self.min, other.min) if x is not None)) + new_max = (None if self.max is None and other.max is None else + max(x for x in (self.max, other.max) if x is not None)) + return DistributionData( + self.sum + other.sum, + self.count + other.count, + new_min, + new_max) @classmethod def singleton(cls, value): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/execution.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index f6c8990..887423b 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -129,8 +129,7 @@ class _MetricsEnvironment(object): index = len(self.PER_THREAD.container) - 1 if index < 0: return None - else: - return self.PER_THREAD.container[index] + return self.PER_THREAD.container[index] def set_current_container(self, container): self.set_container_stack() http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/metric.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index f6a0923..33db4e1 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -103,8 +103,7 @@ class MetricResults(object): (filter.names and metric_key.metric.name in filter.names)): return True - else: - return False + return False @staticmethod def _matches_sub_path(actual_scope, filter_scope): @@ -117,8 +116,7 @@ class MetricResults(object): return False # The first entry was not exactly matched elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/': return False # The last entry was not exactly matched - else: - return True + return True @staticmethod def _matches_scope(filter, metric_key): @@ -139,8 +137,7 @@ class MetricResults(object): if (MetricResults._matches_name(filter, metric_key) and MetricResults._matches_scope(filter, metric_key)): return True - else: - return False + return False def query(self, filter=None): raise NotImplementedError http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 2c1032d..8f86b75 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -414,10 +414,8 @@ def get_logging_context(maybe_logger, **kwargs): maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs) if isinstance(maybe_context, LoggingContext): return maybe_context - else: - return _LoggingContextAdapter(maybe_context) - else: - return LoggingContext() + return _LoggingContextAdapter(maybe_context) + return LoggingContext() class _ReceiverAdapter(Receiver): @@ -432,5 +430,4 @@ class _ReceiverAdapter(Receiver): def as_receiver(maybe_receiver): if isinstance(maybe_receiver, Receiver): return maybe_receiver - else: - return _ReceiverAdapter(maybe_receiver) + return _ReceiverAdapter(maybe_receiver) http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index 95027a3..ffee3e5 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -38,8 +38,7 @@ class DictToObject(object): def _wrap(self, value): if isinstance(value, (tuple, list, set, frozenset)): return type(value)([self._wrap(v) for v in value]) - else: - return DictToObject(value) if isinstance(value, dict) else value + return DictToObject(value) if isinstance(value, dict) else value class TestDataflowMetrics(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 1a92c26..2e9fc52 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -92,8 +92,7 @@ class DataflowRunner(PipelineRunner): return -1 elif 'Traceback' in msg: return 1 - else: - return 0 + return 0 job_id = result.job_id() while True: @@ -194,8 +193,7 @@ class DataflowRunner(PipelineRunner): return coders.WindowedValueCoder( coders.registry.get_coder(typehint), window_coder=window_coder) - else: - return coders.registry.get_coder(typehint) + return coders.registry.get_coder(typehint) def _get_cloud_encoding(self, coder): """Returns an encoding based on a coder object.""" http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 6a8aa93..8d44dff 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -436,10 +436,10 @@ class DataflowApplicationClient(object): if not template_location: return self.submit_job_description(job) - else: - logging.info('A template was just created at location %s', - template_location) - return None + + logging.info('A template was just created at location %s', + template_location) + return None def create_job_description(self, job): """Creates a job described by the workflow proto.""" http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 22de5c6..1f28b26 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -493,8 +493,7 @@ def get_sdk_name_and_version(): container_version = get_required_container_version() if container_version == BEAM_CONTAINER_VERSION: return ('Apache Beam SDK for Python', beam_version.__version__) - else: - return ('Google Cloud Dataflow SDK for Python', container_version) + return ('Google Cloud Dataflow SDK for Python', container_version) def get_sdk_package_name(): @@ -502,8 +501,7 @@ def get_sdk_package_name(): container_version = get_required_container_version() if container_version == BEAM_CONTAINER_VERSION: return BEAM_PACKAGE_NAME - else: - return GOOGLE_PACKAGE_NAME + return GOOGLE_PACKAGE_NAME def _download_pypi_sdk_package(temp_dir): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 046313a..4cf4131 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -23,10 +23,6 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner class TestDataflowRunner(DataflowRunner): - - def __init__(self): - super(TestDataflowRunner, self).__init__() - def run(self, pipeline): """Execute test pipeline and verify test matcher""" options = pipeline.options.view_as(TestOptions) http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/bundle_factory.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py index 647b5f2..42c8095 100644 --- a/sdks/python/apache_beam/runners/direct/bundle_factory.py +++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py @@ -127,8 +127,7 @@ class Bundle(object): if not self._stacked: if self._committed and not make_copy: return self._elements - else: - return list(self._elements) + return list(self._elements) def iterable_stacked_or_elements(elements): for e in elements: @@ -140,9 +139,8 @@ class Bundle(object): if self._committed and not make_copy: return iterable_stacked_or_elements(self._elements) - else: - # returns a copy. - return [e for e in iterable_stacked_or_elements(self._elements)] + # returns a copy. + return [e for e in iterable_stacked_or_elements(self._elements)] def has_elements(self): return len(self._elements) > 0 @@ -171,9 +169,9 @@ class Bundle(object): if not self._stacked: self._elements.append(element) return - if (len(self._elements) > 0 and - (isinstance(self._elements[-1], WindowedValue) or - isinstance(self._elements[-1], Bundle.StackedWindowedValues)) and + if (self._elements and + (isinstance(self._elements[-1], (WindowedValue, + Bundle.StackedWindowedValues))) and self._elements[-1].timestamp == element.timestamp and self._elements[-1].windows == element.windows): if isinstance(self._elements[-1], WindowedValue): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 8114104..2169c7c 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -281,11 +281,11 @@ class EvaluationContext(object): """ if transform: return self._is_transform_done(transform) - else: - for applied_ptransform in self._step_names: - if not self._is_transform_done(applied_ptransform): - return False - return True + + for applied_ptransform in self._step_names: + if not self._is_transform_done(applied_ptransform): + return False + return True def _is_transform_done(self, transform): tw = self._watermark_manager.get_watermarks(transform) http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index ce6356c..f6a1d7f 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -240,13 +240,6 @@ class _CompletionCallback(object): _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception)) -class _TimerCompletionCallback(_CompletionCallback): - - def __init__(self, evaluation_context, all_updates, timers): - super(_TimerCompletionCallback, self).__init__( - evaluation_context, all_updates, timers) - - class TransformExecutor(ExecutorService.CallableTask): """TransformExecutor will evaluate a bundle using an applied ptransform. @@ -529,7 +522,7 @@ class _ExecutorServiceParallelExecutor(object): empty_bundle = ( self._executor.evaluation_context.create_empty_committed_bundle( applied_ptransform.inputs[0])) - timer_completion_callback = _TimerCompletionCallback( + timer_completion_callback = _CompletionCallback( self._executor.evaluation_context, self._executor.all_updates, applied_ptransform) http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 662c61d..f34513c 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -278,13 +278,6 @@ class _TaggedReceivers(dict): class _ParDoEvaluator(_TransformEvaluator): """TransformEvaluator for ParDo transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - super(_ParDoEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - def start_bundle(self): transform = self._applied_ptransform.transform http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 7e7ec24..6c05951 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -111,8 +111,7 @@ def group_by_key_input_visitor(): # pylint: disable=wrong-import-order, wrong-import-position from apache_beam import GroupByKey, GroupByKeyOnly from apache_beam import typehints - if (isinstance(transform_node.transform, GroupByKey) or - isinstance(transform_node.transform, GroupByKeyOnly)): + if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)): pcoll = transform_node.inputs[0] input_type = pcoll.element_type # If input_type is not specified, then treat it as `Any`. http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py index 3cac658..51302b0 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py @@ -66,11 +66,8 @@ class PipelineStateMatcher(BaseMatcher): def retry_on_io_error_and_server_error(exception): """Filter allowing retries on file I/O errors and service error.""" - if isinstance(exception, IOError) or \ - (HttpError is not None and isinstance(exception, HttpError)): - return True - else: - return False + return isinstance(exception, IOError) or \ + (HttpError is not None and isinstance(exception, HttpError)) class FileChecksumMatcher(BaseMatcher): http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index f55d46a..a4cd462 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -95,8 +95,7 @@ class MeanCombineFn(core.CombineFn): return cy_combiners.MeanInt64Fn() elif input_type is float: return cy_combiners.MeanFloatFn() - else: - return self + return self class Count(object): @@ -310,23 +309,19 @@ class TopCombineFn(core.CombineFn): if len(buffer) < self._n: if not buffer: return element_key, [element] - else: - buffer.append(element) - if lt(element_key, threshold): # element_key < threshold - return element_key, buffer - else: - return accumulator # with mutated buffer + buffer.append(element) + if lt(element_key, threshold): # element_key < threshold + return element_key, buffer + return accumulator # with mutated buffer elif lt(threshold, element_key): # threshold < element_key buffer.append(element) if len(buffer) < self._buffer_size: return accumulator - else: - self._sort_buffer(buffer, lt) - min_element = buffer[-self._n] - threshold = self._key_fn(min_element) if self._key_fn else min_element - return threshold, buffer[-self._n:] - else: - return accumulator + self._sort_buffer(buffer, lt) + min_element = buffer[-self._n] + threshold = self._key_fn(min_element) if self._key_fn else min_element + return threshold, buffer[-self._n:] + return accumulator def merge_accumulators(self, accumulators, *args, **kwargs): accumulators = list(accumulators) @@ -357,10 +352,6 @@ class TopCombineFn(core.CombineFn): class Largest(TopCombineFn): - - def __init__(self, n): - super(Largest, self).__init__(n) - def default_label(self): return 'Largest(%s)' % self._n http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 6c101fe..af76889 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -164,10 +164,10 @@ class CombineTest(unittest.TestCase): DisplayDataItemMatcher('fn', sampleFn.fn.__name__), DisplayDataItemMatcher('combine_fn', transform.fn.__class__)] - if len(args) > 0: + if args: expected_items.append( DisplayDataItemMatcher('args', str(args))) - if len(kwargs) > 0: + if kwargs: expected_items.append( DisplayDataItemMatcher('kwargs', str(kwargs))) hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/decorators.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py index d8f0b1b..af6c499 100644 --- a/sdks/python/apache_beam/typehints/decorators.py +++ b/sdks/python/apache_beam/typehints/decorators.py @@ -237,8 +237,7 @@ def _unpack_positional_arg_hints(arg, hint): if isinstance(hint, typehints.TupleConstraint): return tuple(_unpack_positional_arg_hints(a, t) for a, t in zip(arg, hint.tuple_types)) - else: - return (typehints.Any,) * len(arg) + return (typehints.Any,) * len(arg) return hint http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/typehints.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 1557d85..9b41adb 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1039,8 +1039,7 @@ def is_consistent_with(sub, base): if isinstance(base, TypeConstraint): if isinstance(sub, UnionConstraint): return all(is_consistent_with(c, base) for c in sub.union_types) - else: - return base._consistent_with_check_(sub) + return base._consistent_with_check_(sub) elif isinstance(sub, TypeConstraint): # Nothing but object lives above any type constraints. return base == object
