[ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452647#comment-17452647 ]
Benoît Paris commented on FLINK-15775:
--------------------------------------
Thanks a lot, [~Terry1897]!
Do you know whether we'll be able to use the equivalent of createStatementSet
in pure SQL from the CLI?
> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> ------------------------------------------------------------------
>
> Key: FLINK-15775
> URL: https://issues.apache.org/jira/browse/FLINK-15775
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.1, 1.10.0
> Reporter: Benoît Paris
> Priority: Minor
> Labels: auto-deprioritized-major
> Fix For: 1.11.0
>
> Attachments: flink-test-duplicated-sources.zip
>
>
> When two sinks pull from the same TableSource, its SourceFunction gets
> instantiated twice (each instance is then opened once per parallel subtask,
> which is expected behavior).
> The following instantiates FooTableSource's SourceFunction once (OK
> behavior, but not the processing we want):
>
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>
> This instantiates FooTableSource's SourceFunction twice (not OK, as each
> SysoSink misses half of the inputs):
>
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1");
> {code}
>
> This might not be a problem for Kafka's SourceFunctions, as we can always
> reread from a log; but it is a data loss problem when the source data can't
> be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make
> the runtime read from the same opened SourceFunctions?
> Attached is Java code that logs the faulty opening of the SourceFunctions,
> pom.xml, and logical execution plans for the duplicated case, and the
> workaround.
>
> ----
> Workaround: convert the table to an append stream and back. Somehow this
> makes the planner think it has to put a materialization barrier after the
> source and read from that:
>
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>     tEnv.toAppendStream(
>         sourceTable,
>         Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()})));
> tEnv.registerTable("foo_table", appendingSourceTable);
> {code}
>
>
> Best Regards,
> Ben
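
The data-loss scenario described in the issue can be illustrated with a toy
model in plain Java. This is not Flink code and makes no claim about how the
planner or runtime actually schedules reads. It assumes a non-replayable
input (a queue whose records can be polled only once) and, purely for
illustration, that two duplicated source instances take turns polling it.
The class and method names (DuplicatedSourceDemo, newInput,
duplicatedSources, sharedSource) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy model only: plain Java collections stand in for sources and sinks.
class DuplicatedSourceDemo {

    // Non-replayable input: each record (its field_1 value) can be polled once.
    // The values follow the pattern 0, 0, 1, 1, 0, 0, 1, 1.
    static Queue<Long> newInput() {
        Queue<Long> input = new ArrayDeque<>();
        for (long i = 0; i < 8; i++) {
            input.add((i / 2) % 2);
        }
        return input;
    }

    // Two source instances compete for the same input (assumed round-robin).
    // Instance A feeds only the "field_1 = 0" sink, instance B only the
    // "field_1 = 1" sink, so a record polled by the wrong instance is lost.
    static List<List<Long>> duplicatedSources(Queue<Long> input) {
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        boolean instanceATurn = true;
        while (!input.isEmpty()) {
            long field1 = input.poll();
            if (instanceATurn) {
                if (field1 == 0) sink0.add(field1);
            } else {
                if (field1 == 1) sink1.add(field1);
            }
            instanceATurn = !instanceATurn;
        }
        return Arrays.asList(sink0, sink1);
    }

    // A single source instance reads every record and fans it out to both
    // filters, so each sink sees all the records that match its predicate.
    static List<List<Long>> sharedSource(Queue<Long> input) {
        List<Long> sink0 = new ArrayList<>();
        List<Long> sink1 = new ArrayList<>();
        while (!input.isEmpty()) {
            long field1 = input.poll();
            if (field1 == 0) sink0.add(field1);
            if (field1 == 1) sink1.add(field1);
        }
        return Arrays.asList(sink0, sink1);
    }

    public static void main(String[] args) {
        List<List<Long>> duplicated = duplicatedSources(newInput());
        List<List<Long>> shared = sharedSource(newInput());
        System.out.println("duplicated: sink0=" + duplicated.get(0).size()
                + " sink1=" + duplicated.get(1).size());
        System.out.println("shared:     sink0=" + shared.get(0).size()
                + " sink1=" + shared.get(1).size());
    }
}
```

With this particular input and polling order, each sink in the duplicated
case receives 2 of the 4 records matching its filter, while the shared case
delivers all 4 to each sink. A replayable source such as a Kafka log would
not show the loss, since each instance could read the full input on its own.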