[
https://issues.apache.org/jira/browse/FLINK-32680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-32680:
-----------------------------------
Labels: pull-request-available (was: )
> Job vertex names get messed up once there is a source vertex chained with a
> MultipleInput vertex in job graph
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32680
> URL: https://issues.apache.org/jira/browse/FLINK-32680
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.2, 1.18.0, 1.17.1
> Reporter: Lijie Wang
> Assignee: Junrui Li
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-07-26-15-23-29-551.png,
> image-2023-07-26-15-24-24-077.png
>
>
> Take the following test(put it to {{MultipleInputITCase}}) as example:
> {code:java}
> @Test
> public void testMultipleInputDoesNotChainedWithSource() throws Exception {
> testJobVertexName(false);
> }
>
> @Test
> public void testMultipleInputChainedWithSource() throws Exception {
> testJobVertexName(true);
> }
> public void testJobVertexName(boolean chain) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> TestListResultSink<Long> resultSink = new TestListResultSink<>();
> DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
> DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
> DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");
> KeyedMultipleInputTransformation<Long> transform =
> new KeyedMultipleInputTransformation<>(
> "MultipleInput",
> new KeyedSumMultipleInputOperatorFactory(),
> BasicTypeInfo.LONG_TYPE_INFO,
> 1,
> BasicTypeInfo.LONG_TYPE_INFO);
> if (chain) {
> transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
> }
> KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value
> -> value % 3;
> env.addOperator(
> transform
> .addInput(source1.getTransformation(), keySelector)
> .addInput(source2.getTransformation(), keySelector)
> .addInput(source3.getTransformation(), keySelector));
> new
> MultipleConnectedStreams(env).transform(transform).rebalance().addSink(resultSink).name("sink");
> env.execute();
> }{code}
>
> When we run {{testMultipleInputDoesNotChainedWithSource}} , all job vertex
> names are normal:
> !image-2023-07-26-15-24-24-077.png|width=494,height=246!
> When we run {{testMultipleInputChainedWithSource}} (the MultipleInput chained
> with source1), job vertex names get messed up (all job vertex names contain
> {{{}Source: source1{}}}):
> !image-2023-07-26-15-23-29-551.png|width=515,height=182!
>
> I think it's a bug.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)