This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b83e8d47e1 [hive] respect hive split.minsize/maxsize configs (#5903)
b83e8d47e1 is described below

commit b83e8d47e18ab5f511b970c38e0866c8958a74e0
Author: Yann Byron <biyan900...@gmail.com>
AuthorDate: Wed Jul 16 14:08:59 2025 +0800

    [hive] respect hive split.minsize/maxsize configs (#5903)
---
 docs/content/maintenance/configurations.md         |   6 +
 .../generated/hive_connector_configuration.html    |  42 +++++++
 paimon-docs/pom.xml                                |   7 ++
 .../configuration/ConfigOptionsDocGenerator.java   |   2 +
 .../apache/paimon/hive/HiveConnectorOptions.java   |  41 +++++++
 .../paimon/hive/mapred/PaimonInputFormat.java      |   2 +-
 .../paimon/hive/utils/HiveSplitGenerator.java      | 129 +++++++++++++++++++--
 7 files changed, 219 insertions(+), 10 deletions(-)

diff --git a/docs/content/maintenance/configurations.md 
b/docs/content/maintenance/configurations.md
index 99f797e68f..ce127feb0d 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -44,6 +44,12 @@ Options for Hive catalog.
 
 {{< generated/hive_catalog_configuration >}}
 
+### HiveConnectorOptions
+
+Hive connector options for paimon.
+
+{{< generated/hive_connector_configuration >}}
+
 ### JdbcCatalogOptions
 
 Options for Jdbc catalog.
diff --git 
a/docs/layouts/shortcodes/generated/hive_connector_configuration.html 
b/docs/layouts/shortcodes/generated/hive_connector_configuration.html
new file mode 100644
index 0000000000..4ed329bf3e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/hive_connector_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>paimon.respect.minmaxsplitsize.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, Paimon will calculate the size of split through hive 
parameters about splits such as 'mapreduce.input.fileinputformat.split.minsize' 
and 'mapreduce.input.fileinputformat.split.maxsize', and then split.</td>
+        </tr>
+        <tr>
+            <td><h5>paimon.split.openfilecost</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The cost when open a file. The config will overwrite the table 
property 'source.split.open-file-cost'.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index ba960e5eb7..5bffe074d5 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -44,6 +44,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-connector-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-flink-common</artifactId>
diff --git 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 219e7df99c..02a18f375c 100644
--- 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -85,6 +85,8 @@ public class ConfigOptionsDocGenerator {
                         "paimon-flink/paimon-flink-cdc", 
"org.apache.paimon.flink.kafka"),
                 new OptionsClassLocation(
                         "paimon-hive/paimon-hive-catalog", 
"org.apache.paimon.hive"),
+                new OptionsClassLocation(
+                        "paimon-hive/paimon-hive-connector-common", 
"org.apache.paimon.hive"),
                 new OptionsClassLocation(
                         "paimon-spark/paimon-spark-common", 
"org.apache.paimon.spark")
             };
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
new file mode 100644
index 0000000000..cea118002f
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.options.ConfigOption;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Options for hive connector. */
+public class HiveConnectorOptions {
+
+    public static final ConfigOption<Boolean> 
HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED =
+            key("paimon.respect.minmaxsplitsize.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, Paimon will calculate the size of split 
through hive parameters about splits such as 
'mapreduce.input.fileinputformat.split.minsize' and 
'mapreduce.input.fileinputformat.split.maxsize', and then split.");
+
+    public static final ConfigOption<Long> HIVE_PAIMON_SPLIT_OPENFILECOST =
+            key("paimon.split.openfilecost")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The cost when open a file. The config will 
overwrite the table property 'source.split.open-file-cost'.");
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 81fced03f2..8ea844d6a8 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -42,7 +42,7 @@ public class PaimonInputFormat implements InputFormat<Void, 
RowDataContainer> {
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
         FileStoreTable table = createFileStoreTable(jobConf);
-        return generateSplits(table, jobConf);
+        return generateSplits(table, jobConf, numSplits);
     }
 
     @Override
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 6af5da23fd..811522c151 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -18,19 +18,26 @@
 
 package org.apache.paimon.hive.utils;
 
+import org.apache.paimon.hive.HiveConnectorOptions;
 import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BinPacking;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -41,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -51,7 +60,10 @@ import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPred
 /** Generator to generate hive input splits. */
 public class HiveSplitGenerator {
 
-    public static InputSplit[] generateSplits(FileStoreTable table, JobConf 
jobConf) {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HiveSplitGenerator.class);
+
+    public static InputSplit[] generateSplits(
+            FileStoreTable table, JobConf jobConf, int numSplits) {
         List<Predicate> predicates = new ArrayList<>();
         createPredicate(table.schema(), jobConf, 
false).ifPresent(predicates::add);
 
@@ -102,14 +114,17 @@ public class HiveSplitGenerator {
                     
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
                 }
             }
-            scan.dropStats()
-                    .plan()
-                    .splits()
-                    .forEach(
-                            split ->
-                                    splits.add(
-                                            new PaimonInputSplit(
-                                                    location, (DataSplit) 
split, table)));
+            List<DataSplit> dataSplits =
+                    scan.dropStats().plan().splits().stream()
+                            .map(s -> (DataSplit) s)
+                            .collect(Collectors.toList());
+            List<DataSplit> packed = dataSplits;
+            if (jobConf.getBoolean(
+                    
HiveConnectorOptions.HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED.key(),
+                    false)) {
+                packed = packSplits(table, jobConf, dataSplits, numSplits);
+            }
+            packed.forEach(ss -> splits.add(new PaimonInputSplit(location, ss, 
table)));
         }
         return splits.toArray(new InputSplit[0]);
     }
@@ -142,4 +157,100 @@ public class HiveSplitGenerator {
                             partition, rowType, defaultPartName));
         }
     }
