This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git
commit 82ef7b8eafc212a8936c3e1ba56d46849946c52f
Author: xuyang <[email protected]>
AuthorDate: Mon Sep 23 12:06:49 2024 +0800

    [FLINK-36273][table] Remove all deprecated configuration in 2.0 in flink-connector-hive module
---
 .../docs/connectors/table/hive/hive_read_write.md  |  2 +-
 .../docs/connectors/table/hive/hive_read_write.md  |  2 +-
 .../HiveDynamicParallelismInferenceFactory.java    |  8 +------
 .../apache/flink/connectors/hive/HiveOptions.java  | 17 +-------------
 .../connectors/hive/HiveParallelismInference.java  |  1 -
 .../flink/connectors/hive/HiveSourceBuilder.java   |  3 ---
 .../HiveStaticParallelismInferenceFactory.java     |  7 ++----
 .../hive/HiveDynamicPartitionPruningITCase.java    | 10 ++++++--
 .../hive/HiveDynamicTableFactoryTest.java          | 17 --------------
 .../connectors/hive/HiveSinkCompactionITCase.java  |  5 +++-
 .../flink/connectors/hive/HiveTableSinkITCase.java |  4 ++--
 .../connectors/hive/HiveTableSourceITCase.java     | 27 +++++++++++++++++-----
 .../connectors/hive/PartitionMonitorTest.java      |  2 +-
 13 files changed, 42 insertions(+), 63 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 64f9f7dc..ad986a47 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -69,7 +69,7 @@ Flink 支持以批和流两种模式从 Hive 表中读取数据。批读的时
         <td><h5>streaming-source.partition-order</h5></td>
         <td style="word-wrap: break-word;">partition-name</td>
         <td>String</td>
-        <td>streaming source 分区排序,支持 create-time, partition-time 和 partition-name。 create-time 比较分区/文件创建时间, 这不是 Hive metastore 中创建分区的时间,而是文件夹/文件在文件系统的修改时间,如果分区文件夹以某种方式更新,比如添加在文件夹里新增了一个文件,它会影响到数据的使用。partition-time 从分区名称中抽取时间进行比较。partition-name 会比较分区名称的字典顺序。对于非分区的表,总是会比较 'create-time'。对于分区表默认值是 'partition-name'。该选项与已经弃用的 'streaming-source.consume-order' 的选项相同</td>
+        <td>streaming source 分区排序,支持 create-time, partition-time 和 partition-name。 create-time 比较分区/文件创建时间, 这不是 Hive metastore 中创建分区的时间,而是文件夹/文件在文件系统的修改时间,如果分区文件夹以某种方式更新,比如添加在文件夹里新增了一个文件,它会影响到数据的使用。partition-time 从分区名称中抽取时间进行比较。partition-name 会比较分区名称的字典顺序。对于非分区的表,总是会比较 'create-time'。对于分区表默认值是 'partition-name'。</td>
     </tr>
     <tr>
         <td><h5>streaming-source.consume-start-offset</h5></td>
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 9a9834b6..2f8252a6 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -79,7 +79,7 @@ of new files in the folder and read new files incrementally.
         <td><h5>streaming-source.partition-order</h5></td>
         <td style="word-wrap: break-word;">partition-name</td>
         <td>String</td>
-        <td>The partition order of streaming source, support create-time, partition-time and partition-name. create-time compares partition/file creation time, this is not the partition create time in Hive metaStore, but the folder/file modification time in filesystem, if the partition folder somehow gets updated, e.g. add new file into folder, it can affect how the data is consumed. partition-time compares the time extracted from partition name. partition-name compares partition name's [...]
+        <td>The partition order of streaming source, support create-time, partition-time and partition-name. create-time compares partition/file creation time, this is not the partition create time in Hive metaStore, but the folder/file modification time in filesystem, if the partition folder somehow gets updated, e.g. add new file into folder, it can affect how the data is consumed. partition-time compares the time extracted from partition name. partition-name compares partition name's [...]
     </tr>
     <tr>
         <td><h5>streaming-source.consume-start-offset</h5></td>
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
index 4932bbee..36e655ec 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
@@ -42,17 +42,11 @@ class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference
 
     @Override
     public HiveParallelismInference create() {
-        boolean inferEnabled =
-                jobConf.getBoolean(
-                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
-                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
         HiveOptions.InferMode inferMode =
                 jobConf.getEnum(
                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());
-        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
-        // is removed.
-        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC;
+        boolean infer = inferMode == HiveOptions.InferMode.DYNAMIC;
         int inferMaxParallelism =
                 Math.min(
                         (int)
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index c8588f58..6877ef42 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -51,16 +51,6 @@ public class HiveOptions {
                                     "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n"
                                             + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory.");
 
-    /** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */
-    @Deprecated
-    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
-            key("table.exec.hive.infer-source-parallelism")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "If is false, parallelism of source are set by config.\n"
-                                    + "If is true, source parallelism is inferred according to splits number.\n");
-
     @PublicEvolving
     public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
             key("table.exec.hive.infer-source-parallelism.mode")
@@ -76,9 +66,7 @@ public class HiveOptions {
                                     text(
                                             "'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism."),
                                     text(
-                                            "'none' represents disabling parallelism inference."),
-                                    text(
-                                            "Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference."))
+                                            "'none' represents disabling parallelism inference."))
                             .build());
 
     public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
@@ -250,12 +238,9 @@ public class HiveOptions {
             key("streaming-source.partition-order")
                     .enumType(PartitionOrder.class)
                     .defaultValue(PartitionOrder.PARTITION_NAME)
-                    .withDeprecatedKeys("streaming-source.consume-order")
                     .withDescription(
                             Description.builder()
                                     .text("The partition order of the streaming source.")
-                                    .text(
-                                            "This is a synonym for the deprecated 'streaming-source.consume-order' option.")
                                     .build());
 
     public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
index e46eb7f0..8c663c9b 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
@@ -64,7 +64,6 @@ class HiveParallelismInference {
 
     /**
      * Infer parallelism by number of files and number of splits. If {@link
-     * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link
      * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing.
      */
     HiveParallelismInference infer(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 84c97779..c2db01d6 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -339,9 +339,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                 String.valueOf(calPartitionSizeThreadNum));
-        jobConf.setBoolean(
-                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM));
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                 String.valueOf(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
index 58a20e49..41b02dd3 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
@@ -40,12 +40,9 @@ class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.
 
     @Override
     public HiveParallelismInference create() {
-        boolean inferEnabled = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
         HiveOptions.InferMode inferMode =
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE);
-        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
-        // is removed.
-        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.STATIC;
+        boolean infer = inferMode == HiveOptions.InferMode.STATIC;
         int inferMaxParallelism =
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
         Preconditions.checkArgument(
@@ -55,7 +52,7 @@ class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.
         int parallelism =
                 flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
         // Keeping the parallelism unset is a prerequisite for dynamic parallelism inference.
-        if (inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC) {
+        if (inferMode == HiveOptions.InferMode.DYNAMIC) {
             parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
         }
 
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
index dce66345..27babefb 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
@@ -123,7 +123,10 @@ public class HiveDynamicPartitionPruningITCase {
                         tableEnv.explainSql(
                                 "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"));
 
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
         String sqlSwapFactDim =
@@ -192,7 +195,10 @@ public class HiveDynamicPartitionPruningITCase {
                         "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
                 .await();
 
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         // two fact sources share the same dynamic filter
         String sql =
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
index aa4bfffa..60ba5342 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
@@ -114,23 +114,6 @@ public class HiveDynamicTableFactoryTest {
         hiveTableSource3.catalogTable.getOptions().forEach(configuration1::setString);
         PartitionOrder partitionOrder1 = configuration1.get(STREAMING_SOURCE_PARTITION_ORDER);
         assertThat(partitionOrder1).isEqualTo(HiveOptions.PartitionOrder.PARTITION_NAME);
-
-        // test deprecated option key 'streaming-source.consume-order' and new key
-        // 'streaming-source.partition-order'
-        tableEnv.executeSql(
-                String.format(
-                        "create table table4 (x int, y string, z int) partitioned by ("
-                                + " pt_year int, pt_mon string, pt_day string)"
-                                + " tblproperties ('%s' = 'true', '%s' = 'partition-time')",
-                        STREAMING_SOURCE_ENABLE.key(), "streaming-source.consume-order"));
-        DynamicTableSource tableSource4 = getTableSource("table4");
-        assertThat(tableSource4).isInstanceOf(HiveTableSource.class);
-        HiveTableSource hiveTableSource = (HiveTableSource) tableSource4;
-
-        Configuration configuration2 = new Configuration();
-        hiveTableSource.catalogTable.getOptions().forEach(configuration2::setString);
-        PartitionOrder partitionOrder2 = configuration2.get(STREAMING_SOURCE_PARTITION_ORDER);
-        assertThat(partitionOrder2).isEqualTo(HiveOptions.PartitionOrder.PARTITION_TIME);
     }
 
     @Test
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
index c90836d3..9905f699 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
@@ -57,7 +57,10 @@ class HiveSinkCompactionITCase extends CompactionITCaseBase {
         tEnv().useCatalog(hiveCatalog.getName());
 
         // avoid too large parallelism lead to scheduler dead lock in streaming mode
-        tEnv().getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv().getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         super.init();
     }
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 160ccedb..2dc725eb 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -285,7 +285,7 @@ class HiveTableSinkITCase {
                         + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
                         + "'streaming-source.enable'='true',"
                         + "'streaming-source.monitor-interval'='1s',"
-                        + "'streaming-source.consume-order'='partition-time'"
+                        + "'streaming-source.partition-order'='partition-time'"
                         + ")");
 
         tEnv.executeSql(
@@ -305,7 +305,7 @@ class HiveTableSinkITCase {
                         + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
                         + " 'streaming-source.enable'='true',"
                         + " 'streaming-source.monitor-interval'='1s',"
-                        + " 'streaming-source.consume-order'='partition-time'"
+                        + " 'streaming-source.partition-order'='partition-time'"
                         + ")");
 
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 27833a85..108c09c1 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -559,7 +559,10 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         final String dbName = "source_db";
         final String tblName = "test_parallelism_limit_pushdown";
         TableEnvironment tEnv = createTableEnv();
-        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
         tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_limit_pushdown "
@@ -595,7 +598,10 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         tEnv.registerCatalog("hive", hiveCatalog);
         tEnv.useCatalog("hive");
-        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_no_infer "
                         + "(`year` STRING, `value` INT) partitioned by (pt int)");
@@ -715,7 +721,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                         + " p1 string, p2 string, p3 string) TBLPROPERTIES("
                        + "'streaming-source.enable'='true',"
                         + "'streaming-source.partition-include'='all',"
-                        + "'streaming-source.consume-order'='create-time',"
+                        + "'streaming-source.partition-order'='create-time',"
                         + "'streaming-source.monitor-interval'='1s',"
                         + "'streaming-source.consume-start-offset'='2020-10-02 00:00:00'"
                         + ")");
@@ -771,7 +777,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                         + ") PARTITIONED BY (ts STRING) TBLPROPERTIES ("
                         + "'streaming-source.enable'='true',"
                         + "'streaming-source.monitor-interval'='1s',"
-                        + "'streaming-source.consume-order'='partition-time'"
+                        + "'streaming-source.partition-order'='partition-time'"
                         + ")");
 
         HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
@@ -896,8 +902,17 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 
         TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
         tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
-        tableEnv.getConfig()
-                .set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
+        if (inferParallelism) {
+            tableEnv.getConfig()
+                    .set(
+                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                            HiveOptions.InferMode.STATIC);
+        } else {
+            tableEnv.getConfig()
+                    .set(
+                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                            HiveOptions.InferMode.NONE);
+        }
         tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
         tableEnv.useCatalog(catalogSpy.getName());
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index 18b3e73d..d62a53b9 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -108,7 +108,7 @@ public class PartitionMonitorTest {
         Configuration configuration = new Configuration();
         ObjectPath tablePath = new ObjectPath("testDb", "testTable");
 
-        configuration.setString("streaming-source.consume-order", "create-time");
+        configuration.setString("streaming-source.partition-order", "create-time");
 
         HiveContinuousPartitionContext<Partition, Long> fetcherContext =
                 new HiveContinuousPartitionContext<Partition, Long>() {
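Likewise, 'streaming-source.consume-order' is no longer accepted even as a fallback key, since its withDeprecatedKeys registration was dropped from STREAMING_SOURCE_PARTITION_ORDER; DDL must name 'streaming-source.partition-order' directly. A sketch assuming an existing Hive-dialect TableEnvironment named tEnv (the table name and columns are hypothetical; the property keys are the ones used in the diff):

    // Hypothetical streaming-read table; only the TBLPROPERTIES keys matter here.
    tEnv.executeSql(
            "create table stream_src (x int, y string) partitioned by (ts string) tblproperties ("
                    + "'streaming-source.enable'='true',"
                    + "'streaming-source.monitor-interval'='1s',"
                    // 'streaming-source.consume-order' would now be an unrecognized option:
                    + "'streaming-source.partition-order'='partition-time'"
                    + ")");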