apilloud commented on code in PR #22002:
URL: https://github.com/apache/beam/pull/22002#discussion_r907880931
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -138,24 +158,11 @@ public <T> void register(
labelsMetadata);
executionStates.register(state);
- pCollectionIdsToConsumers.put(
- pCollectionId,
+ List<ConsumerAndMetadata> consumerAndMetadatas =
+ pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) -> new ArrayList<>());
+ consumerAndMetadatas.add(
ConsumerAndMetadata.forConsumer(
- consumer,
- pTransformId,
- state,
- valueCoder,
- metricsContainerRegistry.getContainer(pTransformId)));
- }
-
- /** Reset the execution states of the registered functions. */
- public void reset() {
Review Comment:
nit: it looks like this block just moved ~40 lines but didn't otherwise change...
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -663,10 +695,47 @@ private ImmutableMap<String, ByteString> monitoringData(BundleProcessor bundlePr
ByteString payload = monitoringInfo.getPayload();
String shortId =
shortIds.getOrCreateShortId(monitoringInfo.toBuilder().clearPayload().build());
- result.put(shortId, payload);
+ monitoringData.put(shortId, payload);
}
}
- return result.build();
+ bundleProcessor
+ .getBundleProgressReporterAndRegistrar()
+ .updateIntermediateMonitoringData(monitoringData);
Review Comment:
I'm not finding where either IntermediateMonitoringData or
FinalMonitoringData below is used. It looks like they are being newly set in
this change. Are they unused? What am I missing?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -209,44 +235,59 @@ public Map<String, ByteString> getExecutionTimeMonitoringData(ShortIdMap shortId
private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
private final FnDataReceiver<WindowedValue<T>> delegate;
private final SimpleExecutionState state;
- private final Counter unboundedElementCountCounter;
- private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;
+ private final BundleCounter elementCountCounter;
+ private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final Coder<T> coder;
private final MetricsContainer metricsContainer;
public MetricTrackingFnDataReceiver(
- String pCollectionId, ConsumerAndMetadata consumerAndMetadata) {
+ String pCollectionId, Coder<T> coder, ConsumerAndMetadata consumerAndMetadata) {
this.delegate = consumerAndMetadata.getConsumer();
this.state = consumerAndMetadata.getExecutionState();
- HashMap<String, String> labels = new HashMap<String, String>();
- labels.put(Labels.PCOLLECTION, pCollectionId);
-
- // Collect the metric in a metric container which is not bound to the step name.
- // This is required to count elements from impulse steps, which will produce elements outside
- // of a pTransform context.
- MetricsContainer unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
+ HashMap<String, String> labels = new HashMap<>();
+ labels.put(Labels.PCOLLECTION, pCollectionId);
Review Comment:
nit: looks like this line didn't change at all but the surrounding
whitespace did...