This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3c99b00 [FLINK-26235][connectors/filesystem] CompactingFileWriter and
PendingFileRecoverable should not be exposed to users.
3c99b00 is described below
commit 3c99b005563debf35f43df5fb4ec92863e7775a9
Author: Gen Luo <[email protected]>
AuthorDate: Fri Feb 18 11:54:56 2022 +0800
[FLINK-26235][connectors/filesystem] CompactingFileWriter and
PendingFileRecoverable should not be exposed to users.
This closes #18827.
---
.../docs/connectors/datastream/filesystem.md | 10 +++---
.../docs/connectors/datastream/filesystem.md | 12 ++++---
.../7602816f-5c01-4b7a-9e3e-235dfedec245 | 3 +-
.../file/sink/compactor/FileCompactor.java | 23 +++---------
.../sink/compactor/IdenticalFileCompactor.java | 7 ++--
.../compactor/OutputStreamBasedFileCompactor.java | 22 ++++--------
.../sink/compactor/RecordWiseFileCompactor.java | 26 +++++++-------
.../sink/compactor/operator/CompactService.java | 41 ++++++++++++++++++----
.../sink/filesystem/CompactingFileWriter.java | 6 ++--
.../sink/filesystem/InProgressFileWriter.java | 2 --
10 files changed, 80 insertions(+), 72 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md
b/docs/content.zh/docs/connectors/datastream/filesystem.md
index e0e391c..31542da 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -1013,11 +1013,13 @@ val fileSink: FileSink[Integer] =
目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint
次数已经达到指定次数时,
`FileSink` 将创建一个异步任务来合并当前缓存的文件。
-{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html"
name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并将结果写入
-到 {{< javadoc
file="org/apache/flink/streaming/api/functions/sink/filesystem//CompactingFileWriter.html"
name="CompactingFileWriter">}} 中。根据所给定的 `CompactingFileWriter` 的类型,它可以分为两类:
+{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html"
name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中。
+根据如何写文件,它可以分为两类:
-- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html"
name="OutputStreamBasedFileCompactor">}}** : 这种类型的 `CompactingFileWriter`
可以被转换为一个输出流,用户可以将合并后的结果直接写入该流中。这种类型的 `CompactingFileWriter` 的一个例子是 {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html"
name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。
-- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}}** :这种类型的 `CompactingFileWriter`
允许用户将按条写入记录。`CompactingFileWriter` 的一个例子是 {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}} ,它从给定的文件中读出记录并写出到 `CompactingFileWriter`
中。用户需要指定如何从原始文件中读出记录。
+- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html"
name="OutputStreamBasedFileCompactor">}}** :
+ 用户将合并后的结果写入一个输出流中。通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的 `CompactingFileWriter`
的一个例子是 {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html"
name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。
+- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}}** :
+ 这种类型的 `CompactingFileWriter`
会逐条读出输入文件的记录用户,然后和`FileWriter`一样写入输出文件中。`CompactingFileWriter` 的一个例子是 {{<
javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}} ,它从给定的文件中读出记录并写出到 `CompactingFileWriter`
中。用户需要指定如何从原始文件中读出记录。
{{< hint info >}}
**重要** 如果启用了文件合并功能,文件可见的时间会被延长。
diff --git a/docs/content/docs/connectors/datastream/filesystem.md
b/docs/content/docs/connectors/datastream/filesystem.md
index 7b0436c..85224c1 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -1016,10 +1016,14 @@ Once the total size of the cached files has reached the
size threshold or the nu
the cached files will be scheduled to compact.
The {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html"
name="FileCompactor">}} specifies how to compact
-the give list of `Path` and write the result to {{< javadoc
file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html"
name="CompactingFileWriter">}}. It could be classified into two types
according to the type of the give `CompactingFileWriter`:
-
-- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html"
name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter`
could be converted into an output stream that users could write the compacted
results into. An example is the {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html"
name="ConcatFileCompactor">}} that concats the list of files directly.
-- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows
users to write records one-by-one into it. An example is the {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}} that reads records from the source files and
then writes them with the `CompactingFileWriter`. Users need to specify how to
re [...]
+the give list of `Path` and write the result file. It could be classified into
two types according to how to write the file:
+
+- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html"
name="OutputStreamBasedFileCompactor">}}**:
+ The users can write the compacted results into an output stream. This is
useful when the users don't want to or can't read records from the input files.
+ An example is the {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html"
name="ConcatFileCompactor">}} that concats the list of files directly.
+- **{{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}}**:
+ The compactor can read records one-by-one from the input files and write
into the result file similar to the `FileWriter`.
+ An example is the {{< javadoc
file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html"
name="RecordWiseFileCompactor">}} that reads records from the source files and
then writes them with the `CompactingFileWriter`. Users need to specify how to
read records from the source files.
{{< hint info >}}
**Important** Once the compaction is enabled, the written files need to wait
for longer time before they get visible.
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
index f61d7d9..ae9b43c 100644
---
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
+++
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
@@ -225,6 +225,7 @@
org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
+org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does
not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
@@ -310,4 +311,4 @@
org.apache.flink.streaming.api.windowing.triggers.TriggerResult does not satisfy
org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer does
not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer$GlobalWindowSerializerSnapshot
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer does
not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
-org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
\ No newline at end of file
+org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot
does not satisfy: annotated with @Internal or annotated with @Experimental or
annotated with @PublicEvolving or annotated with @Public or annotated with
@Deprecated
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
index b737fd3..5217fee 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
@@ -19,30 +19,15 @@
package org.apache.flink.connector.file.sink.compactor;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.fs.Path;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
import java.io.Serializable;
-import java.util.List;
/**
* The {@link FileCompactor} is responsible for compacting files into one file.
*
- * <p>The {@link FileCompactor} should declare which type of {@link
CompactingFileWriter} is
- * required, and invoke the writer correspondingly.
+ * <p>Users should never implement the interface directly but use either {@link
+ * OutputStreamBasedFileCompactor} or {@link RecordWiseFileCompactor}. Other
implementations will
+ * cause UnsupportedOperationException at runtime.
*/
@PublicEvolving
-public interface FileCompactor extends Serializable {
-
- /** @return the {@link CompactingFileWriter} type the compactor will use.
*/
- CompactingFileWriter.Type getWriterType();
-
- /**
- * Compact the given files into one file.
- *
- * @param inputFiles the files to be compacted.
- * @param writer the writer to write the compacted file.
- * @throws Exception Thrown if an exception occurs during the compacting.
- */
- void compact(List<Path> inputFiles, CompactingFileWriter writer) throws
Exception;
-}
+public interface FileCompactor extends Serializable {}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
index 8e6e4a1..923b56c 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.file.sink.compactor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import java.io.OutputStream;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkState;
@@ -37,9 +37,8 @@ public class IdenticalFileCompactor extends
ConcatFileCompactor {
super();
}
- @Override
- public void compact(List<Path> inputFiles, CompactingFileWriter writer)
throws Exception {
+ public void compact(List<Path> inputFiles, OutputStream outputStream)
throws Exception {
checkState(inputFiles.size() == 1, "IdenticalFileCompactor can only
copy one input file");
- super.compact(inputFiles, writer);
+ super.compact(inputFiles, outputStream);
}
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
index 2ac3805..96bb902 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
@@ -20,32 +20,22 @@ package org.apache.flink.connector.file.sink.compactor;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
import org.apache.flink.util.CloseShieldOutputStream;
import java.io.OutputStream;
import java.util.List;
/**
- * Base class for {@link FileCompactor} implementations that use the {@link
- * OutputStreamBasedCompactingFileWriter}.
+ * Base class for {@link FileCompactor} implementations that write the
compacting file by a output
+ * stream.
*/
@PublicEvolving
public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
- @Override
- public final CompactingFileWriter.Type getWriterType() {
- return CompactingFileWriter.Type.OUTPUT_STREAM;
- }
- @Override
- public void compact(List<Path> inputFiles, CompactingFileWriter writer)
throws Exception {
- // The outputStream returned by
OutputStreamBasedCompactingFileWriter#asOutputStream should
- // not be closed here.
- CloseShieldOutputStream outputStream =
- new CloseShieldOutputStream(
- ((OutputStreamBasedCompactingFileWriter)
writer).asOutputStream());
- doCompact(inputFiles, outputStream);
+ public void compact(List<Path> inputFiles, OutputStream outputStream)
throws Exception {
+ // The outputStream should not be closed here.
+ CloseShieldOutputStream shieldOutputStream = new
CloseShieldOutputStream(outputStream);
+ doCompact(inputFiles, shieldOutputStream);
}
protected abstract void doCompact(List<Path> inputFiles, OutputStream
outputStream)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
index 3244961..17794cd 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.file.sink.compactor;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
-import
org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
import java.io.IOException;
import java.io.Serializable;
@@ -29,7 +27,7 @@ import java.util.List;
/**
* A {@link FileCompactor} implementation that reads input files with a {@link
Reader} and writes
- * with the {@link RecordWiseCompactingFileWriter}.
+ * with a {@link Writer}.
*/
@PublicEvolving
public class RecordWiseFileCompactor<IN> implements FileCompactor {
@@ -39,26 +37,28 @@ public class RecordWiseFileCompactor<IN> implements
FileCompactor {
this.readerFactory = readerFactory;
}
- @Override
- public final CompactingFileWriter.Type getWriterType() {
- return CompactingFileWriter.Type.RECORD_WISE;
- }
-
- @Override
- public void compact(List<Path> inputFiles, CompactingFileWriter writer)
throws Exception {
- RecordWiseCompactingFileWriter<IN> recordWriter =
- (RecordWiseCompactingFileWriter<IN>) writer;
+ public void compact(List<Path> inputFiles, Writer<IN> writer) throws
Exception {
for (Path input : inputFiles) {
try (Reader<IN> reader = readerFactory.createFor(input)) {
IN elem;
while ((elem = reader.read()) != null) {
- recordWriter.write(elem);
+ writer.write(elem);
}
}
}
}
/**
+ * The writer that writers record into the compacting files.
+ *
+ * @param <T> Thy type of the records that is read.
+ */
+ @PublicEvolving
+ public interface Writer<T> {
+ void write(T record) throws IOException;
+ }
+
+ /**
* The reader that reads record from the compacting files.
*
* @param <T> Thy type of the records that is read.
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
index 7e5c7f6..7ab5e93 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
@@ -21,11 +21,16 @@ package
org.apache.flink.connector.file.sink.compactor.operator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import
org.apache.flink.connector.file.sink.compactor.OutputStreamBasedFileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter.Type;
import
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import java.io.IOException;
@@ -44,6 +49,7 @@ public class CompactService {
private final int numCompactThreads;
private final FileCompactor fileCompactor;
+ private final CompactingFileWriter.Type compactingWriterType;
private final BucketWriter<?, String> bucketWriter;
private transient ExecutorService compactService;
@@ -55,6 +61,7 @@ public class CompactService {
this.numCompactThreads = numCompactThreads;
this.fileCompactor = fileCompactor;
this.bucketWriter = bucketWriter;
+ this.compactingWriterType = getWriterType(fileCompactor);
}
public void open() {
@@ -84,10 +91,11 @@ public class CompactService {
}
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
private Iterable<FileSinkCommittable> compact(CompactorRequest request)
throws Exception {
List<FileSinkCommittable> results = new
ArrayList<>(request.getCommittableToPassthrough());
- List<Path> compactingFiles = getCompactingPath(request, results);
+ List<Path> compactingFiles = getCompactingPath(request);
if (compactingFiles.isEmpty()) {
return results;
}
@@ -95,11 +103,22 @@ public class CompactService {
Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
CompactingFileWriter compactingFileWriter =
bucketWriter.openNewCompactingFile(
- fileCompactor.getWriterType(),
+ compactingWriterType,
request.getBucketId(),
targetPath,
System.currentTimeMillis());
- fileCompactor.compact(compactingFiles, compactingFileWriter);
+ if (compactingWriterType == Type.RECORD_WISE) {
+ ((RecordWiseFileCompactor) fileCompactor)
+ .compact(
+ compactingFiles,
+ ((RecordWiseCompactingFileWriter)
compactingFileWriter)::write);
+ } else if (compactingWriterType ==
CompactingFileWriter.Type.OUTPUT_STREAM) {
+ ((OutputStreamBasedFileCompactor) fileCompactor)
+ .compact(
+ compactingFiles,
+ ((OutputStreamBasedCompactingFileWriter)
compactingFileWriter)
+ .asOutputStream());
+ }
PendingFileRecoverable compactedPendingFile =
compactingFileWriter.closeForCommit();
FileSinkCommittable compacted =
@@ -113,9 +132,7 @@ public class CompactService {
return results;
}
- // results: side output pass through committable
- private List<Path> getCompactingPath(
- CompactorRequest request, List<FileSinkCommittable> results)
throws IOException {
+ private List<Path> getCompactingPath(CompactorRequest request) throws
IOException {
List<FileSinkCommittable> compactingCommittable =
request.getCommittableToCompact();
List<Path> compactingFiles = new ArrayList<>();
@@ -144,4 +161,16 @@ public class CompactService {
}
return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX +
uncompactedName);
}
+
+ private static CompactingFileWriter.Type getWriterType(FileCompactor
fileCompactor) {
+ if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
+ return CompactingFileWriter.Type.OUTPUT_STREAM;
+ } else if (fileCompactor instanceof RecordWiseFileCompactor) {
+ return CompactingFileWriter.Type.RECORD_WISE;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unable to crate compacting file writer for compactor:"
+ + fileCompactor.getClass());
+ }
+ }
}
diff --git
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
index 23033aa..4c7716d 100644
---
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
+++
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
import java.io.IOException;
@@ -31,7 +31,7 @@ import java.io.IOException;
* both. If an class implements both interfaces, once the write method of
either interface is
* called, the write method in the other one should be disabled.
*/
-@PublicEvolving
+@Internal
public interface CompactingFileWriter {
/**
@@ -44,7 +44,7 @@ public interface CompactingFileWriter {
PendingFileRecoverable closeForCommit() throws IOException;
/** Enum defining the types of {@link CompactingFileWriter}. */
- @PublicEvolving
+ @Internal
enum Type {
RECORD_WISE,
OUTPUT_STREAM
diff --git
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index dbc8159..8e81f31 100644
---
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
import javax.annotation.Nullable;
@@ -67,7 +66,6 @@ public interface InProgressFileWriter<IN, BucketID>
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/** The handle can be used to recover pending file. */
- @PublicEvolving
interface PendingFileRecoverable {
/** @return The target path of the pending file, null if unavailable.
*/