This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb5a5355f9 [FLINK-35293][hive] Hive source supports dynamic parallelism inference
ddb5a5355f9 is described below

commit ddb5a5355f9aca3d223f1fff6581d83dd317c2de
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Tue May 7 14:49:17 2024 +0800

    [FLINK-35293][hive] Hive source supports dynamic parallelism inference
---
 .../docs/connectors/table/hive/hive_read_write.md  |  15 +-
 .../docs/connectors/table/hive/hive_read_write.md  |  17 +-
 .../connector/file/src/AbstractFileSource.java     |   2 +-
 .../connector/file/table/LimitableBulkFormat.java  |   4 +
 .../HiveDynamicParallelismInferenceFactory.java    |  67 +++++
 .../apache/flink/connectors/hive/HiveOptions.java  |  53 +++-
 .../connectors/hive/HiveParallelismInference.java  |  33 ++-
 .../apache/flink/connectors/hive/HiveSource.java   |  55 +++-
 .../flink/connectors/hive/HiveSourceBuilder.java   |  11 +
 .../hive/HiveSourceDynamicFileEnumerator.java      |   3 +-
 .../connectors/hive/HiveSourceFileEnumerator.java  |   4 +
 .../HiveStaticParallelismInferenceFactory.java     |  64 +++++
 .../flink/connectors/hive/HiveTableSource.java     |   3 +-
 .../hive/HiveSourceDynamicFileEnumeratorTest.java  |  17 +-
 .../flink/connectors/hive/HiveSourceITCase.java    |   2 +-
 .../flink/connectors/hive/HiveSourceTest.java      | 309 +++++++++++++++++++++
 .../flink/table/catalog/hive/HiveTestUtils.java    |  21 ++
 17 files changed, 632 insertions(+), 48 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index bdd3ac4f06d..64f9f7dc8f0 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -136,16 +136,21 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` 
   </thead>
   <tbody>
     <tr>
-        <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
-        <td style="word-wrap: break-word;">true</td>
-        <td>Boolean</td>
-        <td>如果是 true,会根据 split 的数量推断 source 的并发度。如果是 false,source 的并发度由配置决定。</td>
+        <td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td>
+        <td style="word-wrap: break-word;">dynamic</td>
+        <td>InferMode</td>
+        <td>选择Hive Source并行度推断模式的选项,根据splits数推断并行度。
+            'static' 代表静态推断,它会在作业创建阶段推断Source并行度。
+            'dynamic' 代表动态推断,它会在作业执行阶段利用运行时信息更准确地推断Source并行度。
+            'none' 代表禁用并行度推断。
+            注意,它仍然受到已弃用选项 'table.exec.hive.infer-source-parallelism' 的影响,需要其值为 true 才能启用并行度推断。
+        </td>
     </tr>
     <tr>
         <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Integer</td>
-        <td>设置 source operator 推断的最大并发度。</td>
+        <td>设置 source operator 推断的最大并发度。请注意,默认值仅在静态并行性推断模式下有效。</td>
     </tr>
   </tbody>
 </table>
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 98742a92be7..9a9834b6579 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -152,16 +152,23 @@ following parameters in `TableConfig` (note that these parameters affect all sou
   </thead>
   <tbody>
     <tr>
-        <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
-        <td style="word-wrap: break-word;">true</td>
-        <td>Boolean</td>
-        <td>If is true, source parallelism is inferred according to splits number. If is false, parallelism of source are set by config.</td>
+        <td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td>
+        <td style="word-wrap: break-word;">dynamic</td>
+        <td>InferMode</td>
+        <td>An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.
+            'static' represents static inference, which will infer source parallelism at job creation stage.
+            'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism.
+            'none' represents disabling parallelism inference.
+            Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference.
+        </td>
     </tr>
     <tr>
         <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Integer</td>
-        <td>Sets max infer parallelism for source operator.</td>
+        <td>Sets max infer parallelism for source operator.
+            Note that the default value is effective only in the static parallelism inference mode.
+        </td>
     </tr>
   </tbody>
 </table>
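
For readers skimming this change, a minimal sketch (not part of the commit; the class name HiveInferModeConfigExample and the chosen values are hypothetical) of how the new mode can be selected through the Table API, using the option keys documented in the table above:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HiveInferModeConfigExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // 'static', 'dynamic' (the default) or 'none'; values map to HiveOptions.InferMode.
            tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.mode", "static");

            // Optional cap on the inferred source parallelism.
            tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.max", "500");
        }
    }

The same keys can also be set in SQL, e.g. SET 'table.exec.hive.infer-source-parallelism.mode' = 'static';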
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index b14d46b3f9c..c9133172070 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -100,7 +100,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
     //  Getters
     // ------------------------------------------------------------------------
 
-    FileEnumerator.Provider getEnumeratorFactory() {
+    protected FileEnumerator.Provider getEnumeratorFactory() {
         return enumeratorFactory;
     }
 
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java
index feaeb1d9842..2d95f557876 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java
@@ -84,6 +84,10 @@ public class LimitableBulkFormat<T, SplitT extends FileSourceSplit>
         return globalNumberRead != null && globalNumberRead.get() >= limit;
     }
 
+    public long getLimit() {
+        return this.limit;
+    }
+
     @Override
     public boolean isSplittable() {
         return format.isSplittable();
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
new file mode 100644
index 00000000000..b14e0628954
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism
+ * inference.
+ */
+class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider {
+
+    private final ObjectPath tablePath;
+    private final JobConf jobConf;
+    private final int globalMaxParallelism;
+
+    HiveDynamicParallelismInferenceFactory(
+            ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) {
+        this.tablePath = tablePath;
+        this.jobConf = jobConf;
+        this.globalMaxParallelism = globalMaxParallelism;
+    }
+
+    @Override
+    public HiveParallelismInference create() {
+        boolean inferEnabled =
+                jobConf.getBoolean(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
+        HiveOptions.InferMode inferMode =
+                jobConf.getEnum(
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
+                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());
+        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
+        // is removed.
+        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC;
+        int inferMaxParallelism =
+                Math.min(
+                        (int)
+                                jobConf.getLong(
+                                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX
+                                                .key(),
+                                        globalMaxParallelism),
+                        globalMaxParallelism);
+        int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 01e1493681d..c8588f58a76 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.MemorySize;
@@ -50,6 +51,8 @@ public class HiveOptions {
                                     "If it is true, flink will read the files 
of partitioned hive table from subdirectories under the partition directory to 
be read.\n"
                                             + "If it is false, an exception 
that 'not a file: xxx' will be thrown when the partition directory contains any 
sub-directory.");
 
+    /** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */
+    @Deprecated
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
             key("table.exec.hive.infer-source-parallelism")
                     .booleanType()
@@ -58,11 +61,33 @@ public class HiveOptions {
                             "If is false, parallelism of source are set by 
config.\n"
                                     + "If is true, source parallelism is 
inferred according to splits number.\n");
 
+    @PublicEvolving
+    public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
+            key("table.exec.hive.infer-source-parallelism.mode")
+                    .enumType(InferMode.class)
+                    .defaultValue(InferMode.DYNAMIC)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.")
+                                    .list(
+                                            text(
+                                                    "'static' represents static inference, which will infer source parallelism at job creation stage."),
+                                            text(
+                                                    "'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism."),
+                                            text(
+                                                    "'none' represents disabling parallelism inference."),
+                                            text(
+                                                    "Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference."))
+                                    .build());
+
     public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
             key("table.exec.hive.infer-source-parallelism.max")
                     .intType()
                     .defaultValue(1000)
-                    .withDescription("Sets max infer parallelism for source operator.");
+                    .withDescription(
+                            "Sets max infer parallelism for source operator. "
+                                    + "Note that the default value is effective only in the static parallelism inference mode.");
 
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
             key("table.exec.hive.fallback-mapred-writer")
@@ -281,4 +306,30 @@ public class HiveOptions {
             return description;
         }
     }
+
+    /** Infer mode used for {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE}. */
+    public enum InferMode implements DescribedEnum {
+        STATIC("static", text("Static parallelism inference mode.")),
+        DYNAMIC("dynamic", text("Dynamic parallelism inference mode.")),
+        NONE("none", text("Disable parallelism inference."));
+
+        private final String value;
+
+        private final InlineElement description;
+
+        InferMode(String value, InlineElement description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
index 850008fe5d2..e46eb7f0d7b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.slf4j.Logger;
@@ -40,18 +37,12 @@ class HiveParallelismInference {
 
     private int parallelism;
 
-    HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
+    HiveParallelismInference(
+            ObjectPath tablePath, boolean infer, int inferMaxParallelism, int parallelism) {
         this.tablePath = tablePath;
-        this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
-        this.inferMaxParallelism =
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
-        Preconditions.checkArgument(
-                inferMaxParallelism >= 1,
-                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
-                        + " cannot be less than 1");
-
-        this.parallelism =
-                flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+        this.infer = infer;
+        this.inferMaxParallelism = inferMaxParallelism;
+        this.parallelism = parallelism;
     }
 
     /**
@@ -73,7 +64,8 @@ class HiveParallelismInference {
 
     /**
      * Infer parallelism by number of files and number of splits. If {@link
-     * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
+     * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link
+     * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing.
      */
     HiveParallelismInference infer(
             SupplierWithException<Integer, IOException> numFiles,
@@ -113,4 +105,15 @@ class HiveParallelismInference {
                 result);
         return result;
     }
+
+    /** Factory for the {@code HiveParallelismInference}. */
+    interface Provider {
+
+        /**
+         * Creates a new {@code HiveParallelismInference}.
+         *
+         * @return a new {@code HiveParallelismInference} with designated factors.
+         */
+        HiveParallelismInference create();
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index 5b768b0320a..6c423e747c8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -20,6 +20,8 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.DynamicFilteringInfo;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.file.src.AbstractFileSource;
@@ -29,10 +31,12 @@ import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.ContinuousPartitionFetcher;
+import org.apache.flink.connector.file.table.LimitableBulkFormat;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
@@ -55,7 +59,8 @@ import java.util.List;
  * @param <T> the type of record returned by this source
  */
 @PublicEvolving
-public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
+public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit>
+        implements DynamicParallelismInference {
 
     private static final long serialVersionUID = 1L;
 
@@ -68,6 +73,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
     private final ContinuousPartitionFetcher<Partition, ?> fetcher;
     private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext;
     private final ObjectPath tablePath;
+    private Long limit = null;
 
     HiveSource(
             Path[] inputPaths,
@@ -97,6 +103,9 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
         this.partitionBytes = partitionBytes;
         this.fetcher = fetcher;
         this.fetcherContext = fetcherContext;
+        if (readerFormat instanceof LimitableBulkFormat) {
+            limit = ((LimitableBulkFormat<?, ?>) readerFormat).getLimit();
+        }
     }
 
     @Override
@@ -186,4 +195,48 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
                         jobConfWrapper),
                 getAssignerFactory());
     }
+
+    @Override
+    public int inferParallelism(Context dynamicParallelismContext) {
+        FileEnumerator fileEnumerator;
+        List<HiveTablePartition> partitions;
+        if (dynamicFilterPartitionKeys != null) {
+            fileEnumerator =
+                    new HiveSourceDynamicFileEnumerator.Provider(
+                                    tablePath.getFullName(),
+                                    dynamicFilterPartitionKeys,
+                                    partitionBytes,
+                                    hiveVersion,
+                                    jobConfWrapper)
+                            .create();
+            if (dynamicParallelismContext.getDynamicFilteringInfo().isPresent()) {
+                DynamicFilteringInfo dynamicFilteringInfo =
+                        dynamicParallelismContext.getDynamicFilteringInfo().get();
+                if (dynamicFilteringInfo instanceof DynamicFilteringEvent) {
+                    ((HiveSourceDynamicFileEnumerator) fileEnumerator)
+                            .setDynamicFilteringData(
+                                    ((DynamicFilteringEvent) dynamicFilteringInfo).getData());
+                }
+            }
+            partitions = ((HiveSourceDynamicFileEnumerator) fileEnumerator).getFinalPartitions();
+        } else {
+            fileEnumerator = getEnumeratorFactory().create();
+            partitions = ((HiveSourceFileEnumerator) fileEnumerator).getPartitions();
+        }
+
+        return new HiveDynamicParallelismInferenceFactory(
+                        tablePath,
+                        jobConfWrapper.conf(),
+                        dynamicParallelismContext.getParallelismInferenceUpperBound())
+                .create()
+                .infer(
+                        () ->
+                                HiveSourceFileEnumerator.getNumFiles(
+                                        partitions, jobConfWrapper.conf()),
+                        () ->
+                                HiveSourceFileEnumerator.createInputSplits(
+                                                0, partitions, jobConfWrapper.conf(), true)
+                                        .size())
+                .limit(limit);
+    }
 }
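
As a rough illustration (not part of the commit; the helper class and the constant values are hypothetical), the sketch below shows how a runtime component such as the adaptive batch scheduler could hand the source a Context and obtain a parallelism hint. The anonymous Context mirrors the genDynamicParallelismContext fixture added in HiveSourceTest further down.

    import org.apache.flink.api.connector.source.DynamicFilteringInfo;
    import org.apache.flink.api.connector.source.DynamicParallelismInference;

    import java.util.Optional;

    /** Illustrative only: querying a source for a parallelism hint at runtime. */
    class InferParallelismSketch {

        static int infer(DynamicParallelismInference source, int upperBound) {
            DynamicParallelismInference.Context context =
                    new DynamicParallelismInference.Context() {
                        @Override
                        public Optional<DynamicFilteringInfo> getDynamicFilteringInfo() {
                            // No dynamic partition pruning in this sketch.
                            return Optional.empty();
                        }

                        @Override
                        public int getParallelismInferenceUpperBound() {
                            // Typically the job's maximum parallelism.
                            return upperBound;
                        }

                        @Override
                        public long getDataVolumePerTask() {
                            // Placeholder value, as in the test fixture.
                            return 10L;
                        }
                    };
            // For a HiveSource this is answered via HiveDynamicParallelismInferenceFactory.
            return source.inferParallelism(context);
        }
    }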
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index f04e8c514f2..28b4565047d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -60,6 +60,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.apache.flink.connector.file.src.FileSource.DEFAULT_SPLIT_ASSIGNER;
@@ -340,6 +341,16 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                 String.valueOf(calPartitionSizeThreadNum));
+
+        jobConf.setBoolean(
+                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM));
+        jobConf.set(
+                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
+                String.valueOf(
+                                flinkConf.get(
+                                        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE))
+                        .toUpperCase(Locale.ROOT));
     }
 
     private boolean isStreamingSource() {
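
A side note on the `.toUpperCase(Locale.ROOT)` above: Hadoop's Configuration#getEnum resolves the stored string with Enum.valueOf, which is case-sensitive and expects the constant name, while InferMode#toString() returns the lower-case option value. A minimal sketch of the round trip (illustrative only, not part of the commit; the class name is hypothetical):

    import org.apache.flink.connectors.hive.HiveOptions;

    import org.apache.hadoop.mapred.JobConf;

    import java.util.Locale;

    class InferModeRoundTripSketch {
        public static void main(String[] args) {
            JobConf jobConf = new JobConf();

            // InferMode.DYNAMIC.toString() is "dynamic"; getEnum needs the constant name "DYNAMIC".
            jobConf.set(
                    HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                    String.valueOf(HiveOptions.InferMode.DYNAMIC).toUpperCase(Locale.ROOT));

            HiveOptions.InferMode restored =
                    jobConf.getEnum(
                            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
                            HiveOptions.InferMode.STATIC);
            System.out.println(restored); // prints "dynamic", i.e. InferMode.DYNAMIC
        }
    }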
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
index 4d84fd5e048..1d592cf4d33 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
@@ -177,8 +177,7 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
                         minDesiredSplits, finalPartitions, jobConf, false));
     }
 
-    @VisibleForTesting
-    List<HiveTablePartition> getFinalPartitions() {
+    public List<HiveTablePartition> getFinalPartitions() {
         return finalPartitions;
     }
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index c30cd1c8e1b..500fe54d979 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -202,6 +202,10 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
         return numFiles;
     }
 
+    public List<HiveTablePartition> getPartitions() {
+        return this.partitions;
+    }
+
     private static long getSplitMaxSize(JobConf jobConf) {
         return jobConf.getLong(
                 HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
new file mode 100644
index 00000000000..58a20e498d6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The factory class for {@link HiveParallelismInference} to support Hive source static parallelism
+ * inference.
+ */
+class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.Provider {
+
+    private final ObjectPath tablePath;
+    private final ReadableConfig flinkConf;
+
+    HiveStaticParallelismInferenceFactory(ObjectPath tablePath, ReadableConfig flinkConf) {
+        this.tablePath = tablePath;
+        this.flinkConf = flinkConf;
+    }
+
+    @Override
+    public HiveParallelismInference create() {
+        boolean inferEnabled = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
+        HiveOptions.InferMode inferMode =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE);
+        // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
+        // is removed.
+        boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.STATIC;
+        int inferMaxParallelism =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
+        Preconditions.checkArgument(
+                inferMaxParallelism >= 1,
+                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
+                        + " cannot be less than 1");
+        int parallelism =
+                flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+        // Keeping the parallelism unset is a prerequisite for dynamic parallelism inference.
+        if (inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC) {
+            parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        }
+
+        return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 91931300217..f174d107cb0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -171,7 +171,8 @@ public class HiveTableSource
                             remainingPartitions);
 
             int parallelism =
-                    new HiveParallelismInference(tablePath, flinkConf)
+                    new HiveStaticParallelismInferenceFactory(tablePath, flinkConf)
+                            .create()
                             .infer(
                                     () ->
                                             HiveSourceFileEnumerator.getNumFiles(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
index 93ddb42b339..58c7b0b2170 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.connector.source.DynamicFilteringData;
 import org.apache.flink.table.data.GenericRowData;
@@ -47,8 +45,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -60,6 +56,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.catalog.hive.HiveTestUtils.serialize;
 import static org.apache.flink.table.data.StringData.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -215,16 +212,4 @@ class HiveSourceDynamicFileEnumeratorTest {
                 HiveShimLoader.getHiveVersion(),
                 new JobConf());
     }
-
-    private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try {
-            typeInfo.createSerializer(new SerializerConfigImpl())
-                    .serialize(row, new DataOutputViewStreamWrapper(baos));
-        } catch (IOException e) {
-            // throw as RuntimeException so the function can use in lambda
-            throw new RuntimeException(e);
-        }
-        return baos.toByteArray();
-    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java
index 8cb63de76b0..1fb37bacddb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java
@@ -50,7 +50,7 @@ import static org.apache.flink.table.catalog.hive.util.Constants.IDENTIFIER;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link HiveSource}. */
+/** IT case for {@link HiveSource}. */
 public class HiveSourceITCase {
 
     private static HiveCatalog hiveCatalog;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
new file mode 100644
index 00000000000..3c08dbbb873
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.DynamicFilteringInfo;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.hive.util.Constants.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HiveSource}. */
+class HiveSourceTest {
+
+    private static HiveCatalog hiveCatalog;
+
+    private static final List<String> keys = Collections.singletonList("p");
+
+    private static final List<Map<String, String>> partitionSpecs =
+            Arrays.asList(
+                    Collections.singletonMap("p", "1"),
+                    Collections.singletonMap("p", "2"),
+                    Collections.singletonMap("p", "3"));
+
+    @BeforeAll
+    static void setup() throws Exception {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+    }
+
+    @AfterAll
+    static void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    void testDynamicParallelismInference() throws Exception {
+        // test non-partitioned table
+        ObjectPath tablePath1 = new ObjectPath("default", "hiveNonPartTbl");
+        createTable(tablePath1, hiveCatalog, false);
+
+        HiveSource<RowData> hiveSource =
+                new HiveSourceBuilder(
+                                new JobConf(hiveCatalog.getHiveConf()),
+                                new Configuration(),
+                                HiveShimLoader.getHiveVersion(),
+                                tablePath1.getDatabaseName(),
+                                tablePath1.getObjectName(),
+                                Collections.emptyMap())
+                        .buildWithDefaultBulkFormat();
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(1);
+
+        hiveCatalog.dropTable(tablePath1, false);
+
+        // test partitioned table
+        ObjectPath tablePath2 = new ObjectPath("default", "hiveTbl1");
+        createTable(tablePath2, hiveCatalog, true);
+
+        hiveSource =
+                new HiveSourceBuilder(
+                                new JobConf(hiveCatalog.getHiveConf()),
+                                new Configuration(),
+                                HiveShimLoader.getHiveVersion(),
+                                tablePath2.getDatabaseName(),
+                                tablePath2.getObjectName(),
+                                Collections.emptyMap())
+                        .setPartitions(
+                                partitionSpecs.stream()
+                                        .map(
+                                                spec ->
+                                                        HiveTablePartition.ofPartition(
+                                                                hiveCatalog.getHiveConf(),
+                                                                hiveCatalog.getHiveVersion(),
+                                                                tablePath2.getDatabaseName(),
+                                                                tablePath2.getObjectName(),
+                                                                new LinkedHashMap<>(spec)))
+                                        .collect(Collectors.toList()))
+                        .buildWithDefaultBulkFormat();
+
+        // test inferred parallelism less than maxParallelism
+        context = genDynamicParallelismContext(10, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(3);
+
+        // test inferred parallelism larger than maxParallelism
+        context = genDynamicParallelismContext(2, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
+
+        hiveCatalog.dropTable(tablePath2, false);
+    }
+
+    @Test
+    void testDynamicParallelismInferenceWithLimit() throws Exception {
+        ObjectPath tablePath = new ObjectPath("default", "hiveTbl2");
+        createTable(tablePath, hiveCatalog, true);
+
+        HiveSource<RowData> hiveSource =
+                new HiveSourceBuilder(
+                                new JobConf(hiveCatalog.getHiveConf()),
+                                new Configuration(),
+                                HiveShimLoader.getHiveVersion(),
+                                tablePath.getDatabaseName(),
+                                tablePath.getObjectName(),
+                                Collections.emptyMap())
+                        .setPartitions(
+                                partitionSpecs.stream()
+                                        .map(
+                                                spec ->
+                                                        HiveTablePartition.ofPartition(
+                                                                hiveCatalog.getHiveConf(),
+                                                                hiveCatalog.getHiveVersion(),
+                                                                tablePath.getDatabaseName(),
+                                                                tablePath.getObjectName(),
+                                                                new LinkedHashMap<>(spec)))
+                                        .collect(Collectors.toList()))
+                        .setLimit(1L)
+                        .buildWithDefaultBulkFormat();
+
+        // test inferred parallelism less than maxParallelism
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(1);
+
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    @Test
+    void testDynamicParallelismInferenceWithFiltering() throws Exception {
+        ObjectPath tablePath = new ObjectPath("default", "hiveTbl3");
+        createTable(tablePath, hiveCatalog, true);
+
+        HiveSource<RowData> hiveSource =
+                new HiveSourceBuilder(
+                                new JobConf(hiveCatalog.getHiveConf()),
+                                new Configuration(),
+                                HiveShimLoader.getHiveVersion(),
+                                tablePath.getDatabaseName(),
+                                tablePath.getObjectName(),
+                                Collections.emptyMap())
+                        .setDynamicFilterPartitionKeys(keys)
+                        .setPartitions(
+                                partitionSpecs.stream()
+                                        .map(
+                                                spec ->
+                                                        HiveTablePartition.ofPartition(
+                                                                hiveCatalog.getHiveConf(),
+                                                                hiveCatalog.getHiveVersion(),
+                                                                tablePath.getDatabaseName(),
+                                                                tablePath.getObjectName(),
+                                                                new LinkedHashMap<>(spec)))
+                                        .collect(Collectors.toList()))
+                        .buildWithDefaultBulkFormat();
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Arrays.asList(1, 2));
+
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    private void createTable(ObjectPath tablePath, HiveCatalog hiveCatalog, boolean isPartitioned)
+            throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CONNECTOR.key(), IDENTIFIER);
+
+        List<Column> partitionTableColumns = new ArrayList<>();
+        partitionTableColumns.add(Column.physical("i", DataTypes.INT()));
+        if (isPartitioned) {
+            HiveSourceTest.keys.stream()
+                    .map(key -> Column.physical(key, DataTypes.STRING()))
+                    .forEach(partitionTableColumns::add);
+        }
+        ResolvedSchema partitionTableRSchema = ResolvedSchema.of(partitionTableColumns);
+
+        hiveCatalog.createTable(
+                tablePath,
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder()
+                                        .fromResolvedSchema(partitionTableRSchema)
+                                        .build(),
+                                null,
+                                isPartitioned ? keys : Collections.emptyList(),
+                                tableOptions),
+                        partitionTableRSchema),
+                false);
+
+        if (isPartitioned) {
+            HiveSourceTest.partitionSpecs.forEach(
+                    spec -> {
+                        try {
+                            HiveTestUtils.createTextTableInserter(
+                                            hiveCatalog,
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getObjectName())
+                                    .addRow(new Object[] {1})
+                                    .addRow(new Object[] {2})
+                                    .commit(
+                                            spec.keySet().iterator().next()
+                                                    + "='"
+                                                    + spec.values().iterator().next()
+                                                    + "'");
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        } else {
+            HiveTestUtils.createTextTableInserter(
+                            hiveCatalog, tablePath.getDatabaseName(), tablePath.getObjectName())
+                    .addRow(new Object[] {1})
+                    .addRow(new Object[] {2})
+                    .commit();
+        }
+    }
+
+    private DynamicParallelismInference.Context genDynamicParallelismContext(
+            int maxParallelism, List<Integer> filteringPartitions) {
+        return new DynamicParallelismInference.Context() {
+            @Override
+            public Optional<DynamicFilteringInfo> getDynamicFilteringInfo() {
+                RowType rowType = RowType.of(new IntType());
+                TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+                if (!filteringPartitions.isEmpty()) {
+                    List<byte[]> serializedRows =
+                            filteringPartitions.stream()
+                                    .map(
+                                            key -> {
+                                                GenericRowData filteringRow = new GenericRowData(1);
+                                                filteringRow.setField(0, key);
+                                                return HiveTestUtils.serialize(
+                                                        rowTypeInfo, filteringRow);
+                                            })
+                                    .collect(Collectors.toList());
+
+                    DynamicFilteringData data =
+                            new DynamicFilteringData(
+                                    InternalTypeInfo.of(rowType), rowType, serializedRows, true);
+                    return Optional.of(new DynamicFilteringEvent(data));
+                } else {
+                    return Optional.empty();
+                }
+            }
+
+            @Override
+            public int getParallelismInferenceUpperBound() {
+                return maxParallelism;
+            }
+
+            @Override
+            public long getDataVolumePerTask() {
+                return 10L;
+            }
+        };
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 2cefd641ae9..80bb609ea85 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connectors.hive.HiveOptions;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -33,6 +37,7 @@ import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.types.DataType;
@@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -59,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 
 /** Test utils for Hive connector. */
@@ -158,6 +165,8 @@ public class HiveTestUtils {
     public static TableEnvironment createTableEnvInBatchMode(SqlDialect dialect) {
         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
         tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig()
+                .set(TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE, HiveOptions.InferMode.STATIC);
         tableEnv.getConfig().setSqlDialect(dialect);
         return tableEnv;
     }
@@ -204,6 +213,18 @@ public class HiveTestUtils {
         return new TextTableInserter(hiveCatalog, dbName, tableName);
     }
 
+    public static byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            typeInfo.createSerializer(new SerializerConfigImpl())
+                    .serialize(row, new DataOutputViewStreamWrapper(baos));
+        } catch (IOException e) {
+            // throw as RuntimeException so the function can use in lambda
+            throw new RuntimeException(e);
+        }
+        return baos.toByteArray();
+    }
+
     /** insert table operation. */
     public static class TextTableInserter {
 
