This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 5d2bf63  MINOR: FileStreamSinkTask should create file if it doesn't exist (#5406)
5d2bf63 is described below
commit 5d2bf6328e1c5b054017419cf4de562dc8a3ec7a
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Fri Jul 20 21:01:10 2018 -0700
MINOR: FileStreamSinkTask should create file if it doesn't exist (#5406)
A recent change from `new FileOutputStream` to `Files.newOutputStream`
missed the `CREATE` flag (which is necessary in addition to `APPEND`).
Reviewers: Ismael Juma <[email protected]>
---
.../kafka/connect/file/FileStreamSinkTask.java | 6 ++-
.../kafka/connect/file/FileStreamSinkTaskTest.java | 52 +++++++++++++++++++++-
2 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 328dee6..3d1d2b8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -63,10 +63,12 @@ public class FileStreamSinkTask extends SinkTask {
outputStream = System.out;
} else {
try {
-                outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.APPEND), false,
-                    StandardCharsets.UTF_8.name());
+                outputStream = new PrintStream(
+                    Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+                    false,
+                    StandardCharsets.UTF_8.name());
} catch (IOException e) {
-                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
+                throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);
}
}
}
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
index c7ec9da..a5142a1 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
@@ -21,12 +21,21 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -36,11 +45,17 @@ public class FileStreamSinkTaskTest {
private ByteArrayOutputStream os;
private PrintStream printStream;
+ @Rule
+ public TemporaryFolder topDir = new TemporaryFolder();
+ private String outputFile;
+
@Before
- public void setup() {
+ public void setup() throws Exception {
os = new ByteArrayOutputStream();
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
+        File outputDir = topDir.newFolder("file-stream-sink-" + UUID.randomUUID().toString());
+        outputFile = outputDir.getCanonicalPath() + "/connect.output";
}
@Test
@@ -66,4 +81,39 @@ public class FileStreamSinkTaskTest {
task.flush(offsets);
         assertEquals("line1" + newLine + "line2" + newLine + "line3" + newLine, os.toString());
}
+
+ @Test
+ public void testStart() throws IOException {
+ task = new FileStreamSinkTask();
+ Map<String, String> props = new HashMap<>();
+ props.put(FileStreamSinkConnector.FILE_CONFIG, outputFile);
+ task.start(props);
+
+ HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        task.put(Arrays.asList(
+            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1)
+        ));
+        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
+ task.flush(offsets);
+
+ int numLines = 3;
+ String[] lines = new String[numLines];
+ int i = 0;
+        try (BufferedReader reader = Files.newBufferedReader(Paths.get(outputFile))) {
+ lines[i++] = reader.readLine();
+            task.put(Arrays.asList(
+                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 2),
+                new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line2", 1)
+            ));
+            offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
+            offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
+ task.flush(offsets);
+ lines[i++] = reader.readLine();
+ lines[i++] = reader.readLine();
+ }
+
+ while (--i >= 0) {
+ assertEquals("line" + i, lines[i]);
+ }
+ }
}