[
https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552520#comment-17552520
]
zoucao commented on FLINK-27898:
--------------------------------
Hi [~luoyuxia], before opening a PR, I think we should discuss how to solve
it, and I would like to hear your thoughts on this problem. The
table-planner module will also be involved, so I think we should ask
others for advice as well. Gentle ping [~jark], what do you think?
> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
> Key: FLINK-27898
> URL: https://issues.apache.org/jira/browse/FLINK-27898
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Reporter: zoucao
> Priority: Major
>
> In the Hive source, the PartitionPushDown ability causes some problems in
> streaming mode. We can reproduce them by adding the following test to
> {*}HiveTableSourceITCase{*}:
> {code:java}
> @Test
> public void testPushDown() throws Exception {
>     final String catalogName = "hive";
>     final String dbName = "source_db";
>     final String tblName = "stream_test";
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(10 * 1000);
>     StreamTableEnvironment tEnv =
>             HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
>     tEnv.registerCatalog(catalogName, hiveCatalog);
>     tEnv.useCatalog(catalogName);
>     tEnv.executeSql(
>             "CREATE TABLE source_db.stream_test ("
>                     + " a INT,"
>                     + " b STRING"
>                     + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
>                     + "'streaming-source.enable'='true',"
>                     + "'streaming-source.monitor-interval'='10s',"
>                     + "'streaming-source.consume-order'='partition-name',"
>                     + "'streaming-source.consume-start-offset'='ts=1'"
>                     + ")");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {0, "a0"})
>             .addRow(new Object[] {1, "a0"})
>             .commit("ts=0");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a1"})
>             .addRow(new Object[] {2, "a1"})
>             .commit("ts=1");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a2"})
>             .addRow(new Object[] {2, "a2"})
>             .commit("ts=2");
>     System.out.println(
>             tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
>     TableResult result =
>             tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
>     result.print();
> }
> {code}
> Running the test prints the following result, which wrongly includes the
> rows from partition ts=1:
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op |           a |                              b |          ts |
> +----+-------------+--------------------------------+-------------+
> | +I |           1 |                             a2 |           2 |
> | +I |           2 |                             a2 |           2 |
> | +I |           1 |                             a1 |           1 |
> | +I |           2 |                             a1 |           1 |
> {code}
> The plan shows why: the Filter node was removed after the partitions were
> pushed into the scan.
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
>    +- LogicalTableScan(table=[[hive, source_db, stream_test]])
>
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
>
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> {code}
> The PartitionPushDown rule computes the correct set of partitions to
> consume from the partitions that already exist at planning time. Once those
> partitions are pushed into the Hive source, the planner removes the Filter
> node. But in streaming mode the Hive source does not use the pushed-down
> partition info and keeps consuming partitions according to its own
> monitoring options, which I think causes the wrong result above.
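> For reference, a minimal sketch of one possible source-side direction (the
> class and method names below are hypothetical, not the actual Flink code):
> the source could remember the partitions handed over via
> {{SupportsPartitionPushDown#applyPartitions}} and re-apply them as a filter
> when the streaming monitor discovers partitions, instead of dropping them.
> {code:java}
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> /**
>  * Hypothetical sketch, not the actual Flink code: keep the partitions that
>  * the planner pushed down and re-apply them when the streaming monitor
>  * discovers partitions, so the removed Filter node is not silently lost.
>  */
> public class StreamingPartitionFilterSketch {
>
>     /** Partitions received via SupportsPartitionPushDown#applyPartitions. */
>     private List<Map<String, String>> remainingPartitions;
>
>     public void applyPartitions(List<Map<String, String>> remainingPartitions) {
>         this.remainingPartitions = remainingPartitions;
>     }
>
>     /** Would be called for every partition the continuous monitor finds. */
>     public boolean shouldConsume(Map<String, String> partitionSpec) {
>         // No pushdown happened: consume everything the monitor discovers.
>         if (remainingPartitions == null) {
>             return true;
>         }
>         return remainingPartitions.contains(partitionSpec);
>     }
>
>     public static void main(String[] args) {
>         StreamingPartitionFilterSketch source = new StreamingPartitionFilterSketch();
>         Map<String, String> ts1 = new HashMap<>();
>         ts1.put("ts", "1");
>         Map<String, String> ts2 = new HashMap<>();
>         ts2.put("ts", "2");
>         // The planner pushed down "ts > 1", so only {ts=2} remains.
>         source.applyPartitions(Collections.singletonList(ts2));
>         System.out.println(source.shouldConsume(ts1)); // false: filtered out
>         System.out.println(source.shouldConsume(ts2)); // true
>     }
> }
> {code}
> Note that a static partition list cannot classify partitions created after
> planning, which is the whole point of streaming mode, so the simpler fix may
> be on the planner side: skip partition pushdown, and keep the Filter node,
> when {{streaming-source.enable}} is true.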