[ 
https://issues.apache.org/jira/browse/FLINK-37971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18001381#comment-18001381
 ] 

Xuyang Zhong commented on FLINK-37971:
--------------------------------------

I tried to reproduce this bug with the test below, but could not.
{code:scala}
@Test
def test(): Unit = {
  util.tableEnv.executeSql(s"""
                              |create temporary table src (
                              | a int primary key not enforced,
                              | b int
                              |) with (
                              | 'connector' = 'values'
                              |)
                              |""".stripMargin)

  util.tableEnv.executeSql(s"""
                              |create temporary table snk1 (
                              | a int
                              |) with (
                              | 'connector' = 'values',
                              | 'sink-insert-only' = 'false'
                              |)
                              |""".stripMargin)

  util.tableEnv.executeSql(s"""
                              |create temporary table snk2 (
                              | a int
                              |) with (
                              | 'connector' = 'values',
                              | 'sink-insert-only' = 'false'
                              |)
                              |""".stripMargin)

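  // Both INSERTs scan `src` with the same OPTIONS hint, so the planner may
  // merge the two scans into one reused source.
  val stmt = util.tableEnv.createStatementSet()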
  stmt.addInsertSql(s"""
                       | insert into snk1 select a from src
                       |  /*+ OPTIONS('changelog-mode'='I,UA')*/
                       |  where a > 10
                       |""".stripMargin)

  stmt.addInsertSql(s"""
                       | insert into snk2 select b from src
                       |  /*+ OPTIONS('changelog-mode'='I,UA')*/
                       |  where a < 10
                       |""".stripMargin)

  util.verifyRelPlan(stmt)
}
{code}
!image-2025-07-07-13-53-24-292.png|width=680,height=364!

[~mxm] Could you please provide more details?

In theory, the `DynamicTableSourceSpec` at
[https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L212]
already uses the correct resolved table: its options were merged with the
query hints earlier, in `CatalogSourceTable#computeContextResolvedTable`. So
I'm curious where the problem lies.
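
To make that expectation concrete, here is a minimal sketch of hint options
being overlaid on a table's catalog options before a source is created. It is
a hypothetical stand-in, not the planner's actual code: the class
`HintMergeSketch` and the method `mergeHints` are made-up names, and it only
assumes that hint values take precedence over catalog values with the same key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in (not Flink code): overlays OPTIONS hints on catalog table options. */
public class HintMergeSketch {

    /** Returns a new map in which hint options override catalog options with the same key. */
    public static Map<String, String> mergeHints(
            Map<String, String> catalogOptions, Map<String, String> hintOptions) {
        Map<String, String> merged = new LinkedHashMap<>(catalogOptions);
        merged.putAll(hintOptions); // assumption: hints win on key conflicts
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> catalogOptions = Map.of("connector", "values");
        Map<String, String> hint = Map.of("changelog-mode", "I,UA");

        // Two scans of the same table, each carrying the same hint.
        Map<String, String> scan1 = mergeHints(catalogOptions, hint);
        Map<String, String> scan2 = mergeHints(catalogOptions, hint);

        // Both merged option maps contain the hinted key and are equal, so a
        // source shared by the two scans would be expected to see it as well.
        System.out.println(scan1.get("changelog-mode")); // prints "I,UA"
        System.out.println(scan1.equals(scan2));         // prints "true"
    }
}
```

If the reused source in the reported case is built from options that lack the
hinted keys, the merged table is presumably being replaced or bypassed
somewhere after this step.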

> SQL hints are dropped when combining multiple SELECT into a single source
> -------------------------------------------------------------------------
>
>                 Key: FLINK-37971
>                 URL: https://issues.apache.org/jira/browse/FLINK-37971
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Maximilian Michels
>            Priority: Major
>         Attachments: image-2025-07-07-13-53-24-292.png
>
>
> The IcebergSource supports using SQL hints to set some runtime properties 
> like whether to run in batch or streaming mode. For example, this creates an 
> Iceberg source and an Iceberg sink:
> {noformat}
> INSERT into output
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 
> 'monitor-interval'='1s')*/ ;
> {noformat}
> In the below query, the resulting DAG would contain only one IcebergSource 
> and two Iceberg sinks:
> {noformat}
> INSERT into output1
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 
> 'monitor-interval'='1s')*/ WHERE ... ;
> INSERT INTO output2
> SELECT * FROM sample /*+ OPTIONS('streaming'='true', 
> 'monitor-interval'='1s')*/ WHERE ... ;
> {noformat}
> However, the SQL hints will not be available when creating the source, i.e. 
> the {{FlinkDynamicTableFactory#createDynamicTableSource(Context context)}} 
> will not receive the options derived from the SQL hint via 
> {{context.getCatalogTable().getOptions()}}, as is the case before the 
> optimizer combines the two SELECT statements.
> It starts to work again when {{table.optimizer.reuse-source-enabled: false}} 
> is set.
> The issue manifests here, where the new {{ScanTableSource}} does not have the 
> properties supplied in the SQL hints: 
> https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L212
> For the issue to surface, these two if blocks need to be circumvented: 
> https://github.com/apache/flink/blob/ef5b03297092c5eda7c8862f1d7eb08aa10e480b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java#L143



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
