This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git

commit 82ef7b8eafc212a8936c3e1ba56d46849946c52f
Author: xuyang <[email protected]>
AuthorDate: Mon Sep 23 12:06:49 2024 +0800

    [FLINK-36273][table] Remove all deprecated configuration options in flink-connector-hive module for 2.0
---
 .../docs/connectors/table/hive/hive_read_write.md  |  2 +-
 .../docs/connectors/table/hive/hive_read_write.md  |  2 +-
 .../HiveDynamicParallelismInferenceFactory.java    |  8 +------
 .../apache/flink/connectors/hive/HiveOptions.java  | 17 +-------------
 .../connectors/hive/HiveParallelismInference.java  |  1 -
 .../flink/connectors/hive/HiveSourceBuilder.java   |  3 ---
 .../HiveStaticParallelismInferenceFactory.java     |  7 ++----
 .../hive/HiveDynamicPartitionPruningITCase.java    | 10 ++++++--
 .../hive/HiveDynamicTableFactoryTest.java          | 17 --------------
 .../connectors/hive/HiveSinkCompactionITCase.java  |  5 +++-
 .../flink/connectors/hive/HiveTableSinkITCase.java |  4 ++--
 .../connectors/hive/HiveTableSourceITCase.java     | 27 +++++++++++++++++-----
 .../connectors/hive/PartitionMonitorTest.java      |  2 +-
 13 files changed, 42 insertions(+), 63 deletions(-)
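
For readers tracking the 2.0 migration: the hunks below replace the removed
boolean option 'table.exec.hive.infer-source-parallelism' with the enum option
'table.exec.hive.infer-source-parallelism.mode'. A minimal migration sketch in
Java (the TableEnvironment setup is illustrative and not part of this commit;
the option names and InferMode values come from the diff itself):

    import org.apache.flink.connectors.hive.HiveOptions;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class InferModeMigrationSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Removed by this commit:
            // tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);

            // Replacement: NONE disables inference, STATIC/DYNAMIC enable it.
            tableEnv.getConfig()
                    .set(
                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
                            HiveOptions.InferMode.NONE);
        }
    }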

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 64f9f7dc..ad986a47 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -69,7 +69,7 @@ Flink 支持以批和流两种模式从 Hive 表中读取数据。批读的时
         <td><h5>streaming-source.partition-order</h5></td>
         <td style="word-wrap: break-word;">partition-name</td>
         <td>String</td>
-        <td>streaming source 分区排序,支持 create-time, partition-time 和 partition-name。 create-time 比较分区/文件创建时间, 这不是 Hive metastore 中创建分区的时间,而是文件夹/文件在文件系统的修改时间,如果分区文件夹以某种方式更新,比如添加在文件夹里新增了一个文件,它会影响到数据的使用。partition-time 从分区名称中抽取时间进行比较。partition-name 会比较分区名称的字典顺序。对于非分区的表,总是会比较 'create-time'。对于分区表默认值是 'partition-name'。该选项与已经弃用的 'streaming-source.consume-order' 的选项相同</td>
+        <td>streaming source 分区排序,支持 create-time, partition-time 和 partition-name。 create-time 比较分区/文件创建时间, 这不是 Hive metastore 中创建分区的时间,而是文件夹/文件在文件系统的修改时间,如果分区文件夹以某种方式更新,比如添加在文件夹里新增了一个文件,它会影响到数据的使用。partition-time 从分区名称中抽取时间进行比较。partition-name 会比较分区名称的字典顺序。对于非分区的表,总是会比较 'create-time'。对于分区表默认值是 'partition-name'。</td>
     </tr>
     <tr>
         <td><h5>streaming-source.consume-start-offset</h5></td>
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 9a9834b6..2f8252a6 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -79,7 +79,7 @@ of new files in the folder and read new files incrementally.
         <td><h5>streaming-source.partition-order</h5></td>
         <td style="word-wrap: break-word;">partition-name</td>
         <td>String</td>
-        <td>The partition order of streaming source, support create-time, partition-time and partition-name. create-time compares partition/file creation time, this is not the partition create time in Hive metaStore, but the folder/file modification time in filesystem, if the partition folder somehow gets updated, e.g. add new file into folder, it can affect how the data is consumed. partition-time compares the time extracted from partition name. partition-name compares partition name's  [...]
+        <td>The partition order of streaming source, support create-time, partition-time and partition-name. create-time compares partition/file creation time, this is not the partition create time in Hive metaStore, but the folder/file modification time in filesystem, if the partition folder somehow gets updated, e.g. add new file into folder, it can affect how the data is consumed. partition-time compares the time extracted from partition name. partition-name compares partition name's  [...]
     </tr>
     <tr>
         <td><h5>streaming-source.consume-start-offset</h5></td>
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
index 4932bbee..36e655ec 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
@@ -42,17 +42,11 @@ class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference
 
     @Override
     public HiveParallelismInference create() {
-        boolean inferEnabled =
-                jobConf.getBoolean(
-                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
-                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
         HiveOptions.InferMode inferMode =
                 jobConf.getEnum(
                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());
-        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
-        // is removed.
-        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC;
+        boolean infer = inferMode == HiveOptions.InferMode.DYNAMIC;
         int inferMaxParallelism =
                 Math.min(
                         (int)
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index c8588f58..6877ef42 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -51,16 +51,6 @@ public class HiveOptions {
                                     "If it is true, flink will read the files 
of partitioned hive table from subdirectories under the partition directory to 
be read.\n"
                                             + "If it is false, an exception 
that 'not a file: xxx' will be thrown when the partition directory contains any 
sub-directory.");
 
-    /** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */
-    @Deprecated
-    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
-            key("table.exec.hive.infer-source-parallelism")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "If is false, parallelism of source are set by config.\n"
-                                    + "If is true, source parallelism is inferred according to splits number.\n");
-
     @PublicEvolving
     public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
             key("table.exec.hive.infer-source-parallelism.mode")
@@ -76,9 +66,7 @@ public class HiveOptions {
                                             text(
                                                     "'dynamic' represents 
dynamic inference, which will infer parallelism at job execution stage and 
could more accurately infer the source parallelism."),
                                             text(
-                                                    "'none' represents 
disabling parallelism inference."),
-                                            text(
-                                                    "Note that it is still 
affected by the deprecated option 'table.exec.hive.infer-source-parallelism', 
requiring its value to be true for enabling parallelism inference."))
+                                                    "'none' represents 
disabling parallelism inference."))
                                     .build());
 
     public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
@@ -250,12 +238,9 @@ public class HiveOptions {
             key("streaming-source.partition-order")
                     .enumType(PartitionOrder.class)
                     .defaultValue(PartitionOrder.PARTITION_NAME)
-                    .withDeprecatedKeys("streaming-source.consume-order")
                     .withDescription(
                             Description.builder()
                                     .text("The partition order of the 
streaming source.")
-                                    .text(
-                                            "This is a synonym for the 
deprecated 'streaming-source.consume-order' option.")
                                     .build());
 
     public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
index e46eb7f0..8c663c9b 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
@@ -64,7 +64,6 @@ class HiveParallelismInference {
 
     /**
      * Infer parallelism by number of files and number of splits. If {@link
-     * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link
      * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing.
      */
     HiveParallelismInference infer(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 84c97779..c2db01d6 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -339,9 +339,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                 String.valueOf(calPartitionSizeThreadNum));
-        jobConf.setBoolean(
-                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM));
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                 String.valueOf(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
index 58a20e49..41b02dd3 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
@@ -40,12 +40,9 @@ class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.
 
     @Override
     public HiveParallelismInference create() {
-        boolean inferEnabled = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
         HiveOptions.InferMode inferMode =
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE);
-        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
-        // is removed.
-        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.STATIC;
+        boolean infer = inferMode == HiveOptions.InferMode.STATIC;
         int inferMaxParallelism =
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
         Preconditions.checkArgument(
@@ -55,7 +52,7 @@ class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.
         int parallelism =
                 flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
         // Keeping the parallelism unset is a prerequisite for dynamic parallelism inference.
-        if (inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC) {
+        if (inferMode == HiveOptions.InferMode.DYNAMIC) {
             parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
         }
 
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
index dce66345..27babefb 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
@@ -123,7 +123,10 @@ public class HiveDynamicPartitionPruningITCase {
                 tableEnv.explainSql(
                         "select a, b, c, p, x, y from fact, dim where x = p 
and z = 1 order by a"));
 
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         String sql = "select a, b, c, p, x, y from fact, dim where x = p and z 
= 1 order by a";
         String sqlSwapFactDim =
@@ -192,7 +195,10 @@ public class HiveDynamicPartitionPruningITCase {
                         "insert into fact2 partition (p=3) values 
(60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
                 .await();
 
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         // two fact sources share the same dynamic filter
         String sql =
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
index aa4bfffa..60ba5342 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
@@ -114,23 +114,6 @@ public class HiveDynamicTableFactoryTest {
         hiveTableSource3.catalogTable.getOptions().forEach(configuration1::setString);
         PartitionOrder partitionOrder1 = configuration1.get(STREAMING_SOURCE_PARTITION_ORDER);
         assertThat(partitionOrder1).isEqualTo(HiveOptions.PartitionOrder.PARTITION_NAME);
-
-        // test deprecated option key 'streaming-source.consume-order' and new key
-        // 'streaming-source.partition-order'
-        tableEnv.executeSql(
-                String.format(
-                        "create table table4 (x int, y string, z int) partitioned by ("
-                                + " pt_year int, pt_mon string, pt_day string)"
-                                + " tblproperties ('%s' = 'true', '%s' = 'partition-time')",
-                        STREAMING_SOURCE_ENABLE.key(), "streaming-source.consume-order"));
-        DynamicTableSource tableSource4 = getTableSource("table4");
-        assertThat(tableSource4).isInstanceOf(HiveTableSource.class);
-        HiveTableSource hiveTableSource = (HiveTableSource) tableSource4;
-
-        Configuration configuration2 = new Configuration();
-        hiveTableSource.catalogTable.getOptions().forEach(configuration2::setString);
-        PartitionOrder partitionOrder2 = configuration2.get(STREAMING_SOURCE_PARTITION_ORDER);
-        assertThat(partitionOrder2).isEqualTo(HiveOptions.PartitionOrder.PARTITION_TIME);
     }
 
     @Test
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
index c90836d3..9905f699 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSinkCompactionITCase.java
@@ -57,7 +57,10 @@ class HiveSinkCompactionITCase extends CompactionITCaseBase {
         tEnv().useCatalog(hiveCatalog.getName());
 
         // avoid too large parallelism lead to scheduler dead lock in streaming mode
-        tEnv().getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv().getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
 
         super.init();
     }
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 160ccedb..2dc725eb 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -285,7 +285,7 @@ class HiveTableSinkITCase {
                                    + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
                                    + "'streaming-source.enable'='true',"
                                    + "'streaming-source.monitor-interval'='1s',"
-                                    + "'streaming-source.consume-order'='partition-time'"
+                                    + "'streaming-source.partition-order'='partition-time'"
                                     + ")");
 
                     tEnv.executeSql(
@@ -305,7 +305,7 @@ class HiveTableSinkITCase {
                                     + " 
'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
                                     + " 'streaming-source.enable'='true',"
                                     + " 
'streaming-source.monitor-interval'='1s',"
-                                    + " 
'streaming-source.consume-order'='partition-time'"
+                                    + " 
'streaming-source.partition-order'='partition-time'"
                                     + ")");
 
                     tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 27833a85..108c09c1 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -559,7 +559,10 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         final String dbName = "source_db";
         final String tblName = "test_parallelism_limit_pushdown";
         TableEnvironment tEnv = createTableEnv();
-        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
         tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_limit_pushdown "
@@ -595,7 +598,10 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         tEnv.registerCatalog("hive", hiveCatalog);
         tEnv.useCatalog("hive");
-        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+        tEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                        HiveOptions.InferMode.NONE);
         tEnv.executeSql(
                 "CREATE TABLE source_db.test_parallelism_no_infer "
                         + "(`year` STRING, `value` INT) partitioned by (pt 
int)");
@@ -715,7 +721,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                         + " p1 string, p2 string, p3 string) TBLPROPERTIES("
                         + "'streaming-source.enable'='true',"
                         + "'streaming-source.partition-include'='all',"
-                        + "'streaming-source.consume-order'='create-time',"
+                        + "'streaming-source.partition-order'='create-time',"
                         + "'streaming-source.monitor-interval'='1s',"
                         + "'streaming-source.consume-start-offset'='2020-10-02 
00:00:00'"
                         + ")");
@@ -771,7 +777,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                         + ") PARTITIONED BY (ts STRING) TBLPROPERTIES ("
                         + "'streaming-source.enable'='true',"
                         + "'streaming-source.monitor-interval'='1s',"
-                        + "'streaming-source.consume-order'='partition-time'"
+                        + "'streaming-source.partition-order'='partition-time'"
                         + ")");
 
         HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
@@ -896,8 +902,17 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 
         TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode();
         tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
-        tableEnv.getConfig()
-                .set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
+        if (inferParallelism) {
+            tableEnv.getConfig()
+                    .set(
+                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                            HiveOptions.InferMode.STATIC);
+        } else {
+            tableEnv.getConfig()
+                    .set(
+                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
+                            HiveOptions.InferMode.NONE);
+        }
         tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
         tableEnv.useCatalog(catalogSpy.getName());
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index 18b3e73d..d62a53b9 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -108,7 +108,7 @@ public class PartitionMonitorTest {
         Configuration configuration = new Configuration();
 
         ObjectPath tablePath = new ObjectPath("testDb", "testTable");
-        configuration.setString("streaming-source.consume-order", "create-time");
+        configuration.setString("streaming-source.partition-order", "create-time");
 
         HiveContinuousPartitionContext<Partition, Long> fetcherContext =
                 new HiveContinuousPartitionContext<Partition, Long>() {
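
The same rename applies to DDL table properties: 'streaming-source.consume-order'
no longer resolves, and 'streaming-source.partition-order' must be used instead,
as the updated tests above show. A minimal sketch in Java (the table name and
schema are hypothetical and the Hive SQL dialect is assumed, as in the tests;
only the property keys and values mirror the hunks above):

    // Hypothetical table; the TBLPROPERTIES keys come from the tests above.
    tEnv.executeSql(
            "CREATE TABLE example_stream_table (x INT, y STRING) "
                    + "PARTITIONED BY (pt_day STRING) TBLPROPERTIES ("
                    + "'streaming-source.enable'='true',"
                    + "'streaming-source.monitor-interval'='1s',"
                    // formerly 'streaming-source.consume-order':
                    + "'streaming-source.partition-order'='partition-time'"
                    + ")");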
