[ 
https://issues.apache.org/jira/browse/FLINK-25342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460540#comment-17460540
 ] 

Hang Ruan edited comment on FLINK-25342 at 12/16/21, 9:55 AM:
--------------------------------------------------------------

[~fpaul] Thanks for replying.

So if we want to get all sinks from the StreamGraph, we should walk through the 
StreamNodes instead of using this field, right? And the same applies to the sources?

Please close this ticket.
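
To make the "walk through the StreamNodes" idea concrete, here is a minimal sketch. It uses a hypothetical stand-in class (SketchNode) rather than Flink's own StreamNode, and it assumes that a sink is a node with no outgoing edges and a source is a node with no incoming edges. That definition is an assumption for illustration, not something confirmed in this ticket.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a graph node; this is NOT Flink's StreamNode class. */
final class SketchNode {
    final int id;
    final String operatorName;
    final List<SketchNode> inEdges = new ArrayList<>();
    final List<SketchNode> outEdges = new ArrayList<>();

    SketchNode(int id, String operatorName) {
        this.id = id;
        this.operatorName = operatorName;
    }

    /** Adds a directed edge from this node to the given downstream node. */
    void connectTo(SketchNode downstream) {
        outEdges.add(downstream);
        downstream.inEdges.add(this);
    }
}

final class GraphWalkSketch {
    private GraphWalkSketch() {}

    /** Assumption: a sink is any node without outgoing edges. */
    static List<SketchNode> findSinks(Collection<SketchNode> nodes) {
        List<SketchNode> sinks = new ArrayList<>();
        for (SketchNode node : nodes) {
            if (node.outEdges.isEmpty()) {
                sinks.add(node);
            }
        }
        return sinks;
    }

    /** Assumption: a source is any node without incoming edges. */
    static List<SketchNode> findSources(Collection<SketchNode> nodes) {
        List<SketchNode> sources = new ArrayList<>();
        for (SketchNode node : nodes) {
            if (node.inEdges.isEmpty()) {
                sources.add(node);
            }
        }
        return sources;
    }
}
```

Against a real StreamGraph, the same loop would presumably iterate over getStreamNodes() and check each node's getOutEdges() and getInEdges(). Whether an edge-free node is the right definition of a sink for every topology (for example, one where a sink expands into several nodes) is something I have not verified.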

 


was (Author: ruanhang1993):
[~fpaul] Thanks for replying.

So if we want to get all sinks from the StreamGraph, we should walk through the 
StreamNodes instead of using this field, right? And the same applies to the sources?

I will close this ticket later.

 

> DataStream.sinkTo will not add sink to the sinks field in the StreamGraph
> -------------------------------------------------------------------------
>
>                 Key: FLINK-25342
>                 URL: https://issues.apache.org/jira/browse/FLINK-25342
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Hang Ruan
>            Priority: Minor
>         Attachments: streamGraph.png
>
>
> I ran a test in IntelliJ IDEA and inspected the generated StreamGraph. The 
> sink does not appear in the sinks field of the StreamGraph. My test is as follows:
> {code:java}
> @Test
> public void selfTest() throws Exception {
>     StreamExecutionEnvironment execEnv =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<String> source = execEnv.fromSource(
>             KafkaSource.<String>builder()
>                     .setGroupId("flink-kafka-test")
>                     .setDeserializer(
>                             KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
>                     .setTopics("scaleDownTest")
>                     .setBootstrapServers("localhost:9092")
>                     .build(),
>             WatermarkStrategy.noWatermarks(), "Kafka Source");
>     Properties props = new Properties();
>     props.setProperty("transaction.timeout.ms", "900000");
>     source.sinkTo(KafkaSink.<String>builder()
>             .setBootstrapServers("localhost:9092")
>             .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>             .setTransactionalIdPrefix("tp-test-")
>             .setKafkaProducerConfig(props)
>             .setRecordSerializer(
>                     new SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
>             .build());
>     execEnv.execute("ScaleDownTest");
> }
> {code}
> A screenshot of the generated StreamGraph is attached (streamGraph.png).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
