[
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286601#comment-16286601
]
ASF GitHub Bot commented on BEAM-3042:
--------------------------------------
pabloem closed pull request #3943: [BEAM-3042] Add tracking of bytes read /
time spent when reading side inputs
URL: https://github.com/apache/beam/pull/3943
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py
b/sdks/python/apache_beam/runners/worker/opcounters.py
index f4ba6b9a9a8..c997d23a39d 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -25,6 +25,7 @@
import random
from apache_beam.utils.counters import Counter
+from apache_beam.utils.counters import CounterName
# This module is experimental. No backwards-compatibility guarantees.
@@ -42,6 +43,58 @@ def value(self):
return self._value
+class TransformIoCounter(object):
+
+ def add_bytes_read(self, n):
+ pass
+
+ def __enter__(self):
+ self.enter()
+
+ def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
+ self.exit()
+
+ def enter(self):
+ pass
+
+ def exit(self):
+ pass
+
+ def check_step(self):
+ pass
+
+
+class SideInputReadCounter(TransformIoCounter):
+
+ def __init__(self, counter_factory, state_sampler, io_target):
+ self._counter_factory = counter_factory
+ self._state_sampler = state_sampler
+ self._bytes_read_cache = 0
+ self.io_target = io_target
+ self.check_step()
+
+ def check_step(self):
+ current_state = self._state_sampler.current_state()
+ operation_name = current_state.name.step_name
+ self.scoped_state = self._state_sampler.scoped_state(
+ operation_name, 'read-sideinput', io_target=self.io_target)
+ self.bytes_read_counter = self._counter_factory.get_counter(
+ CounterName('bytes-read',
+ step_name=operation_name,
+ io_target=self.io_target),
+ Counter.SUM)
+
+ def add_bytes_read(self, n):
+ if n > 0:
+ self.bytes_read_counter.update(n)
+
+ def enter(self):
+ self.scoped_state.__enter__()
+
+ def exit(self):
+ self.scoped_state.__exit__(None, None, None)
+
+
class OperationCounters(object):
"""The set of basic counters to attach to an Operation."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index ed3f3b8f466..132a61fb131 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -42,6 +42,7 @@
from apache_beam.transforms.combiners import PhasedCombineFnExecutor
from apache_beam.transforms.combiners import curry_combine_fn
from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import counters
from apache_beam.utils.windowed_value import WindowedValue
# Allow some "pure mode" declarations.
@@ -281,7 +282,7 @@ def _read_side_inputs(self, tags_and_types):
# Note that for each tag there could be several read operations in the
# specification. This can happen for instance if the source has been
# sharded into several files.
- for side_tag, view_class, view_options in tags_and_types:
+ for i, (side_tag, view_class, view_options) in enumerate(tags_and_types):
sources = []
# Using the side_tag in the lambda below will trigger a pylint warning.
# However in this case it is fine because the lambda is used right away
@@ -293,7 +294,13 @@ def _read_side_inputs(self, tags_and_types):
if not isinstance(si, operation_specs.WorkerSideInputSource):
raise NotImplementedError('Unknown side input type: %r' % si)
sources.append(si.source)
- iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+
+ si_counter = opcounters.SideInputReadCounter(
+ self.counter_factory, self.state_sampler,
+ # Inputs are 1-indexed, so we add 1 to i in the side input id
+ counters.side_input_id(self.operation_name, i+1))
+ iterator_fn = sideinputs.get_iterator_fn_for_sources(
+ sources, read_counter=si_counter)
# Backwards compatibility for pre BEAM-733 SDKs.
if isinstance(view_options, tuple):
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py
b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e71f5..b11ab3cfba6 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -24,6 +24,7 @@
import traceback
from apache_beam.io import iobase
+from apache_beam.runners.worker import opcounters
from apache_beam.transforms import window
# This module is experimental. No backwards-compatibility guarantees.
@@ -51,9 +52,12 @@ class PrefetchingSourceSetIterable(object):
"""Value iterator that reads concurrently from a set of sources."""
def __init__(self, sources,
- max_reader_threads=MAX_SOURCE_READER_THREADS):
+ max_reader_threads=MAX_SOURCE_READER_THREADS,
+ read_counter=None):
self.sources = sources
self.num_reader_threads = min(max_reader_threads, len(self.sources))
+ self.read_counter = read_counter or opcounters.TransformIoCounter()
+ # self.read_counter = opcounters.TransformIoCounter()
# Queue for sources that are to be read.
self.sources_queue = Queue.Queue()
@@ -78,6 +82,13 @@ def _start_reader_threads(self):
t.start()
self.reader_threads.append(t)
+ def _get_source_position(self, range_tracker=None, reader=None):
+ if reader:
+ return reader.get_progress().position.byte_offset
+ else:
+ return range_tracker.position_at_fraction(
+ range_tracker.fraction_consumed()) if range_tracker else 0
+
def _reader_thread(self):
# pylint: disable=too-many-nested-blocks
try:
@@ -85,22 +96,37 @@ def _reader_thread(self):
try:
source = self.sources_queue.get_nowait()
if isinstance(source, iobase.BoundedSource):
- for value in source.read(source.get_range_tracker(None, None)):
+ rt = source.get_range_tracker(None, None)
+ initial_position = self._get_source_position(range_tracker=rt)
+ for value in source.read(rt):
if self.has_errored:
# If any reader has errored, just return.
return
+
+ current_position = self._get_source_position(range_tracker=rt)
+ consumed_bytes = current_position - initial_position
+ self.read_counter.add_bytes_read(consumed_bytes)
+ initial_position = initial_position + consumed_bytes
+
if isinstance(value, window.WindowedValue):
self.element_queue.put(value)
else:
self.element_queue.put(_globally_windowed(value))
else:
- # Native dataflow source.
+ # Native dataflow source / testing FakeSource
with source.reader() as reader:
+ initial_offset = self._get_source_position(reader=reader)
+
returns_windowed_values = reader.returns_windowed_values
for value in reader:
if self.has_errored:
- # If any reader has errored, just return.
+ # If any reader has errored, just return.
return
+
+ new_offset = self._get_source_position(reader=reader)
+ self.read_counter.add_bytes_read(new_offset - initial_offset)
+ initial_offset = new_offset
+
if returns_windowed_values:
self.element_queue.put(value)
else:
@@ -128,7 +154,14 @@ def __iter__(self):
num_readers_finished = 0
try:
while True:
- element = self.element_queue.get()
+ if self.element_queue.empty():
+ # The queue is empty. We check the current state.
+ self.read_counter.check_step()
+ with self.read_counter:
+ element = self.element_queue.get()
+ else:
+ element = self.element_queue.get()
+
if element is READER_THREAD_IS_DONE_SENTINEL:
num_readers_finished += 1
if num_readers_finished == self.num_reader_threads:
@@ -150,11 +183,13 @@ def __iter__(self):
def get_iterator_fn_for_sources(
- sources, max_reader_threads=MAX_SOURCE_READER_THREADS):
+ sources, max_reader_threads=MAX_SOURCE_READER_THREADS, read_counter=None):
"""Returns callable that returns iterator over elements for given sources."""
def _inner():
return iter(PrefetchingSourceSetIterable(
- sources, max_reader_threads=max_reader_threads))
+ sources,
+ max_reader_threads=max_reader_threads,
+ read_counter=read_counter))
return _inner
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe4e6e..296bed460d1 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -21,6 +21,8 @@
import time
import unittest
+import mock
+
from apache_beam.runners.worker import sideinputs
@@ -43,6 +45,11 @@ def __init__(self, items):
self.items = items
self.entered = False
self.exited = False
+ self._progress = mock.MagicMock()
+ self._progress.position.byte_offset = 0
+
+ def get_progress(self):
+ return self._progress
def __iter__(self):
return iter(self.items)
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx
b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index f0527c6decc..2c3e3784f7b 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -174,6 +174,13 @@ cdef class StateSampler(object):
# pythread doesn't support conditions.
self.sampling_thread.join()
+ def current_state(self):
+ """Returns the current ScopedState.
+
+ This operation is not thread-safe, and should only be used to check, not to
+ update, information in the current state."""
+ return self.scoped_states_by_index[self.current_state_index]
+
def stop_if_still_running(self):
if self.started and not self.finished:
self.stop()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index bc56021520a..a106e437859 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -17,14 +17,19 @@
# This module is experimental. No backwards-compatibility guarantees.
+from apache_beam.utils.counters import CounterName
+
class StateSampler(object):
def __init__(self, *args, **kwargs):
- pass
+ self._current_state = _FakeScopedState(self, 'unknown', 'unknown')
def scoped_state(self, step_name, state_name=None, io_target=None):
- return _FakeScopedState()
+ return _FakeScopedState(self, step_name, state_name)
+
+ def current_state(self):
+ return self._current_state
def start(self):
pass
@@ -41,11 +46,17 @@ def commit_counters(self):
class _FakeScopedState(object):
+ def __init__(self, sampler, step_name, state_name):
+ self.name = CounterName(state_name + '-msecs',
+ step_name=step_name)
+ self.sampler = sampler
+
def __enter__(self):
- pass
+ self.old_state = self.sampler.current_state()
+ self.sampler._current_state = self
def __exit__(self, *unused_args):
- pass
+ self.sampler._current_state = self.old_state
def sampled_seconds(self):
return 0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add tracking of bytes read / time spent when reading side inputs
> ----------------------------------------------------------------
>
> Key: BEAM-3042
> URL: https://issues.apache.org/jira/browse/BEAM-3042
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
>
> It is difficult for Dataflow users to understand how modifying a pipeline or
> data set can affect how much inter-transform IO is used in their job. The
> intent of this feature request is to help users understand how side inputs
> behave when they are consumed.
> This will allow users to understand how much time and how much data their
> pipeline uses to read/write to inter-transform IO. Users will also be able to
> modify their pipelines and understand how their changes affect these IO
> metrics.
> For further information, please review the internal Google doc
> go/insights-transform-io-design-doc.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)