[
https://issues.apache.org/jira/browse/BEAM-12127?focusedWorklogId=579448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-579448
]
ASF GitHub Bot logged work on BEAM-12127:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Apr/21 18:26
Start Date: 08/Apr/21 18:26
Worklog Time Spent: 10m
Work Description: scwhittle commented on a change in pull request #14473:
URL: https://github.com/apache/beam/pull/14473#discussion_r609983126
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -213,38 +211,39 @@ public MetricTrackingFnDataReceiver(
this.pTransformId = consumerAndMetadata.getPTransformId();
HashMap<String, String> labels = new HashMap<String, String>();
labels.put(Labels.PCOLLECTION, pCollectionId);
+
+ // Collect the metric in a metric container which is not bound to the step name.
+ // This is required to count elements from impulse steps, which will produce elements outside
+ // of a pTransform context.
+ MetricsContainer unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
+
MonitoringInfoMetricName elementCountMetricName =
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
- this.elementCountCounter = LabeledMetrics.counter(elementCountMetricName);
+ this.unboundedElementCountCounter = unboundMetricContainer.getCounter(elementCountMetricName);
+
MonitoringInfoMetricName sampledByteSizeMetricName =
MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
- this.sampledByteSizeDistribution =
- new SampleByteSizeDistribution<>(LabeledMetrics.distribution(sampledByteSizeMetricName));
- this.coder = consumerAndMetadata.getValueCoder();
+ this.unboundSampledByteSizeDistribution =
+ new SampleByteSizeDistribution<>(
+ unboundMetricContainer.getDistribution(sampledByteSizeMetricName));
- // Collect the metric in a metric container which is not bound to the step name.
- // This is required to count elements from impulse steps, which will produce elements outside
- // of a pTransform context.
- this.unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
+ this.coder = consumerAndMetadata.getValueCoder();
}
@Override
public void accept(WindowedValue<T> input) throws Exception {
- try (Closeable close =
- MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
- // Increment the counter for each window the element occurs in.
- this.elementCountCounter.inc(input.getWindows().size());
- // TODO(BEAM-11879): Consider updating size per window when we have window optimization.
- this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
- // Wrap the consumer with extra logic to set the metric container with the appropriate
- // PTransform context. This ensures that user metrics obtain the pTransform ID when they are
- // created. Also use the ExecutionStateTracker and enter an appropriate state to track the
- // Process Bundle Execution time metric.
- MetricsContainerImpl container = metricsContainerRegistry.getContainer(pTransformId);
- try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
- try (Closeable trackerCloseable = stateTracker.enterState(state)) {
- this.delegate.accept(input);
- }
+ // Increment the counter for each window the element occurs in.
+ this.unboundedElementCountCounter.inc(input.getWindows().size());
Review comment:
My reasoning on why this was a no-op:
- The scoped metrics container is stuffed into a thread-local, and the
delegating counter pulls it out from there.
- The scoping is not transitive; only the most recently entered scope matters.
- We are now caching the unboundMetricContainer.getCounter() result
instead of recalculating it on every accept. However, the container appears to
reset counters but not remove them.
I was hoping existing tests would cover correctness, since I'm not familiar
with this code.
@kileys can you take a look? Thanks
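
To make the scoping argument above concrete, here is a minimal, self-contained sketch. It does not use Beam's classes: `Container`, `Scope`, `scoped`, and `delegatingInc` are hypothetical stand-ins, written on the assumption that the scoped container lives in a thread-local and that a delegating increment resolves it at call time. It only illustrates why an outer scope has no effect on increments made inside an inner scope.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ScopedContainerSketch {
  /** Hypothetical stand-in for a metrics container: named counters in a map. */
  static final class Container {
    private final Map<String, Long> counters = new HashMap<>();

    void inc(String name, long n) {
      counters.merge(name, n, Long::sum);
    }

    long get(String name) {
      return counters.getOrDefault(name, 0L);
    }
  }

  /** A closeable scope whose close() throws no checked exception. */
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  private static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  /** Installs a container for this thread; closing restores the previous one. */
  static Scope scoped(Container container) {
    Container previous = CURRENT.get();
    CURRENT.set(container);
    return () -> CURRENT.set(previous);
  }

  /** Delegating increment: looks up the current container on every call. */
  static void delegatingInc(String name, long n) {
    Container container = CURRENT.get();
    if (container != null) {
      container.inc(name, n);
    }
  }

  /** Returns {unbound element_count, unbound user_metric, step user_metric}. */
  static long[] demo() {
    Container unbound = new Container();
    Container step = new Container();
    try (Scope outer = scoped(unbound)) {
      delegatingInc("element_count", 1); // lands in unbound
      try (Scope inner = scoped(step)) {
        delegatingInc("user_metric", 1); // lands in step only; the outer scope is ignored
      }
      delegatingInc("element_count", 1); // unbound is current again
    }
    return new long[] {unbound.get("element_count"), unbound.get("user_metric"), step.get("user_metric")};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(demo())); // prints [2, 0, 1]
  }
}
```

In this sketch the outer scope matters only for the increments made directly inside it, so replacing it with a counter obtained straight from the unbound container should leave the counts unchanged.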
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -213,38 +211,39 @@ public MetricTrackingFnDataReceiver(
this.pTransformId = consumerAndMetadata.getPTransformId();
HashMap<String, String> labels = new HashMap<String, String>();
labels.put(Labels.PCOLLECTION, pCollectionId);
+
+ // Collect the metric in a metric container which is not bound to the step name.
+ // This is required to count elements from impulse steps, which will produce elements outside
Review comment:
I don't really either, I just moved it from below.
Maybe we can do better and somehow ensure it *only* happens for
impulse steps.
Issue Time Tracking
-------------------
Worklog Id: (was: 579448)
Time Spent: 1h (was: 50m)
> Reduce counter overhead in PCollectionConsumerRegistry.accept
> -------------------------------------------------------------
>
> Key: BEAM-12127
> URL: https://issues.apache.org/jira/browse/BEAM-12127
> Project: Beam
> Issue Type: Bug
> Components: java-fn-execution
> Reporter: Sam Whittle
> Assignee: Sam Whittle
> Priority: P2
> Time Spent: 1h
> Remaining Estimate: 0h
>
> DelegatingCounter.inc shows up as 21% of CPU on the nexmark query 2 benchmark
> under PCollectionConsumerRegistry.accept.
> Only 2% is actual counter incrementing; the majority is the delegation in
> DelegatingCounter, which involves looking up thread-local state and then
> getting the counter for the name from the counter container. However, in this
> case the counter container is known and can just be bound when constructing
> the counter instead of using DelegatingCounter.
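
The difference between the two approaches described in the issue can be sketched as follows. This is an illustration only, not Beam's implementation: `Container`, `DelegatingCounterSketch`, and `BoundCounterSketch` are hypothetical names, and the sketch assumes the per-call cost comes from a thread-local read followed by a lookup by name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class BoundCounterDemo {
  /** Hypothetical container that creates counters on first use and keeps them. */
  static final class Container {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    AtomicLong getCounter(String name) {
      return counters.computeIfAbsent(name, key -> new AtomicLong());
    }
  }

  static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  /** Resolves the container and the counter again on every increment. */
  static final class DelegatingCounterSketch {
    private final String name;

    DelegatingCounterSketch(String name) {
      this.name = name;
    }

    void inc(long n) {
      // Thread-local read plus map lookup on each call.
      CURRENT.get().getCounter(name).addAndGet(n);
    }
  }

  /** Resolves the counter once, at construction time. */
  static final class BoundCounterSketch {
    private final AtomicLong cell;

    BoundCounterSketch(Container container, String name) {
      this.cell = container.getCounter(name);
    }

    void inc(long n) {
      // Only the increment itself remains on the hot path.
      cell.addAndGet(n);
    }
  }

  /** Increments the same named counter through both paths and returns its total. */
  static long demo() {
    Container container = new Container();
    CURRENT.set(container);
    try {
      DelegatingCounterSketch delegating = new DelegatingCounterSketch("element_count");
      BoundCounterSketch bound = new BoundCounterSketch(container, "element_count");
      delegating.inc(3);
      bound.inc(4);
      return container.getCounter("element_count").get();
    } finally {
      CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 7
  }
}
```

Caching the counter this way is only safe if the container keeps returning the same counter object for a given name over its lifetime, for example by resetting values rather than removing entries.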