Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5193#discussion_r158394094
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
---
@@ -277,88 +263,67 @@ public void
testLogTimeoutAlmostReachedWarningDuringRecovery() throws Exception
private void assertExactlyOnce(List<String> expectedValues) throws
IOException {
ArrayList<String> actualValues = new ArrayList<>();
- for (File file : targetDirectory.listFiles()) {
- actualValues.addAll(Files.readAllLines(file.toPath(),
Charset.defaultCharset()));
+ for (String name : targetDirectory.listFiles()) {
+ actualValues.addAll(targetDirectory.read(name));
}
Collections.sort(actualValues);
Collections.sort(expectedValues);
assertEquals(expectedValues, actualValues);
}
- private class FileBasedSinkFunction extends
TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
+ private class ContentDumpSinkFunction extends
TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {
- public FileBasedSinkFunction() {
+ public ContentDumpSinkFunction() {
super(
- new KryoSerializer<>(FileTransaction.class, new
ExecutionConfig()),
+ new KryoSerializer<>(ContentTransaction.class,
new ExecutionConfig()),
VoidSerializer.INSTANCE, clock);
-
- if (!tmpDirectory.isDirectory() ||
!targetDirectory.isDirectory()) {
- throw new IllegalArgumentException();
- }
}
@Override
- protected void invoke(FileTransaction transaction, String
value, Context context) throws Exception {
- transaction.writer.write(value);
+ protected void invoke(ContentTransaction transaction, String
value, Context context) throws Exception {
+ transaction.tmpContentWriter.write(value);
}
@Override
- protected FileTransaction beginTransaction() throws Exception {
- File tmpFile = new File(tmpDirectory,
UUID.randomUUID().toString());
- return new FileTransaction(tmpFile);
+ protected ContentTransaction beginTransaction() throws
Exception {
+ return new
ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
}
@Override
- protected void preCommit(FileTransaction transaction) throws
Exception {
- transaction.writer.flush();
- transaction.writer.close();
+ protected void preCommit(ContentTransaction transaction) throws
Exception {
+ transaction.tmpContentWriter.flush();
--- End diff --
Should we also call `transaction.tmpContentWriter.close()` after the flush in `preCommit`, as the previous file-based implementation did?
---