This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 11bc21d Change reading sideinput experimental flag into new pattern.
11bc21d is described below
commit 11bc21ded1a4eb1370f99344ea07ac4f8e38f83d
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed May 9 09:49:59 2018 -0700
Change reading sideinput experimental flag into new pattern.
---
sdks/python/apache_beam/runners/worker/operations.py | 5 +----
sdks/python/apache_beam/runners/worker/sideinputs.py | 3 +--
2 files changed, 2 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index e30effc..1c425ae 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -303,9 +303,6 @@ class DoOperation(Operation):
# provided directly.
assert self.side_input_maps is None
- # Get experiments active in the worker to check for side input metrics exp.
- experiments = RuntimeValueProvider.get_value('experiments', list, [])
-
# We will read the side inputs in the order prescribed by the
# tags_and_types argument because this is exactly the order needed to
# replace the ArgumentPlaceholder objects in the args/kwargs of the DoFn
@@ -328,7 +325,7 @@ class DoOperation(Operation):
sources.append(si.source)
# The tracking of time spend reading and bytes read from side inputs is
# behind an experiment flag to test its performance impact.
- if 'sideinput_io_metrics' in experiments:
+ if 'sideinput_io_metrics' in RuntimeValueProvider.experiments:
si_counter = opcounters.SideInputReadCounter(
self.counter_factory,
self.state_sampler,
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py
b/sdks/python/apache_beam/runners/worker/sideinputs.py
index d2599fd..c36182d 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -104,7 +104,6 @@ class PrefetchingSourceSetIterable(object):
def _reader_thread(self):
# pylint: disable=too-many-nested-blocks
- experiments = RuntimeValueProvider.get_value('experiments', list, [])
try:
while True:
try:
@@ -124,7 +123,7 @@ class PrefetchingSourceSetIterable(object):
# The tracking of time spend reading and bytes read from side
# inputs is kept behind an experiment flag to test performance
# impact.
- if 'sideinput_io_metrics' in experiments:
+ if 'sideinput_io_metrics' in RuntimeValueProvider.experiments:
self.add_byte_counter(reader)
returns_windowed_values = reader.returns_windowed_values
for value in reader:
--
To stop receiving notification emails like this one, please contact
[email protected].