This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 63a6aba [FLINK-21523][hive] Fix ArrayIndexOutOfBoundsException when
running hive partitioned source with projection push down
63a6aba is described below
commit 63a6aba6722ae0e3d17381aaeb2fa464ea15d2f5
Author: KevinyhZou <[email protected]>
AuthorDate: Fri Mar 12 18:27:58 2021 +0800
[FLINK-21523][hive] Fix ArrayIndexOutOfBoundsException when running hive
partitioned source with projection push down
Co-authored-by: zouyunhe <[email protected]>
This closes #15068
---
.../flink/connectors/hive/HiveTableSource.java | 4 +-
.../connectors/hive/HiveTableSourceITCase.java | 54 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 721a21c..d5822d3 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -184,8 +184,8 @@ public class HiveTableSource
hiveShim,
new JobConfWrapper(jobConf),
catalogTable.getPartitionKeys(),
- getProducedTableSchema().getFieldDataTypes(),
- getProducedTableSchema().getFieldNames(),
+ getTableSchema().getFieldDataTypes(),
+ getTableSchema().getFieldNames(),
configuration,
defaultPartitionName);
sourceBuilder.setFetcherContext(fetcherContext);
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 3bbc2f9..506f054 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -843,6 +843,60 @@ public class HiveTableSourceITCase extends
BatchAbstractTestBase {
Row.of(1, 2), tEnv.executeSql("select * from
parquet_t").collect().next());
}
+ @Test(timeout = 120000)
+ public void testStreamReadWithProjectPushDown() throws Exception {
+ final String catalogName = "hive";
+ final String dbName = "source_db";
+ final String tblName = "stream_project_pushdown_test";
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(100);
+ StreamTableEnvironment tEnv =
+ HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env,
SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+ tEnv.executeSql(
+ "CREATE TABLE source_db.stream_project_pushdown_test (x int, y
string, z int)"
+ + " PARTITIONED BY ("
+ + " pt_year int, pt_mon string, pt_day string)
TBLPROPERTIES("
+ + "'streaming-source.enable'='true',"
+ + "'streaming-source.monitor-interval'='1s',"
+ +
"'streaming-source.consume-start-offset'='pt_year=2019/pt_month=09/pt_day=02'"
+ + ")");
+
+ HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+ .addRow(new Object[] {0, "a", 11})
+ .commit("pt_year='2019',pt_mon='09',pt_day='01'");
+ HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+ .addRow(new Object[] {1, "b", 12})
+ .commit("pt_year='2020',pt_mon='09',pt_day='03'");
+
+ TableResult result =
+ tEnv.executeSql(
+ "select x, y from
hive.source_db.stream_project_pushdown_test where pt_year = '2020'");
+ CloseableIterator<Row> iter = result.collect();
+
+ Assert.assertEquals(Row.of(1, "b").toString(), fetchRows(iter,
1).get(0));
+
+ for (int i = 2; i < 6; i++) {
+ try {
+ Thread.sleep(1_000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+ .addRow(new Object[] {i, "new_add", 11 + i})
+ .addRow(new Object[] {i, "new_add_1", 11 + i})
+ .commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");
+
+ Assert.assertEquals(
+ Arrays.asList(
+ Row.of(i, "new_add").toString(), Row.of(i,
"new_add_1").toString()),
+ fetchRows(iter, 2));
+ }
+
+ result.getJobClient().get().cancel();
+ }
+
private static TableEnvironment createTableEnv() {
TableEnvironment tableEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);