lsyldliu commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1081042620
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setUp() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    public void testNoCompaction() throws Exception {

Review Comment:
I think we should use a JUnit parameterized test to cover the two cases: ALL_EXCHANGES_PIPELINED and ALL_EXCHANGES_BLOCKING.
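For illustration only, a minimal sketch of how that parameterization could look with JUnit 5, assuming the method slots into the test class above and the original test body stays unchanged (`BatchShuffleMode` and `ExecutionOptions.BATCH_SHUFFLE_MODE` are the existing Flink config types for the shuffle mode):

```java
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

// Sketch: replace the plain @Test so the same scenario runs once per shuffle mode.
@ParameterizedTest
@EnumSource(
        value = BatchShuffleMode.class,
        names = {"ALL_EXCHANGES_PIPELINED", "ALL_EXCHANGES_BLOCKING"})
public void testNoCompaction(BatchShuffleMode shuffleMode) throws Exception {
    // Apply the shuffle mode before any statements are executed; the rest of
    // the original test body would follow unchanged.
    tableEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
    // ...
}
```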
##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.

Review Comment:
```suggestion
configured threshold, then Flink will try to compact these files to files with the target size. The following are the table's options for file compaction.
```
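As a hedged illustration of how the batch-mode options described above are typically applied: with the Hive dialect they are plain table properties, so they can be set in `TBLPROPERTIES` when the table is created. The table name and the size values below are made up for the example:

```java
// Illustrative only: create a Hive table with auto compaction enabled; the
// table name and size thresholds are arbitrary example values.
tableEnv.executeSql(
        "CREATE TABLE compact_sink (id INT, content STRING) TBLPROPERTIES ("
                + " 'auto-compaction' = 'true',"                 // write temp files, compact when the job finishes
                + " 'compaction.small-files.avg-size' = '16MB'," // compact if the average file size is below this
                + " 'compaction.file-size' = '128MB'"            // target size for the compacted files
                + ")");
```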
##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>The threshold for file compaction. If the average size of the files is less than this value, FLink will then compact these files. the default value is 16MB.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>The compaction target file size, the default value is the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink.rolling-policy.file-size">rolling file size</a>.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+          The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
+          When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.

Review Comment:
```suggestion
          When using <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism of the compact operator deduced by the scheduler may be small, which will cause taking much time to finish compaction.
```

##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>The threshold for file compaction. If the average size of the files is less than this value, FLink will then compact these files. the default value is 16MB.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>The compaction target file size, the default value is the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink.rolling-policy.file-size">rolling file size</a>.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+          The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
+          When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.
+          In such case, please remember to set this value to a bigger value manually.

Review Comment:
```suggestion
          In such a case, please remember to set this option to a bigger value manually.
```
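To make that advice concrete, a hypothetical sketch of pinning the parallelism by hand, since it is an ordinary table property (the table name and the value 16 are arbitrary examples):

```java
// Illustrative only: fix the compaction parallelism explicitly so it no longer
// depends on whatever parallelism the adaptive batch scheduler deduces.
tableEnv.executeSql(
        "ALTER TABLE compact_sink SET TBLPROPERTIES ('compaction.parallelism' = '16')");
```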
##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.

Review Comment:
```suggestion
In stream mode, the behavior is the same as `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
```

##########
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -528,6 +528,69 @@ INSERT INTO TABLE fact_tz PARTITION (day, hour) select 1, '2022-8-8', '14';
 **注意:**
 - 只有批模式才支持自动收集统计信息,流模式目前还不支持自动收集统计信息。
 
+### 文件合并
+
+在使用 Flink 写 Hive 表的时候,Flink 也支持自动对小文件进行合并以减少小文件的数量。
+
+#### Stream Mode
+
+流模式下,合并小文件的行为与写 `文件系统` 一样,更多细节请参考 [文件合并]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction)。
+
+#### Batch Mode
+
+在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下的文件平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:

Review Comment:
```suggestion
在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下文件的平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
```