[
https://issues.apache.org/jira/browse/BEAM-5791?focusedWorklogId=156860&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-156860
]
ASF GitHub Bot logged work on BEAM-5791:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/18 10:29
Start Date: 22/Oct/18 10:29
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #6751: [BEAM-5791] Improve
Python SDK progress counters.
URL: https://github.com/apache/beam/pull/6751
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (fork-based) pull request is merged, the
diff is reproduced below for the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 834eb5c63bb..6164f11ccb3 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -515,6 +515,9 @@ def wrapper(func):
def create_operation(self, transform_id, consumers):
transform_proto = self.descriptor.transforms[transform_id]
+ if not transform_proto.unique_name:
+ logging.warn("No unique name set for transform %s" % transform_id)
+ transform_proto.unique_name = transform_id
creator, parameter_type = self._known_urns[transform_proto.spec.urn]
payload = proto_utils.parse_Bytes(
transform_proto.spec.payload, parameter_type)
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.pxd
b/sdks/python/apache_beam/runners/worker/opcounters.pxd
index 1d7f296c5ce..8957a5bdb31 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.pxd
+++ b/sdks/python/apache_beam/runners/worker/opcounters.pxd
@@ -56,6 +56,7 @@ cdef class OperationCounters(object):
cdef public Counter mean_byte_counter
cdef public coder_impl
cdef public SumAccumulator active_accumulator
+ cdef public object current_size
cdef public libc.stdint.int64_t _sample_counter
cdef public libc.stdint.int64_t _next_sample
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py
b/sdks/python/apache_beam/runners/worker/opcounters.py
index cdbb27a0a77..ba629e17584 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -185,12 +185,12 @@ def __init__(self, counter_factory, step_name, coder,
output_index):
'%s-out%s-MeanByteCount' % (step_name, output_index), Counter.MEAN)
self.coder_impl = coder.get_impl() if coder else None
self.active_accumulator = None
+ self.current_size = None
self._sample_counter = 0
self._next_sample = 0
def update_from(self, windowed_value):
"""Add one value to this counter."""
- self.element_counter.update(1)
if self._should_sample():
self.do_sample(windowed_value)
@@ -212,7 +212,7 @@ def do_sample(self, windowed_value):
size, observables = (
self.coder_impl.get_estimated_size_and_observables(windowed_value))
if not observables:
- self.mean_byte_counter.update(size)
+ self.current_size = size
else:
self.active_accumulator = SumAccumulator()
self.active_accumulator.update(size)
@@ -227,7 +227,11 @@ def update_collect(self):
Now that the element has been processed, we ask our accumulator
for the total and store the result in a counter.
"""
- if self.active_accumulator is not None:
+ self.element_counter.update(1)
+ if self.current_size is not None:
+ self.mean_byte_counter.update(self.current_size)
+ self.current_size = None
+ elif self.active_accumulator is not None:
self.mean_byte_counter.update(self.active_accumulator.value())
self.active_accumulator = None
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py
b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index d45213f3616..ba87d14659c 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -109,6 +109,7 @@ def test_update_int(self):
coders.PickleCoder(), 0)
self.verify_counters(opcounts, 0)
opcounts.update_from(GlobalWindows.windowed_value(1))
+ opcounts.update_collect()
self.verify_counters(opcounts, 1)
def test_update_str(self):
@@ -118,6 +119,7 @@ def test_update_str(self):
self.verify_counters(opcounts, 0, float('nan'))
value = GlobalWindows.windowed_value('abcde')
opcounts.update_from(value)
+ opcounts.update_collect()
estimated_size = coder.estimate_size(value)
self.verify_counters(opcounts, 1, estimated_size)
@@ -129,6 +131,7 @@ def test_update_old_object(self):
obj = OldClassThatDoesNotImplementLen()
value = GlobalWindows.windowed_value(obj)
opcounts.update_from(value)
+ opcounts.update_collect()
estimated_size = coder.estimate_size(value)
self.verify_counters(opcounts, 1, estimated_size)
@@ -141,6 +144,7 @@ def test_update_new_object(self):
obj = ObjectThatDoesNotImplementLen()
value = GlobalWindows.windowed_value(obj)
opcounts.update_from(value)
+ opcounts.update_collect()
estimated_size = coder.estimate_size(value)
self.verify_counters(opcounts, 1, estimated_size)
@@ -152,13 +156,16 @@ def test_update_multiple(self):
self.verify_counters(opcounts, 0, float('nan'))
value = GlobalWindows.windowed_value('abcde')
opcounts.update_from(value)
+ opcounts.update_collect()
total_size += coder.estimate_size(value)
value = GlobalWindows.windowed_value('defghij')
opcounts.update_from(value)
+ opcounts.update_collect()
total_size += coder.estimate_size(value)
self.verify_counters(opcounts, 2, (float(total_size) / 2))
value = GlobalWindows.windowed_value('klmnop')
opcounts.update_from(value)
+ opcounts.update_collect()
total_size += coder.estimate_size(value)
self.verify_counters(opcounts, 3, (float(total_size) / 3))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 156860)
Time Spent: 40m (was: 0.5h)
> Bound the amount of data on the data plane by time.
> ---------------------------------------------------
>
> Key: BEAM-5791
> URL: https://issues.apache.org/jira/browse/BEAM-5791
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow, sdk-java-harness, sdk-py-harness
> Reporter: Robert Bradshaw
> Assignee: Henning Rohde
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> This is especially important for Fn API reads, where each element represents
> a shard to read and may be very expensive, but many elements may be waiting
> in the Fn API buffer.
> The need for this will be mitigated with full SDF support for liquid sharding
> over the Fn API, but not eliminated unless the runner can "unread" elements
> it has already sent.
> This is especially important in for dataflow jobs that start out small but
> then detect that they need more workers (e.g. due to the initial inputs being
> an SDF).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)