[
https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400476#comment-16400476
]
Dawid Wysakowicz commented on BEAM-3281:
----------------------------------------
Fixed in [BEAM-3043]
> PTransform name not being propagated to the Flink Web UI
> --------------------------------------------------------
>
> Key: BEAM-3281
> URL: https://issues.apache.org/jira/browse/BEAM-3281
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0
> Reporter: Thalita Vergilio
> Assignee: Aljoscha Krettek
> Priority: Minor
> Labels: flink
> Fix For: 2.5.0
>
> Attachments: flink-dashboard.PNG
>
>
> This could be related to BEAM-1107, which was logged for Flink Batch
> processing.
> I am experiencing a similar issue for stream processing. I would have
> expected the name passed to
> {code:java}
> pipeline.apply(String name, PTransform<? super PBegin,OutputT> root)
> {code}
> to be propagated to the Flink Web UI.
> The documentation seems to suggest that this was the intended functionality:
> https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-
> Here is some sample code setting the name:
> {code:java}
> p.apply("Apply Windowing Function",
> Window.into(FixedWindows.of(Duration.standardSeconds(10))))
> .apply("Transform the Pipeline to Key by Window",
> ParDo.of(
> new DoFn<KafkaRecord<byte[], byte[]>,
> KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
> @ProcessElement
> public void processElement(ProcessContext
> context, IntervalWindow window) {
> context.output(KV.of(window,
> context.element()));
> }
> }))
> .apply("Group by Key (window)", GroupByKey.create())
> .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
> .apply("Write output to Kafka",
> KafkaIO.<IntervalWindowResult, PueResult>write()
> .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
> .withTopic("results")
>
> .withKeySerializer(IntervalWindowResultSerialiser.class)
> .withValueSerializer(PueResultSerialiser.class)
> );
> {code}
> I will upload a screenshot of the results.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)