[ https://issues.apache.org/jira/browse/FLINK-32603 ]
Jiang Xin deleted comment on FLINK-32603:
-----------------------------------
was (Author: jiang xin):
A solution is to share a `Map<ObjectIdentifier, Transformation>` between
planners using `org.apache.flink.table.factories.PlannerFactoryUtil$Context`.
The Map caches each source's identifier along with its translated
transformation. Every time a source table is to be translated, we look it up in
the Map first: if it exists, we return the cached transformation; otherwise we
translate it and register it in the Map.
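To make the lookup-then-register step concrete, here is a minimal sketch in plain Java. It is not Flink code: `SharedSourceCache`, `getOrTranslate`, and `translationCount` are hypothetical names, the key type stands in for `ObjectIdentifier`, and the value type stands in for `Transformation`. It also assumes the planners translate on a single thread, so a plain `HashMap` is sufficient.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the state shared between planners; not a Flink class.
// K plays the role of ObjectIdentifier, V the role of Transformation.
final class SharedSourceCache<K, V> {
    private final Map<K, V> translated = new HashMap<>();
    private int translations = 0;

    // Return the cached value for this source, or translate it once and register it.
    V getOrTranslate(K sourceId, Supplier<V> translator) {
        return translated.computeIfAbsent(sourceId, id -> {
            translations++; // counts how often a source is actually translated
            return translator.get();
        });
    }

    int translationCount() {
        return translations;
    }

    public static void main(String[] args) {
        SharedSourceCache<String, Object> cache = new SharedSourceCache<>();
        String id = "default_catalog.default_database.source";

        // Two "planners" ask for the same source table.
        Object first = cache.getOrTranslate(id, Object::new);
        Object second = cache.getOrTranslate(id, Object::new);

        System.out.println(first == second);          // prints "true"
        System.out.println(cache.translationCount()); // prints "1"
    }
}
```

With such a shared cache, the second planner would reuse the transformation produced by the first instead of creating a new source.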
> Avoid consuming twice when two pipelines have the same table source
> -------------------------------------------------------------------
>
> Key: FLINK-32603
> URL: https://issues.apache.org/jira/browse/FLINK-32603
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Jiang Xin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Here is an example that illustrates the issue. We have a source table that
> generates numbers from 1 to 5. We then derive two tables from the source,
> convert them to DataStreams, and sink them to the console.
> If we debug the program, we find that the
> `org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource`
> is created twice and the numbers are generated twice. It is wasteful to
> consume the same source data twice.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().enableObjectReuse();
>     env.setParallelism(1);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     tEnv.executeSql(
>                     "CREATE TABLE source(\n"
>                             + " f_sequence INT\n"
>                             + ") WITH (\n"
>                             + " 'connector' = 'datagen',\n"
>                             + " 'rows-per-second' ='1',\n"
>                             + " 'fields.f_sequence.kind' ='sequence',\n"
>                             + " 'fields.f_sequence.start'='1',\n"
>                             + " 'fields.f_sequence.end'='5'\n"
>                             + ")")
>             .await();
>     Table source = tEnv.from("source");
>     Table left = source.filter($("f_sequence").isGreater(3));
>     Table right = source.filter($("f_sequence").isLessOrEqual(3));
>     DataStream<Row> leftDataStream = tEnv.toDataStream(left);
>     DataStream<Row> rightDataStream = tEnv.toDataStream(right);
>     leftDataStream.addSink(new PrintSinkFunction<>());
>     rightDataStream.addSink(new PrintSinkFunction<>());
>     env.execute();
> }
> {code}
> The reason is that every time `StreamTableEnvironmentImpl#toDataStream`
> is called, a new planner is created and translates the graph starting from
> the end nodes. So the two graphs are not aware that they share the same
> source node.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)