Repository: kafka
Updated Branches:
  refs/heads/trunk b836bd18f -> 198a43d84

KAFKA-5412: Using connect-console-sink/source.properties raises an exception related to "file" property not found

Author: ppatierno <[email protected]>

Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #3279 from ppatierno/kafka-5412

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/198a43d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/198a43d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/198a43d8

Branch: refs/heads/trunk
Commit: 198a43d84693153c0f0177b8314117525c05047b
Parents: b836bd1
Author: ppatierno <[email protected]>
Authored: Mon Jun 19 09:22:19 2017 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Jun 19 09:22:19 2017 -0700

----------------------------------------------------------------------
 .../connect/file/FileStreamSinkConnector.java   |  2 +-
 .../connect/file/FileStreamSourceConnector.java |  2 +-
 .../file/FileStreamSinkConnectorTest.java       | 14 ++++++++++++++
 .../connect/file/FileStreamSourceTaskTest.java  | 19 +++++++++++++++++++
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 449b9b1..4ae7f4b 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -36,7 +36,7 @@ public class FileStreamSinkConnector extends SinkConnector {
     public static final String FILE_CONFIG = "file";
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");
+        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");
     private String filename;

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index bf79b8a..335fe92 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -38,7 +38,7 @@ public class FileStreamSourceConnector extends SourceConnector {
     public static final String FILE_CONFIG = "file";
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
         .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
     private String filename;

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index 660a44c..aead7ef 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 public class FileStreamSinkConnectorTest {
@@ -74,6 +75,19 @@ public class FileStreamSinkConnectorTest {
     }
     @Test
+    public void testSinkTasksStdout() {
+        PowerMock.replayAll();
+
+        sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        connector.start(sinkProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testTaskClass() {
         PowerMock.replayAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/198a43d8/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index e8637f2..eb91dbd 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -26,6 +26,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.powermock.api.easymock.PowerMock;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -133,6 +134,24 @@ public class FileStreamSourceTaskTest {
         task.start(config);
     }
+    @Test
+    public void testMissingFile() throws InterruptedException {
+        replay();
+
+        String data = "line\n";
+        System.setIn(new ByteArrayInputStream(data.getBytes()));
+
+        config.remove(FileStreamSourceConnector.FILE_CONFIG);
+        task.start(config);
+
+        List<SourceRecord> records = task.poll();
+        assertEquals(1, records.size());
+        assertEquals(TOPIC, records.get(0).topic());
+        assertEquals("line", records.get(0).value());
+
+        task.stop();
+    }
+
     public void testInvalidFile() throws InterruptedException {
         config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
         task.start(config);