+
+    private static List<DataSplit> packSplits(
+            FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int 
numSplits) {
+        if (table.coreOptions().deletionVectorsEnabled()) {
+            return splits;
+        }
+        long openCostInBytes =
+                jobConf.getLong(
+                        
HiveConnectorOptions.HIVE_PAIMON_SPLIT_OPENFILECOST.key(),
+                        table.coreOptions().splitOpenFileCost());
+        long splitSize = computeSplitSize(jobConf, splits, numSplits, 
openCostInBytes);
+        List<DataSplit> dataSplits = new ArrayList<>();
+        List<DataSplit> toPack = new ArrayList<>();
+        int numFiles = 0;
+        for (DataSplit split : splits) {
+            if (split instanceof FallbackReadFileStoreTable.FallbackDataSplit) 
{
+                dataSplits.add(split);
+            } else if (split.beforeFiles().isEmpty() && 
split.rawConvertible()) {
+                numFiles += split.dataFiles().size();
+                toPack.add(split);
+            } else {
+                dataSplits.add(split);
+            }
+        }
+        Function<DataFileMeta, Long> weightFunc =
+                file -> Math.max(file.fileSize(), openCostInBytes);
+        DataSplit current = null;
+        List<DataFileMeta> bin = new ArrayList<>();
+        int numFilesAfterPacked = 0;
+        for (DataSplit split : toPack) {
+            if (current == null
+                    || (current.partition().equals(split.partition())
+                            && current.bucket() == split.bucket())) {
+                current = split;
+                bin.addAll(split.dataFiles());
+            } else {
+                // deal with files which belong to the previous partition or 
bucket.
+                List<List<DataFileMeta>> splitGroups =
+                        BinPacking.packForOrdered(bin, weightFunc, splitSize);
+                for (List<DataFileMeta> fileGroups : splitGroups) {
+                    DataSplit newSplit = buildDataSplit(current, fileGroups);
+                    numFilesAfterPacked += newSplit.dataFiles().size();
+                    dataSplits.add(newSplit);
+                }
+                current = split;
+                bin.clear();
+            }
+        }
+        if (!bin.isEmpty()) {
+            List<List<DataFileMeta>> splitGroups =
+                    BinPacking.packForOrdered(bin, weightFunc, splitSize);
+            for (List<DataFileMeta> fileGroups : splitGroups) {
+                DataSplit newSplit = buildDataSplit(current, fileGroups);
+                numFilesAfterPacked += newSplit.dataFiles().size();
+                dataSplits.add(newSplit);
+            }
+        }
+        LOG.info("The origin number of data files before pack: {}", numFiles);
+        LOG.info("The current number of data files after pack: {}", 
numFilesAfterPacked);
+        return dataSplits;
+    }
+
+    private static DataSplit buildDataSplit(DataSplit current, 
List<DataFileMeta> fileGroups) {
+        return DataSplit.builder()
+                .withSnapshot(current.snapshotId())
+                .withPartition(current.partition())
+                .withBucket(current.bucket())
+                .withTotalBuckets(current.totalBuckets())
+                .withDataFiles(fileGroups)
+                .rawConvertible(current.rawConvertible())
+                .withBucketPath(current.bucketPath())
+                .build();
+    }
+
+    private static Long computeSplitSize(
+            JobConf jobConf, List<DataSplit> splits, int numSplits, long 
openCostInBytes) {
+        long maxSize = HiveConf.getLongVar(jobConf, 
HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+        long minSize = HiveConf.getLongVar(jobConf, 
HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+        long totalSize = 0;
+        for (DataSplit split : splits) {
+            totalSize +=
+                    split.dataFiles().stream()
+                            .map(f -> Math.max(f.fileSize(), openCostInBytes))
+                            .reduce(Long::sum)
+                            .orElse(0L);
+        }
+        long avgSize = totalSize / numSplits;
+        long splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
+        LOG.info(
+                "Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {}, 
finalSplitSize: {}.",
+                minSize,
+                maxSize,
+                avgSize,
+                splitSize);
+        return splitSize;
+    }
 }

Reply via email to