[
https://issues.apache.org/jira/browse/FLINK-25342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hang Ruan updated FLINK-25342:
------------------------------
Description:
I run a test in my IDEA and watch the generated StreamGraph. It seems like the
sink is not in the field sinks in the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = execEnv.fromSource(
KafkaSource.<String>builder()
.setGroupId("flink-kafka-test")
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setTopics("scaleDownTest")
.setBootstrapServers("localhost:9092")
.build(),
WatermarkStrategy.noWatermarks(), "Kafka Source");
Properties props = new Properties();
props.setProperty("transaction.timeout.ms", "900000");
source.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("tp-test-")
.setKafkaProducerConfig(props)
.setRecordSerializer(new
SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
.build());
execEnv.execute("ScaleDownTest");
} {code}
The screen shot lies in the attachments.
was:
I run a test in my IDEA and watch the generated StreamGraph. It seems like the
sink is not in the field sinks in the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = execEnv.fromSource(
KafkaSource.<String>builder()
.setGroupId("flink-kafka-test")
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setTopics("scaleDownTest")
.setBootstrapServers("localhost:9092")
.build(),
WatermarkStrategy.noWatermarks(), "Kafka Source");
Properties props = new Properties();
props.setProperty("transaction.timeout.ms", "900000");
source.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("tp-test-")
.setKafkaProducerConfig(props)
.setRecordSerializer(new
SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
.build());
execEnv.execute("ScaleDownTest");
} {code}
> DataStream.sinkTo will not add sink to the sinks field in the StreamGraph
> -------------------------------------------------------------------------
>
> Key: FLINK-25342
> URL: https://issues.apache.org/jira/browse/FLINK-25342
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.0
> Reporter: Hang Ruan
> Priority: Minor
> Attachments: streamGraph.png
>
>
> I run a test in my IDEA and watch the generated StreamGraph. It seems like
> the sink is not in the field sinks in the StreamGraph. My test is as follows:
> {code:java}
> @Test
> public void selfTest() throws Exception {
> StreamExecutionEnvironment execEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<String> source = execEnv.fromSource(
> KafkaSource.<String>builder()
> .setGroupId("flink-kafka-test")
> .setDeserializer(
>
> KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> .setTopics("scaleDownTest")
> .setBootstrapServers("localhost:9092")
> .build(),
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> Properties props = new Properties();
> props.setProperty("transaction.timeout.ms", "900000");
> source.sinkTo(KafkaSink.<String>builder()
> .setBootstrapServers("localhost:9092")
> .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> .setTransactionalIdPrefix("tp-test-")
> .setKafkaProducerConfig(props)
> .setRecordSerializer(new
> SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
> .build());
> execEnv.execute("ScaleDownTest");
> } {code}
> The screen shot lies in the attachments.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)