This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new cbc9d462b29 [FLINK-29126][hive] Fix splitting file optimization
doesn't work for orc format
cbc9d462b29 is described below
commit cbc9d462b295243a61ebc544d9cf9ff6fa2a8aa6
Author: luoyuxia <[email protected]>
AuthorDate: Mon Aug 29 11:07:39 2022 +0800
[FLINK-29126][hive] Fix splitting file optimization doesn't work for orc
format
This closes #20694
(cherry picked from commit cf70844a56a0994dfcd7fb1859408683f2b621a3)
---
.../docs/connectors/table/hive/hive_read_write.md | 7 +-
.../docs/connectors/table/hive/hive_read_write.md | 9 ++-
.../apache/flink/connectors/hive/HiveOptions.java | 6 ++
.../flink/connectors/hive/HiveSourceBuilder.java | 20 ++++-
.../connectors/hive/HiveSourceFileEnumerator.java | 88 +++++++++++++++++-----
.../hive/HiveSourceFileEnumeratorTest.java | 44 ++++++++++-
.../connectors/hive/PartitionMonitorTest.java | 6 +-
7 files changed, 154 insertions(+), 26 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 1bcb8ec4b94..033e8bb1486 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -183,7 +183,12 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig`
</tbody>
</table>
-**注意:** 目前上述参数仅适用于 ORC 格式的 Hive 表。
+{{< hint warning >}}
+**注意:**
+- 为了调整数据分片的大小, Flink 首先将计算得到所有分区下的所有文件的大小。
+ 但是这在分区数量很多的情况下会比较耗时,你可以配置作业参数
`table.exec.hive.calculate-partition-size.thread-num`(默认为3)为一个更大的值使用更多的线程来进行加速。
+- 目前上述参数仅适用于 ORC 格式的 Hive 表。
+{{< /hint >}}
### 加载分区切片
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md
b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 95f377732d6..3636eaf599c 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -198,8 +198,13 @@ Users can do some performance tuning by tuning the split's
size with the follow
</tr>
</tbody>
</table>
-
-**NOTE**: Currently, these two configurations only works for the Hive table
stored as ORC format.
+{{< hint warning >}}
+**NOTE**:
+- To tune the split's size, Flink will first get all files' size for all
partitions.
+   If there are too many partitions, it may be time-consuming,
+ then you can configure the job configuration
`table.exec.hive.calculate-partition-size.thread-num` (3 by default) to a
bigger value to enable more threads to speed up the process.
+- Currently, these configurations for tuning split size only work for the
Hive table stored as ORC format.
+{{< /hint >}}
### Load Partition Splits
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 38057a6e07a..9d7ce987578 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -96,6 +96,12 @@ public class HiveOptions {
+ " When the value is over estimated,
Flink will tend to pack Hive's data into less splits, which will be helpful
when Hive's table contains many small files."
+ " And vice versa. It only works for the
Hive table stored as ORC format.");
+ public static final ConfigOption<Integer>
TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM =
+ key("table.exec.hive.calculate-partition-size.thread-num")
+ .intType()
+ .defaultValue(3)
+                    .withDescription("The thread number to calculate
partition's size.");
+
public static final ConfigOption<Boolean>
TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED =
key("table.exec.hive.sink.sort-by-dynamic-partition.enable")
.booleanType()
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 3837d718817..bb6ee5a6c1e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -296,15 +296,18 @@ public class HiveSourceBuilder {
}
private void setFlinkConfigurationToJobConf() {
+ int splitPartitionThreadNum =
+
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+ Preconditions.checkArgument(
+ splitPartitionThreadNum >= 1,
+
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
+ + " cannot be less than 1");
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key(),
- flinkConf
-
.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM)
- .toString());
+ String.valueOf(splitPartitionThreadNum));
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key(),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX).toString());
-
jobConf.set(
HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
String.valueOf(
@@ -313,6 +316,15 @@ public class HiveSourceBuilder {
HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(),
String.valueOf(
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST).getBytes()));
+ int calPartitionSizeThreadNum =
+
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM);
+ Preconditions.checkArgument(
+ calPartitionSizeThreadNum >= 1,
+
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key()
+ + " cannot be less than 1");
+ jobConf.set(
+
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
+ String.valueOf(calPartitionSizeThreadNum));
}
private boolean isStreamingSource() {
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 07a2dd60c31..2373e747148 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -18,6 +18,7 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
@@ -36,6 +37,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static
org.apache.flink.util.concurrent.Executors.newDirectExecutorService;
/**
* A {@link FileEnumerator} implementation for hive source, which generates
splits based on {@link
@@ -81,10 +89,6 @@ public class HiveSourceFileEnumerator implements
FileEnumerator {
setSplitMaxSize(partitions, jobConf, minNumSplits);
}
int threadNum = getThreadNumToSplitHiveFile(jobConf);
- Preconditions.checkArgument(
- threadNum >= 1,
-
HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM.key()
- + " cannot be less than 1");
List<HiveSourceSplit> hiveSplits = new ArrayList<>();
try (MRSplitsGetter splitsGetter = new MRSplitsGetter(threadNum)) {
for (HiveTablePartitionSplits partitionSplits :
@@ -106,8 +110,12 @@ public class HiveSourceFileEnumerator implements
FileEnumerator {
// works for orc format
for (HiveTablePartition partition : partitions) {
String serializationLib =
-
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib();
- if (!"orc".equalsIgnoreCase(serializationLib)) {
+ partition
+ .getStorageDescriptor()
+ .getSerdeInfo()
+ .getSerializationLib()
+ .toLowerCase();
+ if (!serializationLib.contains("orc")) {
return false;
}
}
@@ -140,21 +148,38 @@ public class HiveSourceFileEnumerator implements
FileEnumerator {
return Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes,
bytesPerSplit));
}
- private static long calculateFilesSizeWithOpenCost(
+ @VisibleForTesting
+ static long calculateFilesSizeWithOpenCost(
List<HiveTablePartition> partitions, JobConf jobConf, long
openCost)
throws IOException {
long totalBytesWithWeight = 0;
- for (HiveTablePartition partition : partitions) {
- StorageDescriptor sd = partition.getStorageDescriptor();
- org.apache.hadoop.fs.Path inputPath = new
org.apache.hadoop.fs.Path(sd.getLocation());
- FileSystem fs = inputPath.getFileSystem(jobConf);
- // it's possible a partition exists in metastore but the data has
been removed
- if (!fs.exists(inputPath)) {
- continue;
+ int calPartitionSizeThreadNum =
+ Integer.parseInt(
+ jobConf.get(
+
HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM
+ .key()));
+ ExecutorService executorService = null;
+ try {
+ executorService =
+ calPartitionSizeThreadNum == 1
+ ? newDirectExecutorService()
+ :
Executors.newFixedThreadPool(calPartitionSizeThreadNum);
+ List<Future<Long>> partitionFilesSizeFutures = new ArrayList<>();
+ for (HiveTablePartition partition : partitions) {
+ partitionFilesSizeFutures.add(
+ executorService.submit(
+ new PartitionFilesSizeCalculator(partition,
openCost, jobConf)));
}
- for (FileStatus fileStatus : fs.listStatus(inputPath)) {
- long fileByte = fileStatus.getLen();
- totalBytesWithWeight += (fileByte + openCost);
+ for (Future<Long> fileSizeFuture : partitionFilesSizeFutures) {
+ try {
+ totalBytesWithWeight += fileSizeFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException("Fail to calculate total files'
size.", e);
+ }
+ }
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
}
}
return totalBytesWithWeight;
@@ -212,4 +237,33 @@ public class HiveSourceFileEnumerator implements
FileEnumerator {
return new HiveSourceFileEnumerator(partitions,
jobConfWrapper.conf());
}
}
+
+ /** The calculator to calculate the total bytes with weight for a
partition. */
+ public static class PartitionFilesSizeCalculator implements Callable<Long>
{
+ private final HiveTablePartition hiveTablePartition;
+ private final Long openCost;
+ private final JobConf jobConf;
+
+ public PartitionFilesSizeCalculator(
+ HiveTablePartition hiveTablePartition, Long openCost, JobConf
jobConf) {
+ this.hiveTablePartition = hiveTablePartition;
+ this.openCost = openCost;
+ this.jobConf = jobConf;
+ }
+
+ @Override
+ public Long call() throws Exception {
+ long totalBytesWithWeight = 0L;
+ StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
+ org.apache.hadoop.fs.Path inputPath = new
org.apache.hadoop.fs.Path(sd.getLocation());
+ FileSystem fs = inputPath.getFileSystem(jobConf);
+ if (fs.exists(inputPath)) {
+ for (FileStatus fileStatus : fs.listStatus(inputPath)) {
+ long fileByte = fileStatus.getLen();
+ totalBytesWithWeight += (fileByte + openCost);
+ }
+ }
+ return totalBytesWithWeight;
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
index c9bb872e1e2..4fd1449b12b 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceFileEnumeratorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Rule;
import org.junit.Test;
@@ -30,7 +31,9 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -43,17 +46,54 @@ public class HiveSourceFileEnumeratorTest {
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Test
+ public void testCalculateFilesSize() throws Exception {
+ String baseFilePath =
+
Objects.requireNonNull(this.getClass().getResource("/orc/test.orc")).getPath();
+ long fileSize = Paths.get(baseFilePath).toFile().length();
+ File wareHouse = temporaryFolder.newFolder("testCalculateFilesSize");
+ int partitionNum = 10;
+ long openCost = 1;
+ List<HiveTablePartition> hiveTablePartitions = new ArrayList<>();
+ for (int i = 0; i < partitionNum; i++) {
+ // create partition directory
+ Path partitionPath = Paths.get(wareHouse.getPath(), "p_" + i);
+ Files.createDirectory(partitionPath);
+ // copy file to the partition directory
+ Files.copy(Paths.get(baseFilePath),
Paths.get(partitionPath.toString(), "t.orc"));
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setLocation(partitionPath.toString());
+ hiveTablePartitions.add(new HiveTablePartition(sd, new
Properties()));
+ }
+ // test calculation with one single thread
+ JobConf jobConf = new JobConf();
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"1");
+ long totalSize =
+ HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
+ hiveTablePartitions, jobConf, openCost);
+ long expectedSize = partitionNum * (fileSize + openCost);
+ assertThat(totalSize).isEqualTo(expectedSize);
+
+ // test calculation with multiple threads
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"3");
+ totalSize =
+ HiveSourceFileEnumerator.calculateFilesSizeWithOpenCost(
+ hiveTablePartitions, jobConf, openCost);
+ assertThat(totalSize).isEqualTo(expectedSize);
+ }
+
@Test
public void testCreateInputSplits() throws Exception {
int numSplits = 1000;
// create a jobConf with default configuration
JobConf jobConf = new JobConf();
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"1");
File wareHouse = temporaryFolder.newFolder("testCreateInputSplits");
// init the files for the partition
StorageDescriptor sd = new StorageDescriptor();
// set orc format
SerDeInfo serdeInfo = new SerDeInfo();
- serdeInfo.setSerializationLib("orc");
+ serdeInfo.setSerializationLib(OrcSerde.class.getName());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(OrcInputFormat.class.getName());
sd.setLocation(wareHouse.toString());
@@ -72,6 +112,7 @@ public class HiveSourceFileEnumeratorTest {
// set split max size and verify it works
jobConf = new JobConf();
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"1");
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(), "10");
// the splits should be more than the number of files
hiveSourceSplits =
@@ -84,6 +125,7 @@ public class HiveSourceFileEnumeratorTest {
assertThat(hiveSourceSplits.size()).isEqualTo(2);
jobConf = new JobConf();
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"1");
// set open cost and verify it works
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST.key(), "1");
hiveSourceSplits =
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
index fc7feeba89b..18b3e73dfc3 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.mapred.JobConf;
import org.apache.http.util.Asserts;
import org.junit.Test;
@@ -89,7 +90,9 @@ public class PartitionMonitorTest {
List<String> partitionValues, Integer createTime) {
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("/tmp/test");
- sd.setSerdeInfo(new SerDeInfo());
+ SerDeInfo serDeInfo = new SerDeInfo();
+ serDeInfo.setSerializationLib(ParquetHiveSerDe.class.getName());
+ sd.setSerdeInfo(serDeInfo);
Partition partition =
new Partition(
partitionValues, "testDb", "testTable", createTime,
createTime, sd, null);
@@ -101,6 +104,7 @@ public class PartitionMonitorTest {
private void preparePartitionMonitor() {
List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
JobConf jobConf = new JobConf();
+
jobConf.set(HiveOptions.TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM.key(),
"1");
Configuration configuration = new Configuration();
ObjectPath tablePath = new ObjectPath("testDb", "testTable");