This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43eda5d3fe23de1aa95e73475dfb4792d36d4490
Author: luoyuxia <[email protected]>
AuthorDate: Sun Aug 7 17:11:44 2022 +0800

    [FLINK-28797][hive] HiveSource enables vector reading for complex data type with parquet format
---
 .../connectors/hive/read/HiveInputFormat.java | 6 ++---
 .../connectors/hive/HiveTableSourceITCase.java | 28 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
index d88ba944c67..d6ae3656381 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
@@ -254,14 +254,14 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
                 case TIME_WITHOUT_TIME_ZONE:
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
                 case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                case ARRAY:
+                case MAP:
+                case ROW:
                     return false;
                 case TIMESTAMP_WITH_TIME_ZONE:
                 case INTERVAL_YEAR_MONTH:
                 case INTERVAL_DAY_TIME:
-                case ARRAY:
                 case MULTISET:
-                case MAP:
-                case ROW:
                 case DISTINCT_TYPE:
                 case STRUCTURED_TYPE:
                 case NULL:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 31ee7de2113..dcce413340b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.module.hive.HiveModule;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
@@ -156,6 +157,33 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         assertThat(rows.get(0).getField(2)).isEqualTo(Row.of(struct[0], struct[1]));
     }
 
+    @Test
+    public void testReadParquetComplexDataType() throws Exception {
+        batchTableEnv.executeSql(
+                "create table parquet_complex_type_test("
+                        + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
+        String[] modules = batchTableEnv.listModules();
+        // load hive module so that we can use array,map, named_struct function
+        // for convenient writing complex data
+        batchTableEnv.loadModule("hive", new HiveModule());
+        String[] newModules = new String[modules.length + 1];
+        newModules[0] = "hive";
+        System.arraycopy(modules, 0, newModules, 1, modules.length);
+        batchTableEnv.useModules(newModules);
+
+        batchTableEnv
+                .executeSql(
+                        "insert into parquet_complex_type_test"
+                                + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
+                                + " named_struct('f1', 1, 'f2', 2)")
+                .await();
+
+        Table src = batchTableEnv.sqlQuery("select * from parquet_complex_type_test");
+        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
+        assertThat(rows.toString()).isEqualTo("[+I[[1, 2], {1=val1, 2=val2}, +I[1, 2]]]");
+        batchTableEnv.unloadModule("hive");
+    }
+
     /**
      * Test to read from partition table.
      *
