[https://issues.apache.org/jira/browse/BEAM-3042?focusedWorklogId=97207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97207]
ASF GitHub Bot logged work on BEAM-3042:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/May/18 19:47
Start Date: 01/May/18 19:47
Worklog Time Spent: 10m
Work Description: pabloem closed pull request #5075: [BEAM-3042] Refactor
of TransformIOCounters (performance, inheritance).
URL: https://github.com/apache/beam/pull/5075
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once it
is merged, the diff is supplied below for the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.pxd b/sdks/python/apache_beam/runners/worker/opcounters.pxd
index 40ca72dde7e..0bcd42848d2 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.pxd
+++ b/sdks/python/apache_beam/runners/worker/opcounters.pxd
@@ -22,21 +22,26 @@ from apache_beam.utils.counters cimport Counter
cdef class TransformIOCounter(object):
+ cdef readonly object _counter_factory
+ cdef readonly object _state_sampler
+ cdef Counter bytes_read_counter
+ cdef object scoped_state
+ cdef object _latest_step
+
cpdef update_current_step(self)
cpdef add_bytes_read(self, libc.stdint.int64_t n)
cpdef __enter__(self)
cpdef __exit__(self, exc_type, exc_value, traceback)
+cdef class NoOpTransformIOCounter(TransformIOCounter):
+ pass
+
+
cdef class SideInputReadCounter(TransformIOCounter):
- cdef readonly object _counter_factory
- cdef readonly object _state_sampler
cdef readonly object declaring_step
cdef readonly object input_index
- cdef Counter bytes_read_counter
- cdef object scoped_state
-
cdef class SumAccumulator(object):
cdef libc.stdint.int64_t _value
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 17fead28ed2..0e4ee0a05dc 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -41,24 +41,66 @@ class TransformIOCounter(object):
Some examples of IO can be side inputs, shuffle, or streaming state.
"""
+ def __init__(self, counter_factory, state_sampler):
+ """Create a new IO read counter.
+
+ Args:
+ counter_factory: A counters.CounterFactory to create byte counters.
+ state_sampler: A statesampler.StateSampler to transition into read states.
+ """
+ self._counter_factory = counter_factory
+ self._state_sampler = state_sampler
+ self._latest_step = None
+ self.bytes_read_counter = None
+ self.scoped_state = None
+
def update_current_step(self):
- """Update the current step within a stage as it may have changed.
+ """Update the current running step.
- If the state changed, it would mean that an initial step passed a
- data-accessor (such as a side input / shuffle Iterable) down to the
- next step in a stage.
+ Due to the fusion optimization, user code may choose to emit the data
+ structure that holds side inputs (Iterable, Dict, or others). This call
+ updates the current step, to attribute the data consumption to the step
+ that is responsible for actual consumption.
+
+ CounterName uses the io_target field for information pertinent to the
+ consumption of IO.
"""
+ current_state = self._state_sampler.current_state()
+ current_step_name = current_state.name.step_name
+ if current_step_name != self._latest_step:
+ self._latest_step = current_step_name
+ self._update_counters_for_requesting_step(current_step_name)
+
+ def _update_counters_for_requesting_step(self, step_name):
pass
def add_bytes_read(self, count):
- pass
+ if count > 0 and self.bytes_read_counter:
+ self.bytes_read_counter.update(count)
+
+ def __enter__(self):
+ self.scoped_state.__enter__()
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ self.scoped_state.__exit__(exception_type, exception_value, traceback)
+
+
+class NoOpTransformIOCounter(TransformIOCounter):
+ """All operations for IO tracking are no-ops."""
+
+ def __init__(self):
+ super(NoOpTransformIOCounter, self).__init__(None, None)
- def __exit__(self, exc_type, exc_value, traceback):
- """Exit the IO state."""
+ def update_current_step(self):
pass
def __enter__(self):
- """Enter the IO state. This should track time spent blocked on IO."""
+ pass
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ pass
+
+ def add_bytes_read(self, count):
pass
@@ -93,8 +135,7 @@ def __init__(self, counter_factory, state_sampler, declaring_step,
side input, and input_index is the index of the PCollectionView within
the list of inputs.
"""
- self._counter_factory = counter_factory
- self._state_sampler = state_sampler
+ super(SideInputReadCounter, self).__init__(counter_factory, state_sampler)
self.declaring_step = declaring_step
self.input_index = input_index
@@ -102,40 +143,19 @@ def __init__(self, counter_factory, state_sampler, declaring_step,
# step. We check the current state to create the internal counters.
self.update_current_step()
- def update_current_step(self):
- """Update the current running step.
-
- Due to the fusion optimization, user code may choose to emit the data
- structure that holds side inputs (Iterable, Dict, or others). This call
- updates the current step, to attribute the data consumption to the step
- that is responsible for actual consumption.
-
- CounterName uses the io_target field for information pertinent to the
- consumption of side inputs.
- """
- current_state = self._state_sampler.current_state()
- operation_name = current_state.name.step_name
+ def _update_counters_for_requesting_step(self, step_name):
+ side_input_id = counters.side_input_id(step_name, self.input_index)
self.scoped_state = self._state_sampler.scoped_state(
self.declaring_step,
'read-sideinput',
- io_target=counters.side_input_id(operation_name, self.input_index))
+ io_target=side_input_id)
self.bytes_read_counter = self._counter_factory.get_counter(
CounterName(
'read-sideinput-byte-count',
step_name=self.declaring_step,
- io_target=counters.side_input_id(operation_name, self.input_index)),
+ io_target=side_input_id),
Counter.SUM)
- def add_bytes_read(self, count):
- if count > 0:
- self.bytes_read_counter.update(count)
-
- def __enter__(self):
- self.scoped_state.__enter__()
-
- def __exit__(self, exception_type, exception_value, traceback):
- self.scoped_state.__exit__(exception_type, exception_value, traceback)
-
class SumAccumulator(object):
"""Accumulator for collecting byte counts."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0fa32e3c997..d9871f505dd 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -321,7 +321,7 @@ def _read_side_inputs(self, tags_and_types):
# Inputs are 1-indexed, so we add 1 to i in the side input id
input_index=i + 1)
else:
- si_counter = opcounters.TransformIOCounter()
+ si_counter = opcounters.NoOpTransformIOCounter()
iterator_fn = sideinputs.get_iterator_fn_for_sources(
sources, read_counter=si_counter)
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 77157857b05..d2599fd59a5 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -73,16 +73,15 @@ def __init__(self,
# Whether an error was encountered in any source reader.
self.has_errored = False
- self.read_counter = read_counter or opcounters.TransformIOCounter()
-
+ self.read_counter = read_counter or opcounters.NoOpTransformIOCounter()
self.reader_threads = []
self._start_reader_threads()
def add_byte_counter(self, reader):
"""Adds byte counter observer to a side input reader.
- If the 'sideinput_io_metrics' experiment flag is not passed in, then nothing
- is attached to the reader.
+ If the 'sideinput_io_metrics' experiment flag is not passed in, then
+ nothing is attached to the reader.
Args:
reader: A reader that should inherit from ObservableMixin to have
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 97207)
Time Spent: 3.5h (was: 3h 20m)
> Add tracking of bytes read / time spent when reading side inputs
> ----------------------------------------------------------------
>
> Key: BEAM-3042
> URL: https://issues.apache.org/jira/browse/BEAM-3042
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> It is difficult for Dataflow users to understand how modifying a pipeline or
> data set can affect how much inter-transform IO is used in their job. The
> intent of this feature request is to help users understand how side inputs
> behave when they are consumed.
> This will allow users to understand how much time and how much data their
> pipeline uses to read/write to inter-transform IO. Users will also be able to
> modify their pipelines and understand how their changes affect these IO
> metrics.
> For further information, please review the internal Google doc
> go/insights-transform-io-design-doc.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)