This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new cbc9d462b29 [FLINK-29126][hive] Fix splitting file optimization doesn't work for orc format
cbc9d462b29 is described below

commit cbc9d462b295243a61ebc544d9cf9ff6fa2a8aa6
Author: luoyuxia <[email protected]>
AuthorDate: Mon Aug 29 11:07:39 2022 +0800

    [FLINK-29126][hive] Fix splitting file optimization doesn't work for orc format
    
    This closes #20694
    
    (cherry picked from commit cf70844a56a0994dfcd7fb1859408683f2b621a3)
---
 .../docs/connectors/table/hive/hive_read_write.md  |  7 +-
 .../docs/connectors/table/hive/hive_read_write.md  |  9 ++-
 .../apache/flink/connectors/hive/HiveOptions.java  |  6 ++
 .../flink/connectors/hive/HiveSourceBuilder.java   | 20 ++++-
 .../connectors/hive/HiveSourceFileEnumerator.java  | 88 +++++++++++++++++-----
 .../hive/HiveSourceFileEnumeratorTest.java         | 44 ++++++++++-
 .../connectors/hive/PartitionMonitorTest.java      |  6 +-
 7 files changed, 154 insertions(+), 26 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 1bcb8ec4b94..033e8bb1486 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -183,7 +183,12 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` 
   </tbody>
 </table>
 
-**注意:** 目前上述参数仅适用于 ORC 格式的 Hive 表。
+{{< hint warning >}}
+**注意:**
+- 为了调整数据分片的大小, Flink 首先将计算得到所有分区下的所有文件的大小。
+  但是这在分区数量很多的情况下会比较耗时,你可以配置作业参数 `table.exec.hive.calculate-partition-size.thread-num`(默认为3)为一个更大的值使用更多的线程来进行加速。
+- 目前上述参数仅适用于 ORC 格式的 Hive 表。
+{{< /hint >}}
 
 ### 加载分区切片
 
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 95f377732d6..3636eaf599c 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -198,8 +198,13 @@ Users can do some performance tuning by tuning the split's size with the follow
     </tr>
   </tbody>
 </table>
-
-**NOTE**: Currently, these two configurations only works for the Hive table stored as ORC format.
+{{< hint warning >}}
+**NOTE**:
+- To tune the split size, Flink will first get the size of all files for all partitions.
+  If there are too many partitions, this may be time-consuming;
+  you can then set the job configuration `table.exec.hive.calculate-partition-size.thread-num` (3 by default) to a bigger value to use more threads to speed up the process.
+- Currently, these configurations for tuning the split size only work for Hive tables stored in ORC format.
+{{< /hint >}}
 
 ### Load Partition Splits
 
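A minimal usage sketch (not part of this commit) of how a job might apply the new option from the Table API; the environment setup, the value 8, and the class name are illustrative assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSplitTuningExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Use more threads than the default 3 to sum up file sizes across many partitions.
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.hive.calculate-partition-size.thread-num", "8");
        // ... register the Hive catalog and query the ORC table as usual.
    }
}

In the SQL client the same key can be set with a SET statement before running the query.
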
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 38057a6e07a..9d7ce987578 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -96,6 +96,12 @@ public class HiveOptions {
                                     + " When the value is over estimated, 
Flink will tend to pack Hive's data into less splits, which will be helpful 
when Hive's table contains many small files."
                                     + " And vice versa. It only works for the 
Hive table stored as ORC format.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM =
+            key("table.exec.hive.calculate-partition-size.thread-num")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The thread number to calculate partition's size.");
+
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED =
             key("table.exec.hive.sink.sort-by-dynamic-partition.enable")
                     .booleanType()
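
A self-contained sketch of how such an Integer ConfigOption behaves (illustrative only, not connector code): an unset key falls back to the declared default, which is why the enumerator can always parse a value once HiveSourceBuilder has copied it into the JobConf.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ThreadNumOptionDemo {
    static final ConfigOption<Integer> CALC_THREADS =
            ConfigOptions.key("table.exec.hive.calculate-partition-size.thread-num")
                    .intType()
                    .defaultValue(3)
                    .withDescription("The thread number to calculate partition's size.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println(conf.get(CALC_THREADS)); // 3, the default
        conf.set(CALC_THREADS, 8);
        System.out.println(conf.get(CALC_THREADS)); // 8
    }
}
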
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 3837d718817..bb6ee5a6c1e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -296,15 +296,18 @@ public class HiveSourceBuilder {
     }
 
     private void setFlinkConfigurationToJobConf() {
+        int splitPartitionThreadNum =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+        Preconditions.checkArgument(
+                splitPartitionThreadNum >= 1,
+                HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+                        + " cannot be less than 1");
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key(),
-                flinkConf
-                        .get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM)
-                        .toString());
+                String.valueOf(splitPartitionThreadNum));
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
                 flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX).toString());
-
         jobConf.set(
                 HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
                 String.valueOf(
@@ -313,6 +316,15 @@ public class HiveSourceBuilder {
                 HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(),
                 String.valueOf(
                         flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST).getBytes()));
+        int calPartitionSizeThreadNum =
+                flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM);
+        Preconditions.checkArgument(
+                calPartitionSizeThreadNum >= 1,
+                HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key()
+                        + " cannot be less than 1");
+        jobConf.set(
+                HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
+                String.valueOf(calPartitionSizeThreadNum));
     }
 
     private boolean isStreamingSource() {
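
The two hunks above follow the same validate-then-propagate pattern. A condensed sketch (names simplified, not the literal builder code): read the Flink option, reject non-positive values on the client, and store the value as a string in the Hadoop JobConf that is shipped to the split enumerator.

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.mapred.JobConf;

final class ConfigPropagationSketch {
    static void propagateCalcThreadNum(ReadableConfig flinkConf, JobConf jobConf) {
        int threads = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM);
        Preconditions.checkArgument(
                threads >= 1,
                HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key()
                        + " cannot be less than 1");
        // JobConf only carries strings, so the enumerator re-parses the value with Integer.parseInt.
        jobConf.set(
                HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
                String.valueOf(threads));
    }
}
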
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 07a2dd60c31..2373e747148 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -36,6 +37,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.flink.util.concurrent.Executors.newDirectExecutorService;
 
 /**
  * A {@link FileEnumerator} implementation for hive source, which generates splits based on {@link
@@ -81,10 +89,6 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
             setSplitMaxSize(partitions, jobConf, minNumSplits);
         }
         int threadNum = getThreadNumToSplitHiveFile(jobConf);
-        Preconditions.checkArgument(
-                threadNum >= 1,
-                HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
-                        + " cannot be less than 1");
         List<HiveSourceSplit> hiveSplits = new ArrayList<>();
         try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
             for (HiveTablePartitionSplits partitionSplits :
@@ -106,8 +110,12 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
         // works for orc format
         for (HiveTablePartition partition : partitions) {
             String serializationLib =
-                    partition.getStorageDescriptor().getSerdeInfo().getSerializationLib();
-            if (!"orc".equalsIgnoreCase(serializationLib)) {
+                    partition
+                            .getStorageDescriptor()
+                            .getSerdeInfo()
+                            .getSerializationLib()
+                            .toLowerCase();
+            if (!serializationLib.contains("orc")) {
                 return false;
             }
         }
@@ -140,21 +148,38 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
         return Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerSplit));
     }
 
-    private static long calculateFilesSizeWithOpenCost(
+    @VisibleForTesting
+    static long calculateFilesSizeWithOpenCost(
             List<HiveTablePartition> partitions, JobConf jobConf, long openCost)
             throws IOException {
         long totalBytesWithWeight = 0;
-        for (HiveTablePartition partition : partitions) {
-            StorageDescriptor sd = partition.getStorageDescriptor();
-            org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
-            FileSystem fs = inputPath.getFileSystem(jobConf);
-            // it's possible a partition exists in metastore but the data has been removed
-            if (!fs.exists(inputPath)) {
-                continue;
+        int calPartitionSizeThreadNum =
+                Integer.parseInt(
+                        jobConf.get(
+                                HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM
+                                        .key()));
+        ExecutorService executorService = null;
+        try {
+            executorService =
+                    calPartitionSizeThreadNum == 1
+                            ? newDirectExecutorService()
+                            : Executors.newFixedThreadPool(calPartitionSizeThreadNum);
+            List<Future<Long>> partitionFilesSizeFutures = new ArrayList<>();
+            for (HiveTablePartition partition : partitions) {
+                partitionFilesSizeFutures.add(
+                        executorService.submit(
+                                new PartitionFilesSizeCalculator(partition, openCost, jobConf)));
             }
-            for (FileStatus fileStatus : fs.listStatus(inputPath)) {
-                long fileByte = fileStatus.getLen();
-                totalBytesWithWeight += (fileByte + openCost);
+            for (Future<Long> fileSizeFuture : partitionFilesSizeFutures) {
+                try {
+                    totalBytesWithWeight += fileSizeFuture.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new IOException("Fail to calculate total files' size.", e);
+                }
+            }
+        } finally {
+            if (executorService != null) {
+                executorService.shutdown();
             }
         }
         return totalBytesWithWeight;
@@ -212,4 +237,33 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
             return new HiveSourceFileEnumerator(partitions, jobConfWrapper.conf());
         }
     }
+
+    /** The calculator to calculate the total bytes with weight for a partition. */
+    public static class PartitionFilesSizeCalculator implements Callable<Long> {
+        private final HiveTablePartition hiveTablePartition;
+        private final Long openCost;
+        private final JobConf jobConf;
+
+        public PartitionFilesSizeCalculator(
+                HiveTablePartition hiveTablePartition, Long openCost, JobConf jobConf) {
+            this.hiveTablePartition = hiveTablePartition;
+            this.openCost = openCost;
+            this.jobConf = jobConf;
+        }
+
+        @Override
+        public Long call() throws Exception {
+            long totalBytesWithWeight = 0L;
+            StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
+            org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
+            FileSystem fs = inputPath.getFileSystem(jobConf);
+            if (fs.exists(inputPath)) {
+                for (FileStatus fileStatus : fs.listStatus(inputPath)) {
+                    long fileByte = fileStatus.getLen();
+                    totalBytesWithWeight += (fileByte + openCost);
+                }
+            }
+            return totalBytesWithWeight;
+        }
+    }
 }
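
A minimal, generic sketch of the aggregation pattern introduced above: submit one Callable per partition, sum the Futures, and always shut the pool down in a finally block. With a thread count of 1 the direct executor runs each task on the calling thread, so no extra threads are created. The sizes below are hypothetical stand-ins for the per-partition file listings done by PartitionFilesSizeCalculator.

import org.apache.flink.util.concurrent.Executors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ParallelSumSketch {
    public static void main(String[] args) throws Exception {
        List<Long> partitionSizes = Arrays.asList(100L, 200L, 300L);
        long openCost = 1L;
        int threadNum = 3;
        ExecutorService pool =
                threadNum == 1
                        ? Executors.newDirectExecutorService()
                        : java.util.concurrent.Executors.newFixedThreadPool(threadNum);
        long total = 0L;
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (long size : partitionSizes) {
                // Each task stands in for listing one partition's files and weighting them.
                futures.add(pool.submit(() -> size + openCost));
            }
            for (Future<Long> future : futures) {
                total += future.get();
            }
        } finally {
            pool.shutdown();
        }
        System.out.println(total); // 603
    }
}
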
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
index c9bb872e1e2..4fd1449b12b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Rule;
 import org.junit.Test;
@@ -30,7 +31,9 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -43,17 +46,54 @@ public class HiveSourceFileEnumeratorTest {
 
     @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+    @Test
+    public void testCalculateFilesSize() throws Exception {
+        String baseFilePath =
+                Objects.requireNonNull(this.getClass().getResource("/orc/test.orc")).getPath();
+        long fileSize = Paths.get(baseFilePath).toFile().length();
+        File wareHouse = temporaryFolder.newFolder("testCalculateFilesSize");
+        int partitionNum = 10;
+        long openCost = 1;
+        List<HiveTablePartition> hiveTablePartitions = new ArrayList<>();
+        for (int i = 0; i < partitionNum; i++) {
+            // create partition directory
+            Path partitionPath = Paths.get(wareHouse.getPath(), "p_" + i);
+            Files.createDirectory(partitionPath);
+            // copy file to the partition directory
+            Files.copy(Paths.get(baseFilePath), Paths.get(partitionPath.toString(), "t.orc"));
+            StorageDescriptor sd = new StorageDescriptor();
+            sd.setLocation(partitionPath.toString());
+            hiveTablePartitions.add(new HiveTablePartition(sd, new Properties()));
+        }
+        // test calculation with one single thread
+        JobConf jobConf = new JobConf();
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
+        long totalSize =
+                HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
+                        hiveTablePartitions, jobConf, openCost);
+        long expectedSize = partitionNum * (fileSize + openCost);
+        assertThat(totalSize).isEqualTo(expectedSize);
+
+        // test calculation with multiple threads
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "3");
+        totalSize =
+                HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
+                        hiveTablePartitions, jobConf, openCost);
+        assertThat(totalSize).isEqualTo(expectedSize);
+    }
+
     @Test
     public void testCreateInputSplits() throws Exception {
         int numSplits = 1000;
         // create a jobConf with default configuration
         JobConf jobConf = new JobConf();
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
         File wareHouse = temporaryFolder.newFolder("testCreateInputSplits");
         // init the files for the partition
         StorageDescriptor sd = new StorageDescriptor();
         // set orc format
         SerDeInfo serdeInfo = new SerDeInfo();
-        serdeInfo.setSerializationLib("orc");
+        serdeInfo.setSerializationLib(OrcSerde.class.getName());
         sd.setSerdeInfo(serdeInfo);
         sd.setInputFormat(OrcInputFormat.class.getName());
         sd.setLocation(wareHouse.toString());
@@ -72,6 +112,7 @@ public class HiveSourceFileEnumeratorTest {
 
         // set split max size and verify it works
         jobConf = new JobConf();
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
         jobConf.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(), "10");
         // the splits should be more than the number of files
         hiveSourceSplits =
@@ -84,6 +125,7 @@ public class HiveSourceFileEnumeratorTest {
         assertThat(hiveSourceSplits.size()).isEqualTo(2);
 
         jobConf = new JobConf();
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
         // set open cost and verify it works
         jobConf.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(), "1");
         hiveSourceSplits =
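
A small sketch of the relaxed ORC detection these tests now exercise (an illustrative helper, not the enumerator's exact method): the metastore usually reports the serde class name rather than the literal "orc", so the check lower-cases the name and looks for the "orc" substring.

public class OrcSerdeCheckSketch {
    static boolean looksLikeOrc(String serializationLib) {
        return serializationLib != null && serializationLib.toLowerCase().contains("orc");
    }

    public static void main(String[] args) {
        System.out.println(looksLikeOrc("orc")); // true
        System.out.println(looksLikeOrc("org.apache.hadoop.hive.ql.io.orc.OrcSerde")); // true, previously missed
        System.out.println(looksLikeOrc("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")); // false
    }
}
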
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index fc7feeba89b..18b3e73dfc3 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.http.util.Asserts;
 import org.junit.Test;
@@ -89,7 +90,9 @@ public class PartitionMonitorTest {
             List<String> partitionValues, Integer createTime) {
         StorageDescriptor sd = new StorageDescriptor();
         sd.setLocation("/tmp/test");
-        sd.setSerdeInfo(new SerDeInfo());
+        SerDeInfo serDeInfo = new SerDeInfo();
+        serDeInfo.setSerializationLib(ParquetHiveSerDe.class.getName());
+        sd.setSerdeInfo(serDeInfo);
         Partition partition =
                 new Partition(
                         partitionValues, "testDb", "testTable", createTime, createTime, sd, null);
@@ -101,6 +104,7 @@ public class PartitionMonitorTest {
     private void preparePartitionMonitor() {
         List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
         JobConf jobConf = new JobConf();
+        jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(), "1");
         Configuration configuration = new Configuration();
 
         ObjectPath tablePath = new ObjectPath("testDb", "testTable");
