This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 01c3fd67ac4  [FLINK-35656][hive] Fix the issue that Hive Source incorrectly set max parallelism in dynamic inference mode
01c3fd67ac4 is described below

commit 01c3fd67ac46898bd520477ae861cd29cceaa636
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Thu Jun 20 15:35:20 2024 +0800

    [FLINK-35656][hive] Fix the issue that Hive Source incorrectly set max parallelism in dynamic inference mode

    This closes #24962.
---
 .../HiveDynamicParallelismInferenceFactory.java    | 12 +++
 .../flink/connectors/hive/HiveSourceBuilder.java   | 25 +++++-
 .../flink/connectors/hive/HiveSourceTest.java      | 91 ++++++++++------------
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
index b14e0628954..4932bbee8b4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java
@@ -62,6 +62,18 @@ class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference
                                 globalMaxParallelism),
                 globalMaxParallelism);
         int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+        adjustInferMaxParallelism(jobConf, inferMaxParallelism);
         return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
     }
+
+    /**
+     * Resets the inferred source max parallelism in jobConf, so that {@link
+     * HiveSourceFileEnumerator#createInputSplits} creates InputSplits based on the
+     * inferMaxParallelism.
+     */
+    private void adjustInferMaxParallelism(JobConf jobConf, int inferMaxParallelism) {
+        jobConf.set(
+                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
+                String.valueOf(inferMaxParallelism));
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 28b4565047d..84c9777985c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
@@ -321,9 +322,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key(),
                 String.valueOf(splitPartitionThreadNum));
-        jobConf.set(
-                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
-                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX).toString());
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
                 String.valueOf(
@@ -341,7 +339,6 @@ public class HiveSourceBuilder {
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                 String.valueOf(calPartitionSizeThreadNum));
-
         jobConf.setBoolean(
                 HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM));
@@ -351,6 +348,26 @@ public class HiveSourceBuilder {
                         flinkConf.get(
                                         HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE))
                         .toUpperCase(Locale.ROOT));
+        configureInferSourceParallelismMax(flinkConf, jobConf);
+    }
+
+    /**
+     * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX} is not configured,
+     * {@link BatchExecutionOptions#ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM} will be
+     * used as the upper bound for parallelism inference in dynamic parallelism inference mode.
+     */
+    private void configureInferSourceParallelismMax(ReadableConfig flinkConf, JobConf jobConf) {
+        if (flinkConf
+                        .getOptional(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX)
+                        .isPresent()
+                || flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE)
+                        == HiveOptions.InferMode.STATIC) {
+            jobConf.set(
+                    HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
+                    flinkConf
+                            .get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX)
+                            .toString());
+        }
     }
 
     private boolean isStreamingSource() {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
index 3c08dbbb873..e7244edceaf 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java
@@ -111,26 +111,7 @@ class HiveSourceTest {
         ObjectPath tablePath2 = new ObjectPath("default", "hiveTbl1");
         createTable(tablePath2, hiveCatalog, true);
 
-        hiveSource =
-                new HiveSourceBuilder(
-                                new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
-                                HiveShimLoader.getHiveVersion(),
-                                tablePath2.getDatabaseName(),
-                                tablePath2.getObjectName(),
-                                Collections.emptyMap())
-                        .setPartitions(
-                                partitionSpecs.stream()
-                                        .map(
-                                                spec ->
-                                                        HiveTablePartition.ofPartition(
-                                                                hiveCatalog.getHiveConf(),
-                                                                hiveCatalog.getHiveVersion(),
-                                                                tablePath2.getDatabaseName(),
-                                                                tablePath2.getObjectName(),
-                                                                new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .buildWithDefaultBulkFormat();
+        hiveSource = createHiveSourceWithPartition(tablePath2, new Configuration(), -1, null);
 
         // test inferred parallelism less than maxParallelism
         context = genDynamicParallelismContext(10, Collections.emptyList());
@@ -149,26 +130,7 @@
         createTable(tablePath, hiveCatalog, true);
 
         HiveSource<RowData> hiveSource =
-                new HiveSourceBuilder(
-                                new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
-                                HiveShimLoader.getHiveVersion(),
-                                tablePath.getDatabaseName(),
-                                tablePath.getObjectName(),
-                                Collections.emptyMap())
-                        .setPartitions(
-                                partitionSpecs.stream()
-                                        .map(
-                                                spec ->
-                                                        HiveTablePartition.ofPartition(
-                                                                hiveCatalog.getHiveConf(),
-                                                                hiveCatalog.getHiveVersion(),
-                                                                tablePath.getDatabaseName(),
-                                                                tablePath.getObjectName(),
-                                                                new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .setLimit(1L)
-                        .buildWithDefaultBulkFormat();
+                createHiveSourceWithPartition(tablePath, new Configuration(), 1L, null);
 
         // test inferred parallelism less than maxParallelism
         DynamicParallelismInference.Context context =
@@ -184,14 +146,45 @@
         createTable(tablePath, hiveCatalog, true);
 
         HiveSource<RowData> hiveSource =
+                createHiveSourceWithPartition(tablePath, new Configuration(), -1, keys);
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Arrays.asList(1, 2));
+
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    @Test
+    void testDynamicParallelismInferenceWithSettingMaxParallelism() throws Exception {
+        ObjectPath tablePath = new ObjectPath("default", "hiveTbl4");
+        createTable(tablePath, hiveCatalog, true);
+
+        Configuration configuration = new Configuration();
+        configuration.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 1);
+        HiveSource<RowData> hiveSource =
+                createHiveSourceWithPartition(tablePath, configuration, -1, null);
+
+        DynamicParallelismInference.Context context =
+                genDynamicParallelismContext(10, Collections.emptyList());
+        assertThat(hiveSource.inferParallelism(context)).isEqualTo(1);
+
+        hiveCatalog.dropTable(tablePath, false);
+    }
+
+    private HiveSource<RowData> createHiveSourceWithPartition(
+            ObjectPath tablePath,
+            Configuration config,
+            long limit,
+            List<String> dynamicFilterPartitionKeys) {
+        HiveSourceBuilder hiveSourceBuilder =
                 new HiveSourceBuilder(
                                 new JobConf(hiveCatalog.getHiveConf()),
-                                new Configuration(),
+                                config,
                                 HiveShimLoader.getHiveVersion(),
                                 tablePath.getDatabaseName(),
                                 tablePath.getObjectName(),
                                 Collections.emptyMap())
-                        .setDynamicFilterPartitionKeys(keys)
                         .setPartitions(
                                 partitionSpecs.stream()
                                         .map(
@@ -202,14 +195,16 @@
                                                                 tablePath.getDatabaseName(),
                                                                 tablePath.getObjectName(),
                                                                 new LinkedHashMap<>(spec)))
-                                        .collect(Collectors.toList()))
-                        .buildWithDefaultBulkFormat();
+                                        .collect(Collectors.toList()));
+        if (limit != -1) {
+            hiveSourceBuilder.setLimit(limit);
+        }
 
-        DynamicParallelismInference.Context context =
-                genDynamicParallelismContext(10, Arrays.asList(1, 2));
+        if (dynamicFilterPartitionKeys != null) {
+            hiveSourceBuilder.setDynamicFilterPartitionKeys(dynamicFilterPartitionKeys);
+        }
 
-        assertThat(hiveSource.inferParallelism(context)).isEqualTo(2);
-        hiveCatalog.dropTable(tablePath, false);
+        return hiveSourceBuilder.buildWithDefaultBulkFormat();
     }
 
     private void createTable(ObjectPath tablePath, HiveCatalog hiveCatalog, boolean isPartitioned)
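
For context, a minimal user-facing sketch of the two options this commit ties together. The keys are the real configuration keys behind HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX and BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM; the class name and values below are illustrative assumptions, not part of this commit:

    import org.apache.flink.configuration.Configuration;

    public class HiveInferParallelismConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Explicit cap on inferred Hive source parallelism. After this fix it is
            // propagated to the JobConf only when set by the user or when the
            // inference mode is STATIC.
            conf.setString("table.exec.hive.infer-source-parallelism.max", "512");
            // In dynamic inference mode, when the cap above is unset, the adaptive
            // batch scheduler's default source parallelism acts as the upper bound.
            conf.setString(
                    "execution.batch.adaptive.auto-parallelism.default-source-parallelism",
                    "128");
        }
    }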