[https://issues.apache.org/jira/browse/FLINK-37971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18001381#comment-18001381]
Xuyang Zhong commented on FLINK-37971:
--------------------------------------
I tried to reproduce this bug but failed.
{code:scala}
@Test
def test(): Unit = {
  // Source table; the primary key is required for the upsert changelog
  // mode ('I,UA') requested by the hints below.
  util.tableEnv.executeSql(s"""
    |create temporary table src (
    |  a int primary key not enforced,
    |  b int
    |) with (
    |  'connector' = 'values'
    |)
    |""".stripMargin)
  // Two sinks that accept updates.
  util.tableEnv.executeSql(s"""
    |create temporary table snk1 (
    |  a int
    |) with (
    |  'connector' = 'values',
    |  'sink-insert-only' = 'false'
    |)
    |""".stripMargin)
  util.tableEnv.executeSql(s"""
    |create temporary table snk2 (
    |  a int
    |) with (
    |  'connector' = 'values',
    |  'sink-insert-only' = 'false'
    |)
    |""".stripMargin)
  // Both INSERTs scan `src` with the same OPTIONS hint, so the planner
  // is expected to reuse a single source for them.
  val stmt = util.tableEnv.createStatementSet()
  stmt.addInsertSql(s"""
    |insert into snk1 select a from src
    |/*+ OPTIONS('changelog-mode'='I,UA')*/
    |where a > 10
    |""".stripMargin)
  stmt.addInsertSql(s"""
    |insert into snk2 select b from src
    |/*+ OPTIONS('changelog-mode'='I,UA')*/
    |where a < 10
    |""".stripMargin)
  util.verifyRelPlan(stmt)
}
{code}
!image-2025-07-07-13-53-24-292.png|width=680,height=364!
[~mxm] Could you please provide more details?
In theory, the {{DynamicTableSourceSpec}} in
[https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L212]
uses the correct resolved table, whose options have already been merged with
the query hints by {{CatalogSourceTable#computeContextResolvedTable}}. So I'm
curious where the problem lies...
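For context, the merge mentioned above follows the documented semantics of {{OPTIONS}} hints (dynamic table options): hint entries are layered over the catalog table's options and override them on a key conflict. The following is a minimal, self-contained Scala sketch of that rule only, assuming plain {{Map}}s stand in for the resolved table options; the {{HintMergeSketch}} object and the {{mergeTableOptions}} helper are hypothetical and are not Flink's implementation.

```scala
object HintMergeSketch {
  // Hypothetical helper: layers OPTIONS-hint entries over the catalog table's
  // options. With Scala's `++`, entries from the right-hand map win on
  // duplicate keys, so a hint overrides the table's own value.
  def mergeTableOptions(
      tableOptions: Map[String, String],
      hintOptions: Map[String, String]): Map[String, String] =
    tableOptions ++ hintOptions

  def main(args: Array[String]): Unit = {
    // Illustrative values only; not taken from a real catalog.
    val tableOptions = Map("connector" -> "iceberg", "monitor-interval" -> "60s")
    val hintOptions = Map("streaming" -> "true", "monitor-interval" -> "1s")
    val merged = mergeTableOptions(tableOptions, hintOptions)
    println(merged("streaming"))        // prints "true"
    println(merged("monitor-interval")) // prints "1s"
  }
}
```

Under this rule, a source factory that reads {{context.getCatalogTable().getOptions()}} from the merged table would be expected to see {{streaming=true}} whether or not the scan is later reused.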
> SQL hints are dropped when combining multiple SELECT into a single source
> -------------------------------------------------------------------------
>
> Key: FLINK-37971
> URL: https://issues.apache.org/jira/browse/FLINK-37971
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 2.0.0, 1.19.2, 1.20.1
> Reporter: Maximilian Michels
> Priority: Major
> Attachments: image-2025-07-07-13-53-24-292.png
>
>
> The IcebergSource supports SQL hints for setting runtime properties, such as
> whether to run in batch or streaming mode. For example, the following
> statement creates an Iceberg source and an Iceberg sink:
> {noformat}
> INSERT INTO output
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
> {noformat}
> When the following two statements are submitted together, the resulting DAG
> contains only one IcebergSource and two Iceberg sinks:
> {noformat}
> INSERT INTO output1
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ WHERE ... ;
> INSERT INTO output2
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ WHERE ... ;
> {noformat}
> However, the SQL hints are not available when the source is created:
> {{FlinkDynamicTableFactory#createDynamicTableSource(Context context)}} does
> not receive the hint-derived options via
> {{context.getCatalogTable().getOptions()}}, although it does receive them
> before the optimizer combines the two SELECT statements.
> It works again when {{table.optimizer.reuse-source-enabled}} is set to
> {{false}}.
> The issue manifests here, where the newly created {{ScanTableSource}} lacks
> the properties supplied in the SQL hints:
> https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L212
> For the issue to surface, execution must bypass these two {{if}} blocks:
> https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L143
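The workaround from the description ({{table.optimizer.reuse-source-enabled}} set to {{false}}) can also be applied programmatically. A minimal Scala sketch, assuming the Flink Table API and a table planner are on the classpath; it sets {{OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED}}, the option behind that key, and the {{DisableSourceReuse}} object name is hypothetical.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.config.OptimizerConfigOptions

object DisableSourceReuse {
  // Streaming TableEnvironment (needs the Flink table planner on the classpath).
  val tEnv: TableEnvironment =
    TableEnvironment.create(EnvironmentSettings.inStreamingMode())

  // Same effect as `table.optimizer.reuse-source-enabled: false`: the optimizer
  // no longer merges duplicated table sources into a single shared scan.
  tEnv.getConfig.set(
    OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED,
    java.lang.Boolean.FALSE)
}
```

This trades away the shared scan (each INSERT reads the table separately) in exchange for every source being created with its own hint-merged options.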