gaoyunhaii commented on a change in pull request #18827:
URL: https://github.com/apache/flink/pull/18827#discussion_r809855147
##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -1016,10 +1016,14 @@ Once the total size of the cached files has reached the size threshold or the nu
the cached files will be scheduled to compact.
The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
-the give list of `Path` and write the result to {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It could be classified into two types according to the type of the give `CompactingFileWriter`:
-
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter` could be converted into an output stream that users could write the compacted results into. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly.
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows users to write records one-by-one into it. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to read records from the source files.
+the give list of `Path` and write the result file. It could be classified into two types according to how to write the file:
+
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**:
+ The users can write the compacted results into an output stream. This is useful when the users don't want to or can't read records from the input files.
+ An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly.
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**:
+ The compactor can read records one-by-one from the input files and write into the result file as like the `FileWriter`.
Review comment:
Maybe replace `as like the` with `similar to`?
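For context on the two compactor styles the documentation describes, here is a minimal, self-contained Java sketch of the difference. It does not use Flink's classes. The names `CompactionStylesSketch`, `concatCompact`, `recordWiseCompact`, and `RecordWriter` are hypothetical, and newline-delimited UTF-8 text is an assumed record format. The output-stream style copies bytes without decoding them. The record-wise style decodes records and writes them one by one, roughly as a `FileWriter` would.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustrative sketch only: these are NOT Flink's compactor classes or signatures.
final class CompactionStylesSketch {

    /** Hypothetical record sink, playing the role of a record-wise compacting writer. */
    interface RecordWriter<T> {
        void write(T record) throws IOException;
    }

    /**
     * Output-stream style (the idea behind ConcatFileCompactor): copy the raw bytes of each
     * input, in order, without ever decoding records. The caller owns and closes the streams.
     */
    static void concatCompact(List<InputStream> inputs, OutputStream out) throws IOException {
        for (InputStream in : inputs) {
            in.transferTo(out); // Java 9+
        }
        out.flush();
    }

    /**
     * Record-wise style: decode each input into records (here: UTF-8 text lines, an assumed
     * format) and hand them to the writer one by one. The caller owns and closes the streams.
     */
    static void recordWiseCompact(List<InputStream> inputs, RecordWriter<String> writer)
            throws IOException {
        for (InputStream in : inputs) {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
            }
        }
    }
}
```

Byte concatenation only produces a valid result when the file format tolerates it, as line-delimited text does. For other formats, the records have to be read and re-written, which is what the record-wise variant models.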
##########
File path:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
##########
@@ -95,11 +102,22 @@ public void close() {
Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
CompactingFileWriter compactingFileWriter =
bucketWriter.openNewCompactingFile(
- fileCompactor.getWriterType(),
+ compactingWriterType,
request.getBucketId(),
targetPath,
System.currentTimeMillis());
- fileCompactor.compact(compactingFiles, compactingFileWriter);
+ if (compactingWriterType == Type.RECORD_WISE) {
+ ((RecordWiseFileCompactor) fileCompactor)
+ .compact(
+ compactingFiles,
+ ((RecordWiseCompactingFileWriter) compactingFileWriter)::write);
Review comment:
Might add `@SuppressWarnings({"rawtypes", "unchecked"})` to the method, since the raw casts here produce raw-type and unchecked-call warnings.
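Here is a minimal sketch of what the suggestion might look like. It assumes simplified stand-in types that only mirror the shape of the casts in the diff. They are hypothetical and not Flink's actual classes or signatures. The suppression sits on the enclosing method rather than on the class, so it covers only the raw casts.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins that only mirror the shape of the casts in the
// diff; they are NOT Flink's FileCompactor / CompactingFileWriter types.
final class SuppressWarningsSketch {

    interface FileCompactor {}

    interface CompactingFileWriter {}

    interface Writer<T> {
        void write(T record) throws IOException;
    }

    static final class RecordWiseFileCompactor<T> implements FileCompactor {
        private final List<T> records; // stands in for "records read from the input files"

        RecordWiseFileCompactor(List<T> records) {
            this.records = records;
        }

        void compact(Writer<T> writer) throws IOException {
            for (T record : records) {
                writer.write(record);
            }
        }
    }

    static final class RecordWiseCompactingFileWriter<T> implements CompactingFileWriter {
        final List<T> written = new ArrayList<>();

        void write(T record) {
            written.add(record);
        }
    }

    // The raw casts below trigger "rawtypes" and "unchecked" warnings; annotating only this
    // method keeps the suppression as narrow as possible.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void compact(FileCompactor fileCompactor, CompactingFileWriter compactingFileWriter)
            throws IOException {
        ((RecordWiseFileCompactor) fileCompactor)
                .compact(((RecordWiseCompactingFileWriter) compactingFileWriter)::write);
    }
}
```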
##########
File path:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
##########
@@ -95,11 +102,22 @@ public void close() {
Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
CompactingFileWriter compactingFileWriter =
bucketWriter.openNewCompactingFile(
- fileCompactor.getWriterType(),
+ compactingWriterType,
request.getBucketId(),
targetPath,
System.currentTimeMillis());
- fileCompactor.compact(compactingFiles, compactingFileWriter);
+ if (compactingWriterType == Type.RECORD_WISE) {
+ ((RecordWiseFileCompactor) fileCompactor)
Review comment:
Might change this cast to `RecordWiseFileCompactor<?>` to avoid the raw type.
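Here is one possible sketch of the `RecordWiseFileCompactor<?>` variant. It again uses hypothetical, simplified stand-in types rather than Flink's real ones.

Once the compactor is cast to the wildcard type, a writer cast to `RecordWiseCompactingFileWriter<?>` no longer fits. The two wildcards are captured as unrelated types. The writer would either have to stay raw or, as shown here, the wildcard can be captured by a private generic helper. This is the common wildcard-capture idiom. The helper `compactRecordWise` is hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins (NOT Flink's types) used to show the wildcard cast.
final class WildcardCastSketch {

    interface FileCompactor {}

    interface CompactingFileWriter {}

    interface Writer<T> {
        void write(T record) throws IOException;
    }

    static final class RecordWiseFileCompactor<T> implements FileCompactor {
        private final List<T> records; // stands in for "records read from the input files"

        RecordWiseFileCompactor(List<T> records) {
            this.records = records;
        }

        void compact(Writer<T> writer) throws IOException {
            for (T record : records) {
                writer.write(record);
            }
        }
    }

    static final class RecordWiseCompactingFileWriter<T> implements CompactingFileWriter {
        final List<T> written = new ArrayList<>();

        void write(T record) {
            written.add(record);
        }
    }

    static void compact(FileCompactor fileCompactor, CompactingFileWriter compactingFileWriter)
            throws IOException {
        // RecordWiseFileCompactor<?> is reifiable, so this cast is checked and uses no raw type.
        compactRecordWise((RecordWiseFileCompactor<?>) fileCompactor, compactingFileWriter);
    }

    // Wildcard-capture helper: T captures the compactor's unknown element type so the writer
    // can be cast to the same type. Only this one cast remains unchecked.
    @SuppressWarnings("unchecked")
    private static <T> void compactRecordWise(
            RecordWiseFileCompactor<T> compactor, CompactingFileWriter compactingFileWriter)
            throws IOException {
        RecordWiseCompactingFileWriter<T> writer =
                (RecordWiseCompactingFileWriter<T>) compactingFileWriter;
        compactor.compact(writer::write);
    }
}
```

Compared with the raw casts, this keeps `rawtypes` out of the code entirely. It narrows the suppression to a single unchecked cast, at the cost of one extra helper method.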
--
This is an automated message from the Apache Git Service.