This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c3fd67ac4 [FLINK-35656][hive] Fix the issue that Hive Source incorrectly sets max parallelism in dynamic inference mode
01c3fd67ac4 is described below

commit 01c3fd67ac46898bd520477ae861cd29cceaa636
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Thu Jun 20 15:35:20 2024 +0800

    [FLINK-35656][hive] Fix the issue that Hive Source incorrectly sets max parallelism in dynamic inference mode
    
    This closes #24962.
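
    For illustration only, a minimal sketch of the setup this fix targets,
    built from the option constants that appear in the diff below, and
    assuming HiveOptions.InferMode has a DYNAMIC constant alongside the
    STATIC one referenced in this change:

        Configuration flinkConf = new Configuration();
        flinkConf.set(
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE,
                HiveOptions.InferMode.DYNAMIC);
        // TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX is deliberately left
        // unset: after this fix, HiveSourceBuilder no longer copies the
        // option's default into the jobConf, so the dynamically derived
        // upper bound can take effect.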
---
 .../HiveDynamicParallelismInferenceFactory.java    | 12 +++
 .../flink/connectors/hive/HiveSourceBuilder.java   | 25 +++++-
 .../flink/connectors/hive/HiveSourceTest.java      | 91 ++++++++++------------
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
index b14e0628954..4932bbee8b4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
@@ -62,6 +62,18 @@ class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference
                                         globalMaxParallelism),
                         globalMaxParallelism);
         int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        adjustInferMaxParallelism(jobConf, inferMaxParallelism);
         return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
     }
+
+    /**
+     * Resets the inferred source max parallelism in the jobConf, so that {@link
+     * HiveSourceFileEnumerator#createInputSplits} creates InputSplits based on the
+     * inferMaxParallelism.
+     */
+    private void adjustInferMaxParallelism(JobConf jobConf, int inferMaxParallelism) {
+        jobConf.set(
+                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
+                String.valueOf(inferMaxParallelism));
+    }
 }
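
A note on the write-back above: JobConf stores values as strings, so the
computed bound is serialized with String.valueOf and must be read back as an
int on the enumerator side. A minimal sketch of that round trip (the key
constant comes from this diff; the read-side getInt call is an assumption
about how the enumerator consumes it):

    JobConf jobConf = new JobConf();
    jobConf.set(
            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
            String.valueOf(128));
    // HiveSourceFileEnumerator#createInputSplits can then recover the bound:
    int bound = jobConf.getInt(
            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(), -1);
    // bound == 128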
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 28b4565047d..84c9777985c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
@@ -321,9 +322,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key(),
                 String.valueOf(splitPartitionThreadNum));
-        jobConf.set(
-                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX).toString());
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
                 String.valueOf(
@@ -341,7 +339,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                 String.valueOf(calPartitionSizeThreadNum));
-
         jobConf.setBoolean(
                 HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM));
@@ -351,6 +348,26 @@ public class HiveSourceBuilder {
                                 flinkConf.get(
                                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE))
                         .toUpperCase(Locale.ROOT));
+        configureInferSourceParallelismMax(flinkConf, jobConf);
+    }
+
+    /**
+     * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX} is not configured, the
+     * {@link BatchExecutionOptions#ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM} will be
+     * used as the upper bound for parallelism inference in dynamic parallelism inference mode.
+     */
+    private void configureInferSourceParallelismMax(ReadableConfig flinkConf, JobConf jobConf) {
+        if (flinkConf
+                        .getOptional(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX)
+                        .isPresent()
+                || flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE)
+                        == HiveOptions.InferMode.STATIC) {
+            jobConf.set(
+                    HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
+                    flinkConf
+                            .get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX)
+                            .toString());
+        }
     }
 
     private boolean isStreamingSource() {
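
With the builder change above, an explicitly configured max still takes
precedence even in dynamic mode. A hypothetical usage sketch (the builder
arguments mirror the test code below; the table name is a placeholder):

    Configuration flinkConf = new Configuration();
    flinkConf.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 4);
    HiveSource<RowData> source =
            new HiveSourceBuilder(
                            new JobConf(hiveCatalog.getHiveConf()),
                            flinkConf,
                            HiveShimLoader.getHiveVersion(),
                            "default",
                            "someTable",
                            Collections.emptyMap())
                    .buildWithDefaultBulkFormat();
    // Because the max is present in flinkConf, configureInferSourceParallelismMax
    // copies it into the jobConf, capping the inferred parallelism at 4.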
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
index 3c08dbbb873..e7244edceaf 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
@@ -111,26 +111,7 @@ class HiveSourceTest {
         ObjectPath tablePath2 = new ObjectPath("default", "hiveTbl1");
         createTable(tablePath2, hiveCatalog, true);
 
-        hiveSource =
-                new HiveSourceBuilder(
-                                new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
-                                HiveShimLoader.getHiveVersion(),
-                                tablePath2.getDatabaseName(),
-                                tablePath2.getObjectName(),
-                                Collections.emptyMap())
-                        .setPartitions(
-                                partitionSpecs.stream()
-                                        .map(
-                                                spec ->
-                                                        HiveTablePartition.ofPartition(
-                                                                hiveCatalog.getHiveConf(),
-                                                                hiveCatalog.getHiveVersion(),
-                                                                tablePath2.getDatabaseName(),
-                                                                tablePath2.getObjectName(),
-                                                                new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .buildWithDefaultBulkFormat();
+        hiveSource = createHiveSourceWithPartition(tablePath2, new Configuration(), -1, null);
 
         // test inferred parallelism less than maxParallelism
         context = genDynamicParallelismContext(10, Collections.emptyList());
@@ -149,26 +130,7 @@ class HiveSourceTest {
         createTable(tablePath, hiveCatalog, true);
 
         HiveSource<RowData> hiveSource =
-                new HiveSourceBuilder(
-                                new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
-                                HiveShimLoader.getHiveVersion(),
-                                tablePath.getDatabaseName(),
-                                tablePath.getObjectName(),
-                                Collections.emptyMap())
-                        .setPartitions(
-                                partitionSpecs.stream()
-                                        .map(
-                                                spec ->
-                                                        HiveTablePartition.ofPartition(
-                                                                hiveCatalog.getHiveConf(),
-                                                                hiveCatalog.getHiveVersion(),
-                                                                tablePath.getDatabaseName(),
-                                                                tablePath.getObjectName(),
-                                                                new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .setLimit(1L)
-                        .buildWithDefaultBulkFormat();
+                createHiveSourceWithPartition(tablePath, new Configuration(), 1L, null);
 
         // test inferred parallelism less than maxParallelism
         DynamicParallelismInference.Context context =
@@ -184,14 +146,45 @@ class HiveSourceTest {
         createTable(tablePath, hiveCatalog, true);
 
         HiveSource<RowData> hiveSource =
+                createHiveSourceWithPartition(tablePath, new Configuration(), -1, keys);
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Arrays.asList(1, 2));
+
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    @Test
+    void testDynamicParallelismInferenceWithSettingMaxParallelism() throws Exception {
+        ObjectPath tablePath = new ObjectPath("default", "hiveTbl4");
+        createTable(tablePath, hiveCatalog, true);
+
+        Configuration configuration = new Configuration();
+        configuration.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 1);
+        HiveSource<RowData> hiveSource =
+                createHiveSourceWithPartition(tablePath, configuration, -1, null);
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(1);
+
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    private HiveSource<RowData> createHiveSourceWithPartition(
+            ObjectPath tablePath,
+            Configuration config,
+            long limit,
+            List<String> dynamicFilterPartitionKeys) {
+        HiveSourceBuilder hiveSourceBuilder =
                 new HiveSourceBuilder(
                                 new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
+                                config,
                                 HiveShimLoader.getHiveVersion(),
                                 tablePath.getDatabaseName(),
                                 tablePath.getObjectName(),
                                 Collections.emptyMap())
-                        .setDynamicFilterPartitionKeys(keys)
                         .setPartitions(
                                 partitionSpecs.stream()
                                         .map(
@@ -202,14 +195,16 @@ class HiveSourceTest {
                                                                 tablePath.getDatabaseName(),
                                                                 tablePath.getObjectName(),
                                                                 new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .buildWithDefaultBulkFormat();
+                                        .collect(Collectors.toList()));
+        if (limit != -1) {
+            hiveSourceBuilder.setLimit(limit);
+        }
 
-        DynamicParallelismInference.Context context =
-                genDynamicParallelismContext(10, Arrays.asList(1, 2));
+        if (dynamicFilterPartitionKeys != null) {
+            hiveSourceBuilder.setDynamicFilterPartitionKeys(dynamicFilterPartitionKeys);
+        }
 
-        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
-        hiveCatalog.dropTable(tablePath, false);
+        return hiveSourceBuilder.buildWithDefaultBulkFormat();
     }
 
     private void createTable(ObjectPath tablePath, HiveCatalog hiveCatalog, boolean isPartitioned)
