lsyldliu commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1090124175


##########
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -528,6 +528,69 @@ INSERT INTO TABLE fact_tz PARTITION (day, hour) select 1, '2022-8-8', '14';
 **注意:**
 - 只有批模式才支持自动收集统计信息,流模式目前还不支持自动收集统计信息。
 
+### 文件合并
+
+在使用 Flink 写 Hive 表的时候,Flink 也支持自动对小文件进行合并以减少小文件的数量。
+
+#### Stream Mode
+
+流模式下,合并小文件的行为与写 `文件系统` 一样,更多细节请参考 [文件合并]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction)。
+
+#### Batch Mode
+
+在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下文件的平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>是否开启自动合并, 数据将会首先被写入临时文件,合并结束后,文件才可见。</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>合并文件的阈值,当文件的平均大小小于该阈值, Flink 将对这些文件进行合并. 默认值是 16MB.</td>

Review Comment:
   ```suggestion
           <td>合并文件的阈值,当文件的平均大小小于该阈值,Flink 将对这些文件进行合并。 默认值是 16MB。</td>
   ```
   
   Please use Chinese punctuation consistently here instead of mixing Chinese and English punctuation marks.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setUp() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    public void testNoCompaction() throws Exception {

Review Comment:
   I think we should test it manually.



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is the same as `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+configured threshold, then Flink will try to compact these files to files with a target size. The following are the table's options for file compaction.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>

Review Comment:
   ```suggestion
           <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files first. The temporary files are invisible before compaction.</td>
   ```
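
For readers of the Batch Mode paragraph quoted above, the per-partition decision it describes (compare the average size of the written files with the configured threshold) can be sketched roughly as follows. This is only an illustration and not Flink's actual implementation: the class `CompactionDecisionSketch` and the method `shouldCompact` are hypothetical names, and treating an empty partition as "no compaction" is an assumption. The 16MB value is the documented default of `compaction.small-files.avg-size`.

```java
import java.util.List;

/** Hypothetical helper, not part of Flink: illustrates the average-size check only. */
public class CompactionDecisionSketch {

    /** Returns true when the average size of the written files is below the threshold. */
    public static boolean shouldCompact(List<Long> fileSizesInBytes, long avgSizeThresholdInBytes) {
        if (fileSizesInBytes.isEmpty()) {
            // Assumption: a partition without written files never needs compaction.
            return false;
        }
        long total = 0;
        for (long size : fileSizesInBytes) {
            total += size;
        }
        double average = (double) total / fileSizesInBytes.size();
        return average < avgSizeThresholdInBytes;
    }

    public static void main(String[] args) {
        // 16MB, the documented default of compaction.small-files.avg-size.
        long threshold = 16L * 1024 * 1024;
        List<Long> smallFiles = List.of(1L << 20, 2L << 20, 3L << 20); // 1MB, 2MB, 3MB
        List<Long> largeFiles = List.of(64L << 20, 128L << 20); // 64MB, 128MB
        System.out.println(shouldCompact(smallFiles, threshold));
        System.out.println(shouldCompact(largeFiles, threshold));
    }
}
```

With the default threshold, the first partition (average 2MB) would qualify for compaction and the second (average 96MB) would not.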


