lsyldliu commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1081042620


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setUp() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    public void testNoCompaction() throws Exception {

Review Comment:
   I think we should use a JUnit parameterized test to cover the two cases: ALL_EXCHANGES_PIPELINED and ALL_EXCHANGES_BLOCKING.
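
   A parameterized version could be sketched roughly as below. This is only a sketch: the test body and `@BeforeEach` setup are the ones already in the PR, while `BatchShuffleMode` and `ExecutionOptions.BATCH_SHUFFLE_MODE` are Flink's existing hooks for switching between the two exchange modes:

   ```java
   import org.apache.flink.api.common.BatchShuffleMode;
   import org.apache.flink.configuration.ExecutionOptions;

   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.EnumSource;

   // Runs the same test body once per shuffle mode instead of duplicating methods.
   @ParameterizedTest
   @EnumSource(
           value = BatchShuffleMode.class,
           names = {"ALL_EXCHANGES_PIPELINED", "ALL_EXCHANGES_BLOCKING"})
   void testNoCompaction(BatchShuffleMode shuffleMode) throws Exception {
       // `tableEnv` comes from the @BeforeEach setup shown above in the PR.
       tableEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
       // ... original body of testNoCompaction() ...
   }
   ```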



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.

Review Comment:
   ```suggestion
   configured threshold, then Flink will try to compact these files to files with the target size. The following are the table's options for file compaction.
   ```
   



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>The threshold for file compaction. If the average size of the files is less than this value, FLink will then compact these files. the default value is 16MB.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>The compaction target file size, the default value is the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink.rolling-policy.file-size">rolling file size</a>.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+        The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
+        When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.

Review Comment:
   ```suggestion
           When using <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism of the compact operator deduced by the scheduler may be small, which will cause taking much time to finish compaction.
   ```



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+
+#### Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 42%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.small-files.avg-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">16MB</td>
+        <td>MemorySize</td>
+        <td>The threshold for file compaction. If the average size of the files is less than this value, FLink will then compact these files. the default value is 16MB.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>MemorySize</td>
+        <td>The compaction target file size, the default value is the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink.rolling-policy.file-size">rolling file size</a>.</td>
+    </tr>
+    <tr>
+        <td><h5>compaction.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Integer</td>
+        <td>
+        The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
+        When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.
+        In such case, please remember to set this value to a bigger value manually.

Review Comment:
   ```suggestion
           In such a case, please remember to set this option to a bigger value manually.
   ```
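
   For the reader's context, these are table options, so enabling compaction in a Hive-dialect DDL might look like the following sketch (the table name, columns, and chosen sizes are made-up values for illustration; the option keys are the ones from the documentation table above):

   ```sql
   -- Hypothetical table with auto compaction enabled.
   CREATE TABLE compacted_logs (
     id INT,
     msg STRING
   ) TBLPROPERTIES (
     'auto-compaction' = 'true',                  -- write temp files, compact before they become visible
     'compaction.small-files.avg-size' = '16MB',  -- compact a partition if avg file size is below 16MB
     'compaction.file-size' = '128MB',            -- target size of compacted files
     'compaction.parallelism' = '4'               -- fixed parallelism for the compact operator
   );
   ```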



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode 
doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to 
reduce the number of files generated while writing into Hive.
+
+#### Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to 
[File Compaction]({{< ref "docs/connectors/table/filesystem" 
>}}#file-compaction) for more details.

Review Comment:
   ```suggestion
   In stream mode, the behavior is the same as `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
   ```



##########
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -528,6 +528,69 @@ INSERT INTO TABLE fact_tz PARTITION (day, hour) select 1, '2022-8-8', '14';
 **注意:**
 - 只有批模式才支持自动收集统计信息,流模式目前还不支持自动收集统计信息。
 
+### 文件合并
+
+在使用 Flink 写 Hive 表的时候,Flink 也支持自动对小文件进行合并以减少小文件的数量。
+
+#### Stream Mode
+
+流模式下,合并小文件的行为与写 `文件系统` 一样,更多细节请参考 [文件合并]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction)。
+
+#### Batch Mode
+
+在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下的文件平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:

Review Comment:
   ```suggestion
   在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下文件的平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
