This is an automated email from the ASF dual-hosted git repository. zouxxyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new b83e8d47e1 [hive] respect hive split.minsize/maxsize configs (#5903) b83e8d47e1 is described below commit b83e8d47e18ab5f511b970c38e0866c8958a74e0 Author: Yann Byron <biyan900...@gmail.com> AuthorDate: Wed Jul 16 14:08:59 2025 +0800 [hive] respect hive split.minsize/maxsize configs (#5903) --- docs/content/maintenance/configurations.md | 6 + .../generated/hive_connector_configuration.html | 42 +++++++ paimon-docs/pom.xml | 7 ++ .../configuration/ConfigOptionsDocGenerator.java | 2 + .../apache/paimon/hive/HiveConnectorOptions.java | 41 +++++++ .../paimon/hive/mapred/PaimonInputFormat.java | 2 +- .../paimon/hive/utils/HiveSplitGenerator.java | 129 +++++++++++++++++++-- 7 files changed, 219 insertions(+), 10 deletions(-) diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md index 99f797e68f..ce127feb0d 100644 --- a/docs/content/maintenance/configurations.md +++ b/docs/content/maintenance/configurations.md @@ -44,6 +44,12 @@ Options for Hive catalog. {{< generated/hive_catalog_configuration >}} +### HiveConnectorOptions + +Hive connector options for paimon. + +{{< generated/hive_connector_configuration >}} + ### JdbcCatalogOptions Options for Jdbc catalog. diff --git a/docs/layouts/shortcodes/generated/hive_connector_configuration.html b/docs/layouts/shortcodes/generated/hive_connector_configuration.html new file mode 100644 index 0000000000..4ed329bf3e --- /dev/null +++ b/docs/layouts/shortcodes/generated/hive_connector_configuration.html @@ -0,0 +1,42 @@ +{{/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/}} +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>paimon.respect.minmaxsplitsize.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If true, Paimon will calculate the size of split through hive parameters about splits such as 'mapreduce.input.fileinputformat.split.minsize' and 'mapreduce.input.fileinputformat.split.maxsize', and then split.</td> + </tr> + <tr> + <td><h5>paimon.split.openfilecost</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Long</td> + <td>The cost when open a file. The config will overwrite the table property 'source.split.open-file-cost'.</td> + </tr> + </tbody> +</table> diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml index ba960e5eb7..5bffe074d5 100644 --- a/paimon-docs/pom.xml +++ b/paimon-docs/pom.xml @@ -44,6 +44,13 @@ under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-hive-connector-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.paimon</groupId> <artifactId>paimon-flink-common</artifactId> diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java index 219e7df99c..02a18f375c 100644 --- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java +++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java @@ -85,6 +85,8 @@ public class ConfigOptionsDocGenerator { "paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.kafka"), new OptionsClassLocation( "paimon-hive/paimon-hive-catalog", "org.apache.paimon.hive"), + new OptionsClassLocation( + "paimon-hive/paimon-hive-connector-common", "org.apache.paimon.hive"), new OptionsClassLocation( "paimon-spark/paimon-spark-common", "org.apache.paimon.spark") }; diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java new file mode 100644 index 0000000000..cea118002f --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.options.ConfigOption; + +import static org.apache.paimon.options.ConfigOptions.key; + +/** Options for hive connector. */ +public class HiveConnectorOptions { + + public static final ConfigOption<Boolean> HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED = + key("paimon.respect.minmaxsplitsize.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, Paimon will calculate the size of split through hive parameters about splits such as 'mapreduce.input.fileinputformat.split.minsize' and 'mapreduce.input.fileinputformat.split.maxsize', and then split."); + + public static final ConfigOption<Long> HIVE_PAIMON_SPLIT_OPENFILECOST = + key("paimon.split.openfilecost") + .longType() + .noDefaultValue() + .withDescription( + "The cost when open a file. The config will overwrite the table property 'source.split.open-file-cost'."); +} diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java index 81fced03f2..8ea844d6a8 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java @@ -42,7 +42,7 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> { @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) { FileStoreTable table = createFileStoreTable(jobConf); - return generateSplits(table, jobConf); + return generateSplits(table, jobConf, numSplits); } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java index 6af5da23fd..811522c151 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java @@ -18,19 +18,26 @@ package org.apache.paimon.hive.utils; +import org.apache.paimon.hive.HiveConnectorOptions; import org.apache.paimon.hive.mapred.PaimonInputSplit; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.table.FallbackReadFileStoreTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.tag.TagPreview; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BinPacking; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -41,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.singletonMap; import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; @@ -51,7 +60,10 @@ import static org.apache.paimon.partition.PartitionPredicate.createPartitionPred /** Generator to generate hive input splits. */ public class HiveSplitGenerator { - public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf) { + private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class); + + public static InputSplit[] generateSplits( + FileStoreTable table, JobConf jobConf, int numSplits) { List<Predicate> predicates = new ArrayList<>(); createPredicate(table.schema(), jobConf, false).ifPresent(predicates::add); @@ -102,14 +114,17 @@ public class HiveSplitGenerator { scan.withFilter(PredicateBuilder.and(predicatePerPartition)); } } - scan.dropStats() - .plan() - .splits() - .forEach( - split -> - splits.add( - new PaimonInputSplit( - location, (DataSplit) split, table))); + List<DataSplit> dataSplits = + scan.dropStats().plan().splits().stream() + .map(s -> (DataSplit) s) + .collect(Collectors.toList()); + List<DataSplit> packed = dataSplits; + if (jobConf.getBoolean( + HiveConnectorOptions.HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED.key(), + false)) { + packed = packSplits(table, jobConf, dataSplits, numSplits); + } + packed.forEach(ss -> splits.add(new PaimonInputSplit(location, ss, table))); } return splits.toArray(new InputSplit[0]); } @@ -142,4 +157,100 @@ public class HiveSplitGenerator { partition, rowType, defaultPartName)); } } + + private static List<DataSplit> packSplits( + FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int numSplits) { + if (table.coreOptions().deletionVectorsEnabled()) { + return splits; + } + long openCostInBytes = + jobConf.getLong( + HiveConnectorOptions.HIVE_PAIMON_SPLIT_OPENFILECOST.key(), + table.coreOptions().splitOpenFileCost()); + long splitSize = computeSplitSize(jobConf, splits, numSplits, openCostInBytes); + List<DataSplit> dataSplits = new ArrayList<>(); + List<DataSplit> toPack = new ArrayList<>(); + int numFiles = 0; + for (DataSplit split : splits) { + if (split instanceof FallbackReadFileStoreTable.FallbackDataSplit) { + dataSplits.add(split); + } else if (split.beforeFiles().isEmpty() && split.rawConvertible()) { + numFiles += split.dataFiles().size(); + toPack.add(split); + } else { + dataSplits.add(split); + } + } + Function<DataFileMeta, Long> weightFunc = + file -> Math.max(file.fileSize(), openCostInBytes); + DataSplit current = null; + List<DataFileMeta> bin = new ArrayList<>(); + int numFilesAfterPacked = 0; + for (DataSplit split : toPack) { + if (current == null + || (current.partition().equals(split.partition()) + && current.bucket() == split.bucket())) { + current = split; + bin.addAll(split.dataFiles()); + } else { + // deal with files which belong to the previous partition or bucket. + List<List<DataFileMeta>> splitGroups = + BinPacking.packForOrdered(bin, weightFunc, splitSize); + for (List<DataFileMeta> fileGroups : splitGroups) { + DataSplit newSplit = buildDataSplit(current, fileGroups); + numFilesAfterPacked += newSplit.dataFiles().size(); + dataSplits.add(newSplit); + } + current = split; + bin.clear(); + } + } + if (!bin.isEmpty()) { + List<List<DataFileMeta>> splitGroups = + BinPacking.packForOrdered(bin, weightFunc, splitSize); + for (List<DataFileMeta> fileGroups : splitGroups) { + DataSplit newSplit = buildDataSplit(current, fileGroups); + numFilesAfterPacked += newSplit.dataFiles().size(); + dataSplits.add(newSplit); + } + } + LOG.info("The origin number of data files before pack: {}", numFiles); + LOG.info("The current number of data files after pack: {}", numFilesAfterPacked); + return dataSplits; + } + + private static DataSplit buildDataSplit(DataSplit current, List<DataFileMeta> fileGroups) { + return DataSplit.builder() + .withSnapshot(current.snapshotId()) + .withPartition(current.partition()) + .withBucket(current.bucket()) + .withTotalBuckets(current.totalBuckets()) + .withDataFiles(fileGroups) + .rawConvertible(current.rawConvertible()) + .withBucketPath(current.bucketPath()) + .build(); + } + + private static Long computeSplitSize( + JobConf jobConf, List<DataSplit> splits, int numSplits, long openCostInBytes) { + long maxSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); + long minSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMINSPLITSIZE); + long totalSize = 0; + for (DataSplit split : splits) { + totalSize += + split.dataFiles().stream() + .map(f -> Math.max(f.fileSize(), openCostInBytes)) + .reduce(Long::sum) + .orElse(0L); + } + long avgSize = totalSize / numSplits; + long splitSize = Math.min(maxSize, Math.max(avgSize, minSize)); + LOG.info( + "Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {}, finalSplitSize: {}.", + minSize, + maxSize, + avgSize, + splitSize); + return splitSize; + } }