This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new df1d267 Full tracking of Batch side inputs. Performance tests show no regression df1d267 is described below commit df1d26748b91caf893eb14bf95b2a309192ce807 Author: Pablo <pabl...@google.com> AuthorDate: Wed Jul 11 18:21:38 2018 -0700 Full tracking of Batch side inputs. Performance tests show no regression --- sdks/python/apache_beam/runners/worker/operations.py | 16 ++++++---------- sdks/python/apache_beam/runners/worker/sideinputs.py | 7 +------ .../apache_beam/runners/worker/sideinputs_test.py | 17 ----------------- 3 files changed, 7 insertions(+), 33 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 78a67bc..1e561e1 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -33,7 +33,6 @@ from apache_beam.internal import pickler from apache_beam.io import iobase from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import ScopedMetricsContainer -from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.runners import common from apache_beam.runners.common import Receiver @@ -322,15 +321,12 @@ class DoOperation(Operation): sources.append(si.source) # The tracking of time spend reading and bytes read from side inputs is # behind an experiment flag to test its performance impact. 
- if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments: - si_counter = opcounters.SideInputReadCounter( - self.counter_factory, - self.state_sampler, - declaring_step=self.name_context.step_name, - # Inputs are 1-indexed, so we add 1 to i in the side input id - input_index=i + 1) - else: - si_counter = opcounters.NoOpTransformIOCounter() + si_counter = opcounters.SideInputReadCounter( + self.counter_factory, + self.state_sampler, + declaring_step=self.name_context.step_name, + # Inputs are 1-indexed, so we add 1 to i in the side input id + input_index=i + 1) iterator_fn = sideinputs.get_iterator_fn_for_sources( sources, read_counter=si_counter) diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py index d806f9e..23190a5 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs.py @@ -31,7 +31,6 @@ from future import standard_library from apache_beam.coders import observable from apache_beam.io import iobase -from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.runners.worker import opcounters from apache_beam.transforms import window @@ -88,9 +87,6 @@ class PrefetchingSourceSetIterable(object): def add_byte_counter(self, reader): """Adds byte counter observer to a side input reader. - If the 'sideinput_io_metrics_v2' experiment flag is not passed in, then - nothing is attached to the reader. - Args: reader: A reader that should inherit from ObservableMixin to have bytes tracked. @@ -131,8 +127,7 @@ class PrefetchingSourceSetIterable(object): # The tracking of time spend reading and bytes read from side # inputs is kept behind an experiment flag to test performance # impact. 
- if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments: - self.add_byte_counter(reader) + self.add_byte_counter(reader) returns_windowed_values = reader.returns_windowed_values for value in reader: if self.has_errored: diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py index 3a5ff38..4a8f7c8 100644 --- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py +++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py @@ -28,7 +28,6 @@ from builtins import range import mock from apache_beam.coders import observable -from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.runners.worker import sideinputs @@ -83,20 +82,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase): sources, max_reader_threads=2) assert list(strip_windows(iterator_fn())) == list(range(6)) - def test_bytes_read_behind_experiment(self): - mock_read_counter = mock.MagicMock() - source_records = ['a', 'b', 'c', 'd'] - sources = [ - FakeSource(source_records, notify_observers=True), - ] - iterator_fn = sideinputs.get_iterator_fn_for_sources( - sources, max_reader_threads=3, read_counter=mock_read_counter) - assert list(strip_windows(iterator_fn())) == source_records - mock_read_counter.add_bytes_read.assert_not_called() - def test_bytes_read_are_reported(self): - RuntimeValueProvider.set_runtime_options( - {'experiments': ['sideinput_io_metrics_v2', 'other']}) mock_read_counter = mock.MagicMock() source_records = ['a', 'b', 'c', 'd'] sources = [ @@ -107,9 +93,6 @@ class PrefetchingSourceIteratorTest(unittest.TestCase): assert list(strip_windows(iterator_fn())) == source_records mock_read_counter.add_bytes_read.assert_called_with(4) - # Remove runtime options from the runtime value provider. - RuntimeValueProvider.set_runtime_options({}) - def test_multiple_sources_iterator_fn(self): sources = [ FakeSource([0]),