gaoyunhaii commented on a change in pull request #18797:
URL: https://github.com/apache/flink/pull/18797#discussion_r808637597
##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -958,6 +958,73 @@ val sink = FileSink
{{< /tab >}}
{{< /tabs >}}
+### Compaction
+
+Since version 1.15 `FileSink` supports compaction of the `pending` files, which allows the application to use a smaller checkpoint interval without generating a large number of small files,
+especially when using the [bulk encoded formats]({{< ref "docs/connectors/datastream/filesystem#bulk-encoded-formats" >}}) that have to roll on taking checkpoints.
+
+Compaction can be enabled with:
+
+{{< tabs "enablecompaction" >}}
+{{< tab "Java" >}}
+```java
+FileSink<Integer> fileSink =
+    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
+        .enableCompact(
+            FileCompactStrategy.Builder.newBuilder()
+                .setSizeThreshold(1024)
+                .enableCompactionOnCheckpoint(5)
+                .build(),
+            new RecordWiseFileCompactor<>(
+                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
+        .build();
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val fileSink: FileSink[Integer] =
+    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
+        .enableCompact(
+            FileCompactStrategy.Builder.newBuilder()
+                .setSizeThreshold(1024)
+                .enableCompactionOnCheckpoint(5)
+                .build(),
+            new RecordWiseFileCompactor(
+                new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
+        .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
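+
+In the example above, the `RecordWiseFileCompactor` reads the records of the pending files back through the given reader factory (here a `DecoderBasedReader` whose `SimpleStringDecoder` matches the `SimpleStringEncoder` used for writing) and re-writes them into the compacted file.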
+
+Once enabled, the compaction happens between the files becoming `pending` and getting committed.
+The pending files are first committed to temporary files whose paths start with `.`.
+These files are then compacted by the user-specified compactor according to the strategy, and new compacted pending files are generated.
+The compacted pending files are then emitted to the committer to be committed as the final files, after which the source files are removed.
+
+When enabling compaction, you need to specify the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}}
+and the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}}.
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} specifies when and which files get compacted.
+Currently there are two parallel conditions: the target file size and the number of checkpoints passed.
+Once the total size of the cached files reaches the size threshold, or the number of checkpoints since the last compaction reaches the specified number, the cached files are scheduled to be compacted.
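+
+For example, a sketch like the following (the threshold values are purely illustrative) builds a strategy that schedules a compaction as soon as either condition is met:
+
+```java
+FileCompactStrategy strategy =
+    FileCompactStrategy.Builder.newBuilder()
+        // Compact once the total size of the cached pending files
+        // reaches 16 MB (an illustrative value, in bytes).
+        .setSizeThreshold(16 * 1024 * 1024)
+        // ... or once 10 checkpoints have passed since the last compaction.
+        .enableCompactionOnCheckpoint(10)
+        .build();
+```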
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
+the given list of `Path` and write the result to the {{< javadoc file="org/apache/flink/connector/file/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}.
+It can be classified into two types according to the type of the given `CompactingFileWriter`:
Review comment:
The link to CompactingFileWriter seems not right. Also the same with the
Chinese version
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]