This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d14248c104c649dfce065c8f6f6d177be35f3ef Author: luoyuxia <[email protected]> AuthorDate: Fri Apr 15 10:40:43 2022 +0800 [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables This closes #19482 --- .../docs/connectors/table/hive/hive_read_write.md | 20 +++++++++++ .../docs/connectors/table/hive/hive_read_write.md | 20 +++++++++++ .../connectors/hive/HiveDynamicTableFactory.java | 7 ++++ .../apache/flink/connectors/hive/HiveOptions.java | 9 +++++ .../flink/connectors/hive/HiveDialectITCase.java | 42 ++++++++++++++++++++++ 5 files changed, 98 insertions(+) diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md index 63e31538f5b..ba712cc82e5 100644 --- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md +++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md @@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0. +### Read Partition With Subdirectory + +In some cases, you may create an external table that refers to the location of another table, but whose partition columns are only a subset of those of the referred table. +For example, you have a partitioned table `fact_tz` with partition columns `day` and `hour`: + +```sql +CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING); +``` + +And you have an external table `fact_daily` referring to table `fact_tz` with only the coarser-grained, day-level partition column `ds`: + +```sql +create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location' ; +``` + +Then, when reading the external table `fact_daily`, there will be sub-directories in the partition directory of the external table. 
+ +You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the sub-directories of a partition directory. +The default value is true, which means Flink reads the sub-directories. If it is set to false, Flink throws the exception "not a file: xxx" when the partition directory contains any sub-directory. + ## Temporal Table Join You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join. diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md index 507836c9bf1..3c5f7cd043a 100644 --- a/docs/content/docs/connectors/table/hive/hive_read_write.md +++ b/docs/content/docs/connectors/table/hive/hive_read_write.md @@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0. +### Read Partition With Subdirectory + +In some cases, you may create an external table that refers to the location of another table, but whose partition columns are only a subset of those of the referred table. +For example, you have a partitioned table `fact_tz` with partition columns `day` and `hour`: + +```sql +CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING); +``` + +And you have an external table `fact_daily` referring to table `fact_tz` with only the coarser-grained, day-level partition column `ds`: + +```sql +create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location' ; +``` + +Then, when reading the external table `fact_daily`, there will be sub-directories in the partition directory of the table. + +You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the sub-directories of a partition directory. +The default value is true, which means Flink reads the sub-directories. 
Otherwise, it will throw the exception "not a file: xxx" when the partition directory contains any sub-directory. + ## Temporal Table Join You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join. diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java index fe657c68fbf..0647f07a750 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java @@ -36,11 +36,13 @@ import org.apache.flink.util.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.util.Set; import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE; import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE; +import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED; /** A dynamic table factory implementation for Hive catalog. 
*/ public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { @@ -130,6 +132,11 @@ public class HiveDynamicTableFactory implements DynamicTableSourceFactory, Dynam .defaultValue() .equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE)); final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf); + boolean readSubDirectory = + context.getConfiguration() + .get(TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED); + // set whether to read directory recursively + jobConf.set(FileInputFormat.INPUT_DIR_RECURSIVE, String.valueOf(readSubDirectory)); // hive table source that has not lookup ability if (isStreamingSource && includeAllPartition) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java index 8717e842e3d..37095bd65d7 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java @@ -39,6 +39,15 @@ public class HiveOptions { "If it is false, using flink native vectorized reader to read orc files; " + "If it is true, using hadoop mapred record reader to read orc files."); + public static final ConfigOption<Boolean> + TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED = + key("table.exec.hive.read-partition-with-subdirectory.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n" + + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory."); + public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = 
key("table.exec.hive.infer-source-parallelism") .booleanType() diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 491b19104f9..12ca33871d0 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -76,6 +76,7 @@ import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -557,6 +558,47 @@ public class HiveDialectITCase { .isEqualTo("\n"); } + @Test + public void testTableWithSubDirsInPartitionDir() throws Exception { + tableEnv.executeSql("CREATE TABLE fact_tz(x int) PARTITIONED BY (ds STRING, hr STRING)"); + tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='1') select 1") + .await(); + tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='2') select 2") + .await(); + String location = warehouse + "/fact_tz"; + // create an external table + tableEnv.executeSql( + String.format( + "create external table fact_daily(x int) PARTITIONED BY (ds STRING) location '%s'", + location)); + tableEnv.executeSql( + String.format( + "ALTER TABLE fact_daily ADD PARTITION (ds='1') location '%s'", + location + "/ds=1")); + List<Row> results = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select * from fact_daily WHERE ds='1' order by x") + .collect()); + // the data read from the external table fact_daily should contain the data in + // directory 'ds=1/hr=1', 'ds=1/hr=2' + assertThat(results.toString()).isEqualTo("[+I[1, 1], +I[2, 1]]"); + + 
tableEnv.getConfig() + .set( + HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED.key(), + "false"); + // should throw exception when disable reading sub-dirs in partition directory + assertThatThrownBy( + () -> + CollectionUtil.iteratorToList( + tableEnv.executeSql("select * from fact_daily WHERE ds='1'") + .collect())) + .satisfies( + anyCauseMatches( + String.format( + "Not a file: file:%s", warehouse + "/fact_tz/ds=1/hr=2"))); + } + @Test public void testView() throws Exception { tableEnv.executeSql("create table tbl (x int,y string)");
