Sanil15 commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1181783009
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -283,4 +323,13 @@ public StoreIdGenerator getStoreIdGenerator() {
sendFn.accept(new EndOfStreamMessage(null));
return dummyInput;
}
+
+ boolean shouldDoAttachMetricOp(Config config, boolean enableTransformMetric)
{
Review Comment:
done
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
}
public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue)
{
+ return getMessageStream(pvalue, true);
+ }
+
+ public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(
+ PValue pvalue, boolean enableTransformMetric) {
@SuppressWarnings("unchecked")
final MessageStream<OpMessage<OutT>> stream =
(MessageStream<OpMessage<OutT>>) messsageStreams.get(pvalue);
if (stream == null) {
throw new IllegalArgumentException("No stream registered for pvalue: " +
pvalue);
}
+
+ // add a step to attach InputMetricOp if registered for Op Stream
+ final Config overrideConfig = new
MapConfig(getPipelineOptions().getConfigOverride());
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]