This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a6aba  [FLINK-21523][hive] Fix ArrayIndexOutOfBoundsException when running hive partitioned source with projection push down
63a6aba is described below

commit 63a6aba6722ae0e3d17381aaeb2fa464ea15d2f5
Author: KevinyhZou <[email protected]>
AuthorDate: Fri Mar 12 18:27:58 2021 +0800

    [FLINK-21523][hive] Fix ArrayIndexOutOfBoundsException when running hive partitioned source with projection push down
    
    Co-authored-by: zouyunhe <[email protected]>
    
    This closes #15068
---
 .../flink/connectors/hive/HiveTableSource.java     |  4 +-
 .../connectors/hive/HiveTableSourceITCase.java     | 54 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 721a21c..d5822d3 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -184,8 +184,8 @@ public class HiveTableSource
                                 hiveShim,
                                 new JobConfWrapper(jobConf),
                                 catalogTable.getPartitionKeys(),
-                                getProducedTableSchema().getFieldDataTypes(),
-                                getProducedTableSchema().getFieldNames(),
+                                getTableSchema().getFieldDataTypes(),
+                                getTableSchema().getFieldNames(),
                                 configuration,
                                 defaultPartitionName);
                 sourceBuilder.setFetcherContext(fetcherContext);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 3bbc2f9..506f054 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -843,6 +843,60 @@ public class HiveTableSourceITCase extends 
BatchAbstractTestBase {
                 Row.of(1, 2), tEnv.executeSql("select * from 
parquet_t").collect().next());
     }
 
+    @Test(timeout = 120000)
+    public void testStreamReadWithProjectPushDown() throws Exception {
+        final String catalogName = "hive";
+        final String dbName = "source_db";
+        final String tblName = "stream_project_pushdown_test";
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, 
SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+        tEnv.executeSql(
+                "CREATE TABLE source_db.stream_project_pushdown_test (x int, y 
string, z int)"
+                        + " PARTITIONED BY ("
+                        + " pt_year int, pt_mon string, pt_day string) 
TBLPROPERTIES("
+                        + "'streaming-source.enable'='true',"
+                        + "'streaming-source.monitor-interval'='1s',"
+                        + 
"'streaming-source.consume-start-offset'='pt_year=2019/pt_month=09/pt_day=02'"
+                        + ")");
+
+        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+                .addRow(new Object[] {0, "a", 11})
+                .commit("pt_year='2019',pt_mon='09',pt_day='01'");
+        HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+                .addRow(new Object[] {1, "b", 12})
+                .commit("pt_year='2020',pt_mon='09',pt_day='03'");
+
+        TableResult result =
+                tEnv.executeSql(
+                        "select x, y from 
hive.source_db.stream_project_pushdown_test where pt_year = '2020'");
+        CloseableIterator<Row> iter = result.collect();
+
+        Assert.assertEquals(Row.of(1, "b").toString(), fetchRows(iter, 
1).get(0));
+
+        for (int i = 2; i < 6; i++) {
+            try {
+                Thread.sleep(1_000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
+                    .addRow(new Object[] {i, "new_add", 11 + i})
+                    .addRow(new Object[] {i, "new_add_1", 11 + i})
+                    .commit("pt_year='2020',pt_mon='10',pt_day='0" + i + "'");
+
+            Assert.assertEquals(
+                    Arrays.asList(
+                            Row.of(i, "new_add").toString(), Row.of(i, 
"new_add_1").toString()),
+                    fetchRows(iter, 2));
+        }
+
+        result.getJobClient().get().cancel();
+    }
+
     private static TableEnvironment createTableEnv() {
         TableEnvironment tableEnv =
                 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);

Reply via email to