[
https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zoucao updated FLINK-27898:
---------------------------
Description:
In hive source, the PartitionPushDown will cause some problems in
streaming-mode, we can add the following test in {*}HiveTableSourceITCase{*}
{code:java}
@Test
public void testPushDown() throws Exception {
final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "stream_test";
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 1000);
StreamTableEnvironment tEnv =
HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);
tEnv.executeSql(
"CREATE TABLE source_db.stream_test ("
+ " a INT,"
+ " b STRING"
+ ") PARTITIONED BY (ts int) TBLPROPERTIES ("
+ "'streaming-source.enable'='true',"
+ "'streaming-source.monitor-interval'='10s',"
+ "'streaming-source.consume-order'='partition-name',"
+ "'streaming-source.consume-start-offset'='ts=1'"
+ ")");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{0, "a0"})
.addRow(new Object[]{1, "a0"})
.commit("ts=0");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a1"})
.addRow(new Object[]{2, "a1"})
.commit("ts=1");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a2"})
.addRow(new Object[]{2, "a2"})
.commit("ts=2");
System.out.println(tEnv.explainSql("select * from hive.source_db.stream_test
where ts > 1"));
TableResult result = tEnv.executeSql("select * from hive.source_db.stream_test
where ts > 1");
result.print();
}
{code}
{code:java}
+----+-------------+--------------------------------+-------------+
| op | a | b | ts |
+----+-------------+--------------------------------+-------------+
| +I | 1 | a2 | 2 |
| +I | 2 | a2 | 2 |
| +I | 1 | a1 | 1 |
| +I | 2 | a1 | 1 |
{code}
{code:java}
== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], ts=[$2])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalTableScan(table=[[hive, source_db, stream_test]])
== Optimized Physical Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
fields=[a, b, ts])
== Optimized Execution Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
fields=[a, b, ts])
{code}
The PartitionPushDown rule can generate the correct partitions that need to
consume by using the existing partition. If the partitions are pushed to the
hive source, the filter node will be removed. But hive source will not use the
partition info which is pushed down in streaming mode, I think it causes some
problems.
was:
In hive source, the PartitionPushDown will cause some problems in
streaming-mode, we can add the following test in {*}HiveTableSourceITCase{*}
{code:java}
@Test
public void testPushDown() throws Exception {
final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "stream_test";
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 1000);
StreamTableEnvironment tEnv =
HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);
tEnv.executeSql(
"CREATE TABLE source_db.stream_test ("
+ " a INT,"
+ " b STRING"
+ ") PARTITIONED BY (ts int) TBLPROPERTIES ("
+ "'streaming-source.enable'='true',"
+ "'streaming-source.monitor-interval'='10s',"
+ "'streaming-source.consume-order'='partition-name',"
+ "'streaming-source.consume-start-offset'='ts=1'"
+ ")");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{0, "a0"})
.addRow(new Object[]{1, "a0"})
.commit("ts=0");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a1"})
.addRow(new Object[]{2, "a1"})
.commit("ts=1");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a2"})
.addRow(new Object[]{2, "a2"})
.commit("ts=2");
System.out.println(tEnv.explainSql("select * from hive.source_db.stream_test
where ts > 1"));
TableResult result = tEnv.executeSql("select * from hive.source_db.stream_test
where ts > 1");
result.print();
)
{code}
{code:java}
+----+-------------+--------------------------------+-------------+
| op | a | b | ts |
+----+-------------+--------------------------------+-------------+
| +I | 1 | a2 | 2 |
| +I | 2 | a2 | 2 |
| +I | 1 | a1 | 1 |
| +I | 2 | a1 | 1 |
{code}
{code:java}
== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], ts=[$2])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalTableScan(table=[[hive, source_db, stream_test]])
== Optimized Physical Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
fields=[a, b, ts])
== Optimized Execution Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
fields=[a, b, ts])
{code}
The PartitionPushDown rule can generate the correct partitions that need to
consume by using the existing partition. If the partitions are pushed to the
hive source, the filter node will be removed. But hive source will not use the
partition info which is pushed down in streaming mode, I think it causes some
problems.
> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
> Key: FLINK-27898
> URL: https://issues.apache.org/jira/browse/FLINK-27898
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Reporter: zoucao
> Priority: Major
>
> In hive source, the PartitionPushDown will cause some problems in
> streaming-mode, we can add the following test in {*}HiveTableSourceITCase{*}
> {code:java}
> @Test
> public void testPushDown() throws Exception {
> final String catalogName = "hive";
> final String dbName = "source_db";
> final String tblName = "stream_test";
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(10 * 1000);
> StreamTableEnvironment tEnv =
> HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
> tEnv.registerCatalog(catalogName, hiveCatalog);
> tEnv.useCatalog(catalogName);
> tEnv.executeSql(
> "CREATE TABLE source_db.stream_test ("
> + " a INT,"
> + " b STRING"
> + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
> + "'streaming-source.enable'='true',"
> + "'streaming-source.monitor-interval'='10s',"
> + "'streaming-source.consume-order'='partition-name',"
> + "'streaming-source.consume-start-offset'='ts=1'"
> + ")");
> HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
> .addRow(new Object[]{0, "a0"})
> .addRow(new Object[]{1, "a0"})
> .commit("ts=0");
> HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
> .addRow(new Object[]{1, "a1"})
> .addRow(new Object[]{2, "a1"})
> .commit("ts=1");
> HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
> .addRow(new Object[]{1, "a2"})
> .addRow(new Object[]{2, "a2"})
> .commit("ts=2");
> System.out.println(tEnv.explainSql("select * from hive.source_db.stream_test
> where ts > 1"));
> TableResult result = tEnv.executeSql("select * from
> hive.source_db.stream_test where ts > 1");
> result.print();
> }
> {code}
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op | a | b | ts |
> +----+-------------+--------------------------------+-------------+
> | +I | 1 | a2 | 2 |
> | +I | 2 | a2 | 2 |
> | +I | 1 | a1 | 1 |
> | +I | 2 | a1 | 1 |
> {code}
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
> +- LogicalTableScan(table=[[hive, source_db, stream_test]])
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
> fields=[a, b, ts])
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]],
> fields=[a, b, ts])
> {code}
> The PartitionPushDown rule can generate the correct partitions that need to
> consume by using the existing partition. If the partitions are pushed to the
> hive source, the filter node will be removed. But hive source will not use
> the partition info which is pushed down in streaming mode, I think it causes
> some problems.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)