[ 
https://issues.apache.org/jira/browse/FLINK-25342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-25342:
------------------------------
    Description: 
I run a test in my IDEA and watch the generated StreamGraph. It seems like the 
sink is not in the field sinks in the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
    StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> source = execEnv.fromSource(
            KafkaSource.<String>builder()
                    .setGroupId("flink-kafka-test")
                    .setDeserializer(
                            
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setTopics("scaleDownTest")
                    .setBootstrapServers("localhost:9092")
                    .build(),
            WatermarkStrategy.noWatermarks(), "Kafka Source");

    Properties props = new Properties();
    props.setProperty("transaction.timeout.ms", "900000");
    source.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("tp-test-")
            .setKafkaProducerConfig(props)
            .setRecordSerializer(new 
SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
            .build());

    execEnv.execute("ScaleDownTest");
} {code}

The screen shot lies in the attachments.

  was:
I run a test in my IDEA and watch the generated StreamGraph. It seems like the 
sink is not in the field sinks in the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
    StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> source = execEnv.fromSource(
            KafkaSource.<String>builder()
                    .setGroupId("flink-kafka-test")
                    .setDeserializer(
                            
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setTopics("scaleDownTest")
                    .setBootstrapServers("localhost:9092")
                    .build(),
            WatermarkStrategy.noWatermarks(), "Kafka Source");

    Properties props = new Properties();
    props.setProperty("transaction.timeout.ms", "900000");
    source.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("tp-test-")
            .setKafkaProducerConfig(props)
            .setRecordSerializer(new 
SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
            .build());

    execEnv.execute("ScaleDownTest");
} {code}




> DataStream.sinkTo will not add sink to the sinks field in the StreamGraph
> -------------------------------------------------------------------------
>
>                 Key: FLINK-25342
>                 URL: https://issues.apache.org/jira/browse/FLINK-25342
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Hang Ruan
>            Priority: Minor
>         Attachments: streamGraph.png
>
>
> I run a test in my IDEA and watch the generated StreamGraph. It seems like 
> the sink is not in the field sinks in the StreamGraph. My test is as follows:
> {code:java}
> @Test
> public void selfTest() throws Exception {
>     StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<String> source = execEnv.fromSource(
>             KafkaSource.<String>builder()
>                     .setGroupId("flink-kafka-test")
>                     .setDeserializer(
>                             
> KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
>                     .setTopics("scaleDownTest")
>                     .setBootstrapServers("localhost:9092")
>                     .build(),
>             WatermarkStrategy.noWatermarks(), "Kafka Source");
>     Properties props = new Properties();
>     props.setProperty("transaction.timeout.ms", "900000");
>     source.sinkTo(KafkaSink.<String>builder()
>             .setBootstrapServers("localhost:9092")
>             .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>             .setTransactionalIdPrefix("tp-test-")
>             .setKafkaProducerConfig(props)
>             .setRecordSerializer(new 
> SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
>             .build());
>     execEnv.execute("ScaleDownTest");
> } {code}
> The screen shot lies in the attachments.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to