This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ddb5a5355f9 [FLINK-35293][hive] Hive source supports dynamic parallelism inference ddb5a5355f9 is described below commit ddb5a5355f9aca3d223f1fff6581d83dd317c2de Author: sunxia <xingbe...@gmail.com> AuthorDate: Tue May 7 14:49:17 2024 +0800 [FLINK-35293][hive] Hive source supports dynamic parallelism inference --- .../docs/connectors/table/hive/hive_read_write.md | 15 +- .../docs/connectors/table/hive/hive_read_write.md | 17 +- .../connector/file/src/AbstractFileSource.java | 2 +- .../connector/file/table/LimitableBulkFormat.java | 4 + .../HiveDynamicParallelismInferenceFactory.java | 67 +++++ .../apache/flink/connectors/hive/HiveOptions.java | 53 +++- .../connectors/hive/HiveParallelismInference.java | 33 ++- .../apache/flink/connectors/hive/HiveSource.java | 55 +++- .../flink/connectors/hive/HiveSourceBuilder.java | 11 + .../hive/HiveSourceDynamicFileEnumerator.java | 3 +- .../connectors/hive/HiveSourceFileEnumerator.java | 4 + .../HiveStaticParallelismInferenceFactory.java | 64 +++++ .../flink/connectors/hive/HiveTableSource.java | 3 +- .../hive/HiveSourceDynamicFileEnumeratorTest.java | 17 +- .../flink/connectors/hive/HiveSourceITCase.java | 2 +- .../flink/connectors/hive/HiveSourceTest.java | 309 +++++++++++++++++++++ .../flink/table/catalog/hive/HiveTestUtils.java | 21 ++ 17 files changed, 632 insertions(+), 48 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md index bdd3ac4f06d..64f9f7dc8f0 100644 --- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md +++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md @@ -136,16 +136,21 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` </thead> <tbody> <tr> - <td><h5>table.exec.hive.infer-source-parallelism</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>如果是 true,会根据 split 的数量推断 source 的并发度。如果是 
false,source 的并发度由配置决定。</td> + <td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td> + <td style="word-wrap: break-word;">dynamic</td> + <td>InferMode</td> + <td>选择Hive Source并行度推断模式的选项,根据splits数推断并行度。 + 'static' 代表静态推断,它会在作业创建阶段推断Source并行度。 + 'dynamic' 代表动态推断,它会在作业执行阶段利用运行时信息更准确地推断Source并行度。 + 'none' 代表禁用并行度推断。 + 注意,它仍然受到已弃用选项 'table.exec.hive.infer-source-parallelism' 的影响,需要其值为 true 才能启用并行度推断。 + </td> </tr> <tr> <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td> <td style="word-wrap: break-word;">1000</td> <td>Integer</td> - <td>设置 source operator 推断的最大并发度。</td> + <td>设置 source operator 推断的最大并发度。请注意,默认值仅在静态并行性推断模式下有效。</td> </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md index 98742a92be7..9a9834b6579 100644 --- a/docs/content/docs/connectors/table/hive/hive_read_write.md +++ b/docs/content/docs/connectors/table/hive/hive_read_write.md @@ -152,16 +152,23 @@ following parameters in `TableConfig` (note that these parameters affect all sou </thead> <tbody> <tr> - <td><h5>table.exec.hive.infer-source-parallelism</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>If is true, source parallelism is inferred according to splits number. If is false, parallelism of source are set by config.</td> + <td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td> + <td style="word-wrap: break-word;">dynamic</td> + <td>InferMode</td> + <td>An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number. + 'static' represents static inference, which will infer source parallelism at job creation stage. + 'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism. + 'none' represents disabling parallelism inference. 
+ Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference. + </td> </tr> <tr> <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td> <td style="word-wrap: break-word;">1000</td> <td>Integer</td> - <td>Sets max infer parallelism for source operator.</td> + <td>Sets max infer parallelism for source operator. + Note that the default value is effective only in the static parallelism inference mode. + </td> </tr> </tbody> </table> diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index b14d46b3f9c..c9133172070 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -100,7 +100,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> // Getters // ------------------------------------------------------------------------ - FileEnumerator.Provider getEnumeratorFactory() { + protected FileEnumerator.Provider getEnumeratorFactory() { return enumeratorFactory; } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java index feaeb1d9842..2d95f557876 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java @@ -84,6 +84,10 @@ public class LimitableBulkFormat<T, SplitT extends 
FileSourceSplit> return globalNumberRead != null && globalNumberRead.get() >= limit; } + public long getLimit() { + return this.limit; + } + @Override public boolean isSplittable() { return format.isSplittable(); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java new file mode 100644 index 00000000000..b14e0628954 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.table.catalog.ObjectPath; + +import org.apache.hadoop.mapred.JobConf; + +/** + * The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism + * inference. 
+ */ +class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider { + + private final ObjectPath tablePath; + private final JobConf jobConf; + private final int globalMaxParallelism; + + HiveDynamicParallelismInferenceFactory( + ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) { + this.tablePath = tablePath; + this.jobConf = jobConf; + this.globalMaxParallelism = globalMaxParallelism; + } + + @Override + public HiveParallelismInference create() { + boolean inferEnabled = + jobConf.getBoolean( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue()); + HiveOptions.InferMode inferMode = + jobConf.getEnum( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue()); + // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism` + // is removed. + boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC; + int inferMaxParallelism = + Math.min( + (int) + jobConf.getLong( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX + .key(), + globalMaxParallelism), + globalMaxParallelism); + int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java index 01e1493681d..c8588f58a76 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.annotation.PublicEvolving; import 
org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.DescribedEnum; import org.apache.flink.configuration.MemorySize; @@ -50,6 +51,8 @@ public class HiveOptions { "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n" + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory."); + /** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */ + @Deprecated public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = key("table.exec.hive.infer-source-parallelism") .booleanType() @@ -58,11 +61,33 @@ public class HiveOptions { "If is false, parallelism of source are set by config.\n" + "If is true, source parallelism is inferred according to splits number.\n"); + @PublicEvolving + public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE = + key("table.exec.hive.infer-source-parallelism.mode") + .enumType(InferMode.class) + .defaultValue(InferMode.DYNAMIC) + .withDescription( + Description.builder() + .text( + "An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.") + .list( + text( + "'static' represents static inference, which will infer source parallelism at job creation stage."), + text( + "'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism."), + text( + "'none' represents disabling parallelism inference."), + text( + "Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference.")) + .build()); + public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX = key("table.exec.hive.infer-source-parallelism.max") .intType() 
.defaultValue(1000) - .withDescription("Sets max infer parallelism for source operator."); + .withDescription( + "Sets max infer parallelism for source operator. " + + "Note that the default value is effective only in the static parallelism inference mode."); public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER = key("table.exec.hive.fallback-mapred-writer") @@ -281,4 +306,30 @@ public class HiveOptions { return description; } } + + /** Infer mode used for {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE}. */ + public enum InferMode implements DescribedEnum { + STATIC("static", text("Static parallelism inference mode.")), + DYNAMIC("dynamic", text("Dynamic parallelism inference mode.")), + NONE("none", text("Disable parallelism inference.")); + + private final String value; + + private final InlineElement description; + + InferMode(String value, InlineElement description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return description; + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java index 850008fe5d2..e46eb7f0d7b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java @@ -18,10 +18,7 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SupplierWithException; 
import org.slf4j.Logger; @@ -40,18 +37,12 @@ class HiveParallelismInference { private int parallelism; - HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) { + HiveParallelismInference( + ObjectPath tablePath, boolean infer, int inferMaxParallelism, int parallelism) { this.tablePath = tablePath; - this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM); - this.inferMaxParallelism = - flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); - Preconditions.checkArgument( - inferMaxParallelism >= 1, - HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() - + " cannot be less than 1"); - - this.parallelism = - flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + this.infer = infer; + this.inferMaxParallelism = inferMaxParallelism; + this.parallelism = parallelism; } /** @@ -73,7 +64,8 @@ class HiveParallelismInference { /** * Infer parallelism by number of files and number of splits. If {@link - * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing. + * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link + * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing. */ HiveParallelismInference infer( SupplierWithException<Integer, IOException> numFiles, @@ -113,4 +105,15 @@ class HiveParallelismInference { result); return result; } + + /** Factory for the {@code HiveParallelismInference}. */ + interface Provider { + + /** + * Creates a new {@code HiveParallelismInference}. + * + * @return a new {@code HiveParallelismInference} with designated factors. 
+ */ + HiveParallelismInference create(); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java index 5b768b0320a..6c423e747c8 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java @@ -20,6 +20,8 @@ package org.apache.flink.connectors.hive; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.DynamicFilteringInfo; +import org.apache.flink.api.connector.source.DynamicParallelismInference; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.file.src.AbstractFileSource; @@ -29,10 +31,12 @@ import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; import org.apache.flink.connector.file.src.enumerate.FileEnumerator; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.table.ContinuousPartitionFetcher; +import org.apache.flink.connector.file.table.LimitableBulkFormat; import org.apache.flink.connectors.hive.read.HiveSourceSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -55,7 +59,8 @@ import java.util.List; * @param <T> the type of record returned by this source */ @PublicEvolving -public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> { +public class HiveSource<T> extends 
AbstractFileSource<T, HiveSourceSplit> + implements DynamicParallelismInference { private static final long serialVersionUID = 1L; @@ -68,6 +73,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> { private final ContinuousPartitionFetcher<Partition, ?> fetcher; private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext; private final ObjectPath tablePath; + private Long limit = null; HiveSource( Path[] inputPaths, @@ -97,6 +103,9 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> { this.partitionBytes = partitionBytes; this.fetcher = fetcher; this.fetcherContext = fetcherContext; + if (readerFormat instanceof LimitableBulkFormat) { + limit = ((LimitableBulkFormat<?, ?>) readerFormat).getLimit(); + } } @Override @@ -186,4 +195,48 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> { jobConfWrapper), getAssignerFactory()); } + + @Override + public int inferParallelism(Context dynamicParallelismContext) { + FileEnumerator fileEnumerator; + List<HiveTablePartition> partitions; + if (dynamicFilterPartitionKeys != null) { + fileEnumerator = + new HiveSourceDynamicFileEnumerator.Provider( + tablePath.getFullName(), + dynamicFilterPartitionKeys, + partitionBytes, + hiveVersion, + jobConfWrapper) + .create(); + if (dynamicParallelismContext.getDynamicFilteringInfo().isPresent()) { + DynamicFilteringInfo dynamicFilteringInfo = + dynamicParallelismContext.getDynamicFilteringInfo().get(); + if (dynamicFilteringInfo instanceof DynamicFilteringEvent) { + ((HiveSourceDynamicFileEnumerator) fileEnumerator) + .setDynamicFilteringData( + ((DynamicFilteringEvent) dynamicFilteringInfo).getData()); + } + } + partitions = ((HiveSourceDynamicFileEnumerator) fileEnumerator).getFinalPartitions(); + } else { + fileEnumerator = getEnumeratorFactory().create(); + partitions = ((HiveSourceFileEnumerator) fileEnumerator).getPartitions(); + } + + return new 
HiveDynamicParallelismInferenceFactory( + tablePath, + jobConfWrapper.conf(), + dynamicParallelismContext.getParallelismInferenceUpperBound()) + .create() + .infer( + () -> + HiveSourceFileEnumerator.getNumFiles( + partitions, jobConfWrapper.conf()), + () -> + HiveSourceFileEnumerator.createInputSplits( + 0, partitions, jobConfWrapper.conf(), true) + .size()) + .limit(limit); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java index f04e8c514f2..28b4565047d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java @@ -60,6 +60,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.flink.connector.file.src.FileSource.DEFAULT_SPLIT_ASSIGNER; @@ -340,6 +341,16 @@ public class HiveSourceBuilder { jobConf.set( HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), String.valueOf(calPartitionSizeThreadNum)); + + jobConf.setBoolean( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(), + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)); + jobConf.set( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(), + String.valueOf( + flinkConf.get( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE)) + .toUpperCase(Locale.ROOT)); } private boolean isStreamingSource() { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java index 
4d84fd5e048..1d592cf4d33 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java @@ -177,8 +177,7 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator { minDesiredSplits, finalPartitions, jobConf, false)); } - @VisibleForTesting - List<HiveTablePartition> getFinalPartitions() { + public List<HiveTablePartition> getFinalPartitions() { return finalPartitions; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java index c30cd1c8e1b..500fe54d979 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java @@ -202,6 +202,10 @@ public class HiveSourceFileEnumerator implements FileEnumerator { return numFiles; } + public List<HiveTablePartition> getPartitions() { + return this.partitions; + } + private static long getSplitMaxSize(JobConf jobConf) { return jobConf.getLong( HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(), diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java new file mode 100644 index 00000000000..58a20e498d6 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.util.Preconditions; + +/** + * The factory class for {@link HiveParallelismInference} to support Hive source static parallelism + * inference. + */ +class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.Provider { + + private final ObjectPath tablePath; + private final ReadableConfig flinkConf; + + HiveStaticParallelismInferenceFactory(ObjectPath tablePath, ReadableConfig flinkConf) { + this.tablePath = tablePath; + this.flinkConf = flinkConf; + } + + @Override + public HiveParallelismInference create() { + boolean inferEnabled = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM); + HiveOptions.InferMode inferMode = + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE); + // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism` + // is removed. 
+ boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.STATIC; + int inferMaxParallelism = + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); + Preconditions.checkArgument( + inferMaxParallelism >= 1, + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + + " cannot be less than 1"); + int parallelism = + flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + // Keeping the parallelism unset is a prerequisite for dynamic parallelism inference. + if (inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC) { + parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + } + + return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 91931300217..f174d107cb0 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -171,7 +171,8 @@ public class HiveTableSource remainingPartitions); int parallelism = - new HiveParallelismInference(tablePath, flinkConf) + new HiveStaticParallelismInferenceFactory(tablePath, flinkConf) + .create() .infer( () -> HiveSourceFileEnumerator.getNumFiles( diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java index 93ddb42b339..58c7b0b2170 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java +++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java @@ -18,10 +18,8 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.connector.source.DynamicFilteringData; import org.apache.flink.table.data.GenericRowData; @@ -47,8 +45,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.mapred.JobConf; import org.junit.jupiter.api.Test; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -60,6 +56,7 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.flink.table.catalog.hive.HiveTestUtils.serialize; import static org.apache.flink.table.data.StringData.fromString; import static org.assertj.core.api.Assertions.assertThat; @@ -215,16 +212,4 @@ class HiveSourceDynamicFileEnumeratorTest { HiveShimLoader.getHiveVersion(), new JobConf()); } - - private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - typeInfo.createSerializer(new SerializerConfigImpl()) - .serialize(row, new DataOutputViewStreamWrapper(baos)); - } catch (IOException e) { - // throw as RuntimeException so the function can use in lambda - throw new RuntimeException(e); - } - return baos.toByteArray(); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java index 8cb63de76b0..1fb37bacddb 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java @@ -50,7 +50,7 @@ import static org.apache.flink.table.catalog.hive.util.Constants.IDENTIFIER; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link HiveSource}. */ +/** IT case for {@link HiveSource}. */ public class HiveSourceITCase { private static HiveCatalog hiveCatalog; diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java new file mode 100644 index 00000000000..3c08dbbb873 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.DynamicFilteringInfo; +import org.apache.flink.api.connector.source.DynamicParallelismInference; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.mapred.JobConf; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.hive.util.Constants.IDENTIFIER; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HiveSource}. 
*/ +class HiveSourceTest { + + private static HiveCatalog hiveCatalog; + + private static final List<String> keys = Collections.singletonList("p"); + + private static final List<Map<String, String>> partitionSpecs = + Arrays.asList( + Collections.singletonMap("p", "1"), + Collections.singletonMap("p", "2"), + Collections.singletonMap("p", "3")); + + @BeforeAll + static void setup() throws Exception { + hiveCatalog = HiveTestUtils.createHiveCatalog(); + hiveCatalog.open(); + } + + @AfterAll + static void tearDown() { + if (hiveCatalog != null) { + hiveCatalog.close(); + } + } + + @Test + void testDynamicParallelismInference() throws Exception { + // test non-partitioned table + ObjectPath tablePath1 = new ObjectPath("default", "hiveNonPartTbl"); + createTable(tablePath1, hiveCatalog, false); + + HiveSource<RowData> hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath1.getDatabaseName(), + tablePath1.getObjectName(), + Collections.emptyMap()) + .buildWithDefaultBulkFormat(); + + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(1); + + hiveCatalog.dropTable(tablePath1, false); + + // test partitioned table + ObjectPath tablePath2 = new ObjectPath("default", "hiveTbl1"); + createTable(tablePath2, hiveCatalog, true); + + hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath2.getDatabaseName(), + tablePath2.getObjectName(), + Collections.emptyMap()) + .setPartitions( + partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath2.getDatabaseName(), + tablePath2.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .buildWithDefaultBulkFormat(); + + // 
test inferred parallelism less than maxParallelism + context = genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(3); + + // test inferred parallelism larger than maxParallelism + context = genDynamicParallelismContext(2, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(2); + + hiveCatalog.dropTable(tablePath2, false); + } + + @Test + void testDynamicParallelismInferenceWithLimit() throws Exception { + ObjectPath tablePath = new ObjectPath("default", "hiveTbl2"); + createTable(tablePath, hiveCatalog, true); + + HiveSource<RowData> hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + Collections.emptyMap()) + .setPartitions( + partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .setLimit(1L) + .buildWithDefaultBulkFormat(); + + // test inferred parallelism less than maxParallelism + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(1); + + hiveCatalog.dropTable(tablePath, false); + } + + @Test + void testDynamicParallelismInferenceWithFiltering() throws Exception { + ObjectPath tablePath = new ObjectPath("default", "hiveTbl3"); + createTable(tablePath, hiveCatalog, true); + + HiveSource<RowData> hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + Collections.emptyMap()) + .setDynamicFilterPartitionKeys(keys) + .setPartitions( + 
partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .buildWithDefaultBulkFormat(); + + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Arrays.asList(1, 2)); + + assertThat(hiveSource.inferParallelism(context)).isEqualTo(2); + hiveCatalog.dropTable(tablePath, false); + } + + private void createTable(ObjectPath tablePath, HiveCatalog hiveCatalog, boolean isPartitioned) + throws Exception { + Map<String, String> tableOptions = new HashMap<>(); + tableOptions.put(CONNECTOR.key(), IDENTIFIER); + + List<Column> partitionTableColumns = new ArrayList<>(); + partitionTableColumns.add(Column.physical("i", DataTypes.INT())); + if (isPartitioned) { + HiveSourceTest.keys.stream() + .map(key -> Column.physical(key, DataTypes.STRING())) + .forEach(partitionTableColumns::add); + } + ResolvedSchema partitionTableRSchema = ResolvedSchema.of(partitionTableColumns); + + hiveCatalog.createTable( + tablePath, + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder() + .fromResolvedSchema(partitionTableRSchema) + .build(), + null, + isPartitioned ? 
keys : Collections.emptyList(), + tableOptions), + partitionTableRSchema), + false); + + if (isPartitioned) { + HiveSourceTest.partitionSpecs.forEach( + spec -> { + try { + HiveTestUtils.createTextTableInserter( + hiveCatalog, + tablePath.getDatabaseName(), + tablePath.getObjectName()) + .addRow(new Object[] {1}) + .addRow(new Object[] {2}) + .commit( + spec.keySet().iterator().next() + + "='" + + spec.values().iterator().next() + + "'"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + HiveTestUtils.createTextTableInserter( + hiveCatalog, tablePath.getDatabaseName(), tablePath.getObjectName()) + .addRow(new Object[] {1}) + .addRow(new Object[] {2}) + .commit(); + } + } + + private DynamicParallelismInference.Context genDynamicParallelismContext( + int maxParallelism, List<Integer> filteringPartitions) { + return new DynamicParallelismInference.Context() { + @Override + public Optional<DynamicFilteringInfo> getDynamicFilteringInfo() { + RowType rowType = RowType.of(new IntType()); + TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType); + if (!filteringPartitions.isEmpty()) { + List<byte[]> serializedRows = + filteringPartitions.stream() + .map( + key -> { + GenericRowData filteringRow = new GenericRowData(1); + filteringRow.setField(0, key); + return HiveTestUtils.serialize( + rowTypeInfo, filteringRow); + }) + .collect(Collectors.toList()); + + DynamicFilteringData data = + new DynamicFilteringData( + InternalTypeInfo.of(rowType), rowType, serializedRows, true); + return Optional.of(new DynamicFilteringEvent(data)); + } else { + return Optional.empty(); + } + } + + @Override + public int getParallelismInferenceUpperBound() { + return maxParallelism; + } + + @Override + public long getDataVolumePerTask() { + return 10L; + } + }; + } +} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java index 2cefd641ae9..80bb609ea85 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -18,10 +18,14 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.BatchExecutionOptions; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connectors.hive.HiveOptions; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; @@ -33,6 +37,7 @@ import org.apache.flink.table.catalog.CatalogTest; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.types.DataType; @@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.junit.rules.TemporaryFolder; import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -59,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static 
org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; /** Test utils for Hive connector. */ @@ -158,6 +165,8 @@ public class HiveTestUtils { public static TableEnvironment createTableEnvInBatchMode(SqlDialect dialect) { TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); tableEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tableEnv.getConfig() + .set(TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE, HiveOptions.InferMode.STATIC); tableEnv.getConfig().setSqlDialect(dialect); return tableEnv; } @@ -204,6 +213,18 @@ public class HiveTestUtils { return new TextTableInserter(hiveCatalog, dbName, tableName); } + /** Serializes {@code row} into a byte array using the serializer created from {@code typeInfo}; an {@link IOException} raised while writing is rethrown wrapped in a {@link RuntimeException}. */ + public static byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + typeInfo.createSerializer(new SerializerConfigImpl()) + .serialize(row, new DataOutputViewStreamWrapper(baos)); + } catch (IOException e) { + // rethrow as a RuntimeException so that this method can be used inside a lambda + throw new RuntimeException(e); + } + return baos.toByteArray(); + } + /** insert table operation. */ public static class TextTableInserter {