Repository: beam
Updated Branches:
  refs/heads/master 5181e619f -> 260c4fce8
Adding IOTargetName and unittests for CounterName

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/185daffa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/185daffa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/185daffa

Branch: refs/heads/master
Commit: 185daffa53595b3a6d900252d69132c85013aa4c
Parents: 5181e61
Author: Pablo <[email protected]>
Authored: Tue Aug 22 16:53:32 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Thu Aug 24 09:47:40 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/counters.py      | 58 +++++++++------
 sdks/python/apache_beam/utils/counters_test.py | 78 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index 5d029dc..08685aa 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -23,40 +23,56 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from collections import namedtuple
 import threading
+
 from apache_beam.transforms import cy_combiners
 
 
-class CounterName(object):
+# Information identifying the IO being measured by a counter.
+IOTargetName = namedtuple('IOTargetName', ['side_input_step_name',
+                                           'side_input_index',
+                                           'original_shuffle_step_name'])
+
+
+def side_input_id(step_name, input_index):
+  """Create an IOTargetName that identifies the reading of a side input."""
+  return IOTargetName(step_name, input_index, None)
+
+
+def shuffle_id(step_name):
+  """Create an IOTargetName that identifies a GBK step."""
+  return IOTargetName(None, None, step_name)
+
+
+_CounterName = namedtuple('_CounterName', ['name',
+                                           'stage_name',
+                                           'step_name',
+                                           'system_name',
+                                           'namespace',
+                                           'origin',
+                                           'output_index',
+                                           'io_target'])
+
+
+class CounterName(_CounterName):
   """Naming information for a counter."""
   SYSTEM = object()
   USER = object()
 
-  def __init__(self, name, stage_name=None, step_name=None,
-               system_name=None, namespace=None,
-               origin=None, output_index=None):
-    self.name = name
-    self.origin = origin or CounterName.SYSTEM
-    self.namespace = namespace
-    self.stage_name = stage_name
-    self.step_name = step_name
-    self.system_name = system_name
-    self.output_index = output_index
-
-  def __hash__(self):
-    return hash((self.name,
-                 self.origin,
-                 self.namespace,
-                 self.stage_name,
-                 self.step_name,
-                 self.system_name,
-                 self.output_index))
+  def __new__(cls, name, stage_name=None, step_name=None,
+              system_name=None, namespace=None,
+              origin=None, output_index=None, io_target=None):
+    origin = origin or CounterName.SYSTEM
+    return super(CounterName, cls).__new__(cls, name, stage_name, step_name,
+                                           system_name, namespace,
+                                           origin, output_index, io_target)
 
   def __str__(self):
     return '%s' % self._str_internal()
 
   def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+    return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
 
   def _str_internal(self):
     if self.origin == CounterName.USER:

http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py
new file mode 100644
index 0000000..37cab88
--- /dev/null
+++ b/sdks/python/apache_beam/utils/counters_test.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for counters and counter names."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.utils import counters
+from apache_beam.utils.counters import CounterName
+
+
+class CounterNameTest(unittest.TestCase):
+
+  def test_equal_objects(self):
+    self.assertEqual(CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name'),
+                     CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name'))
+    self.assertNotEqual(CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name'),
+                        CounterName('counter_name',
+                                    'stage_name',
+                                    'step_nam'))
+
+    # Testing objects with an IOTarget.
+    self.assertEqual(CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name',
+                                 io_target=counters.side_input_id(1, 's9')),
+                     CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name',
+                                 io_target=counters.side_input_id(1, 's9')))
+    self.assertNotEqual(CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name',
+                                    io_target=counters.side_input_id(1, 's')),
+                        CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name',
+                                    io_target=counters.side_input_id(1, 's9')))
+
+  def test_hash_two_objects(self):
+    self.assertEqual(hash(CounterName('counter_name',
+                                      'stage_name',
+                                      'step_name')),
+                     hash(CounterName('counter_name',
+                                      'stage_name',
+                                      'step_name')))
+    self.assertNotEqual(hash(CounterName('counter_name',
+                                         'stage_name',
+                                         'step_name')),
+                        hash(CounterName('counter_name',
+                                         'stage_name',
+                                         'step_nam')))
+
+
+if __name__ == '__main__':
+  unittest.main()
