This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 18142bff4b KAFKA-13809: Propagate full connector configuration to
tasks in FileStream connectors (#12450)
18142bff4b is described below
commit 18142bff4b313e0bfb9d26c70a5b4b469cbc4005
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Aug 16 00:55:29 2022 +0530
KAFKA-13809: Propagate full connector configuration to tasks in FileStream
connectors (#12450)
Reviewers: Chris Egerton <[email protected]>
---
.../connect/file/FileStreamSinkConnector.java | 23 +++++-----
.../kafka/connect/file/FileStreamSinkTask.java | 6 ++-
.../connect/file/FileStreamSourceConnector.java | 39 +++++++----------
.../kafka/connect/file/FileStreamSourceTask.java | 14 +++----
.../connect/file/FileStreamSinkConnectorTest.java | 18 ++++++--
.../file/FileStreamSourceConnectorTest.java | 36 ++++++++++------
docs/connect.html | 49 +++++++++++++---------
7 files changed, 104 insertions(+), 81 deletions(-)
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 136e8998c2..fbe9cfff1d 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -23,23 +23,24 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Very simple connector that works with the console. This connector supports
both source and
- * sink modes via its 'mode' setting.
+ * Very simple sink connector that works with stdout or a file.
*/
public class FileStreamSinkConnector extends SinkConnector {
+ private static final Logger log =
LoggerFactory.getLogger(FileStreamSinkConnector.class);
public static final String FILE_CONFIG = "file";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination
filename. If not specified, the standard output will be used");
- private String filename;
+ private Map<String, String> props;
@Override
public String version() {
@@ -48,8 +49,11 @@ public class FileStreamSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
- AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
- filename = parsedConfig.getString(FILE_CONFIG);
+ this.props = props;
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+ String filename = config.getString(FILE_CONFIG);
+ filename = (filename == null || filename.isEmpty()) ? "standard
output" : filename;
+ log.info("Starting file sink connector writing to {}", filename);
}
@Override
@@ -61,10 +65,7 @@ public class FileStreamSinkConnector extends SinkConnector {
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
- Map<String, String> config = new HashMap<>();
- if (filename != null)
- config.put(FILE_CONFIG, filename);
- configs.add(config);
+ configs.add(props);
}
return configs;
}
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 3d1d2b8543..cb19c01e60 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.file;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -58,8 +59,9 @@ public class FileStreamSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
- filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
- if (filename == null) {
+ AbstractConfig config = new
AbstractConfig(FileStreamSinkConnector.CONFIG_DEF, props);
+ filename = config.getString(FileStreamSinkConnector.FILE_CONFIG);
+ if (filename == null || filename.isEmpty()) {
outputStream = System.out;
} else {
try {
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 74b5f7c09e..61908c61b9 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -20,36 +20,35 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Very simple connector that works with the console. This connector supports
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
*/
public class FileStreamSourceConnector extends SourceConnector {
+
+ private static final Logger log =
LoggerFactory.getLogger(FileStreamSourceConnector.class);
public static final String TOPIC_CONFIG = "topic";
public static final String FILE_CONFIG = "file";
public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source
filename. If not specified, the standard input will be used")
- .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to
publish data to")
+ .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
.define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE,
Importance.LOW,
- "The maximum number of records the Source task can read from
file one time");
+ "The maximum number of records the source task can read from
the file each time it is polled");
- private String filename;
- private String topic;
- private int batchSize;
+ private Map<String, String> props;
@Override
public String version() {
@@ -58,14 +57,11 @@ public class FileStreamSourceConnector extends
SourceConnector {
@Override
public void start(Map<String, String> props) {
- AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
- filename = parsedConfig.getString(FILE_CONFIG);
- List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
- if (topics.size() != 1) {
- throw new ConfigException("'topic' in FileStreamSourceConnector
configuration requires definition of a single topic");
- }
- topic = topics.get(0);
- batchSize = parsedConfig.getInt(TASK_BATCH_SIZE_CONFIG);
+ this.props = props;
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+ String filename = config.getString(FILE_CONFIG);
+ filename = (filename == null || filename.isEmpty()) ? "standard input"
: filename;
+ log.info("Starting file source connector reading from {}", filename);
}
@Override
@@ -77,12 +73,7 @@ public class FileStreamSourceConnector extends
SourceConnector {
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
- Map<String, String> config = new HashMap<>();
- if (filename != null)
- config.put(FILE_CONFIG, filename);
- config.put(TOPIC_CONFIG, topic);
- config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
- configs.add(config);
+ configs.add(props);
return configs;
}
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8e3fb8976a..cda58cf2d0 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -50,8 +51,8 @@ public class FileStreamSourceTask extends SourceTask {
private BufferedReader reader = null;
private char[] buffer;
private int offset = 0;
- private String topic = null;
- private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
+ private String topic;
+ private int batchSize;
private Long streamOffset;
@@ -71,17 +72,16 @@ public class FileStreamSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
- filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+ AbstractConfig config = new
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);
+ filename = config.getString(FileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
stream = System.in;
// Tracking offset for stdin doesn't make sense
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream,
StandardCharsets.UTF_8));
}
- // Missing topic or parsing error is not possible because we've parsed
the config in the
- // Connector
- topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
- batchSize =
Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+ topic = config.getString(FileStreamSourceConnector.TOPIC_CONFIG);
+ batchSize =
config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
}
@Override
diff --git
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index 548e388c41..0f1ab8e6e1 100644
---
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -36,13 +36,12 @@ public class FileStreamSinkConnectorTest {
private static final String FILENAME = "/afilename";
private FileStreamSinkConnector connector;
- private ConnectorContext ctx;
private Map<String, String> sinkProperties;
@BeforeEach
public void setup() {
connector = new FileStreamSinkConnector();
- ctx = mock(ConnectorContext.class);
+ ConnectorContext ctx = mock(ConnectorContext.class);
connector.initialize(ctx);
sinkProperties = new HashMap<>();
@@ -74,11 +73,11 @@ public class FileStreamSinkConnectorTest {
@Test
public void testSinkTasksStdout() {
- sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+ sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
connector.start(sinkProperties);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
-
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+
assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
}
@Test
@@ -86,4 +85,15 @@ public class FileStreamSinkConnectorTest {
connector.start(sinkProperties);
assertEquals(FileStreamSinkTask.class, connector.taskClass());
}
+
+ @Test
+ public void testConnectorConfigsPropagateToTaskConfigs() {
+ // This is required so that updates in transforms/converters/clients
configs get reflected
+ // in tasks without manual restarts of the tasks (see
https://issues.apache.org/jira/browse/KAFKA-13809)
+ sinkProperties.put("transforms", "insert");
+ connector.start(sinkProperties);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+ assertEquals(1, taskConfigs.size());
+ assertEquals("insert", taskConfigs.get(0).get("transforms"));
+ }
}
diff --git
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index 8e4661d13d..0ade9ba79b 100644
---
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
@@ -34,17 +35,15 @@ import java.util.Map;
public class FileStreamSourceConnectorTest {
private static final String SINGLE_TOPIC = "test";
- private static final String MULTIPLE_TOPICS = "test1,test2";
private static final String FILENAME = "/somefilename";
private FileStreamSourceConnector connector;
- private ConnectorContext ctx;
private Map<String, String> sourceProperties;
@BeforeEach
public void setup() {
connector = new FileStreamSourceConnector();
- ctx = mock(ConnectorContext.class);
+ ConnectorContext ctx = mock(ConnectorContext.class);
connector.initialize(ctx);
sourceProperties = new HashMap<>();
@@ -89,33 +88,44 @@ public class FileStreamSourceConnectorTest {
}
@Test
- public void testMultipleSourcesInvalid() {
- sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG,
MULTIPLE_TOPICS);
- assertThrows(ConfigException.class, () ->
connector.start(sourceProperties));
+ public void testTaskClass() {
+ connector.start(sourceProperties);
+ assertEquals(FileStreamSourceTask.class, connector.taskClass());
}
@Test
- public void testTaskClass() {
+ public void testConnectorConfigsPropagateToTaskConfigs() {
+ // This is required so that updates in transforms/converters/clients
configs get reflected
+ // in tasks without manual restarts of the tasks (see
https://issues.apache.org/jira/browse/KAFKA-13809)
+ sourceProperties.put("transforms", "insert");
connector.start(sourceProperties);
- assertEquals(FileStreamSourceTask.class, connector.taskClass());
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+ assertEquals(1, taskConfigs.size());
+ assertEquals("insert", taskConfigs.get(0).get("transforms"));
+ }
+
+ @Test
+ public void testValidConfigsAndDefaults() {
+ AbstractConfig config = new
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties);
+ assertEquals(SINGLE_TOPIC,
config.getString(FileStreamSourceConnector.TOPIC_CONFIG));
+ assertEquals(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE,
config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
}
@Test
public void testMissingTopic() {
sourceProperties.remove(FileStreamSourceConnector.TOPIC_CONFIG);
- assertThrows(ConfigException.class, () ->
connector.start(sourceProperties));
+ assertThrows(ConfigException.class, () -> new
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
}
@Test
public void testBlankTopic() {
- // Because of trimming this tests is same as testing for empty string.
- sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, " ");
- assertThrows(ConfigException.class, () ->
connector.start(sourceProperties));
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, " ");
+ assertThrows(ConfigException.class, () -> new
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
}
@Test
public void testInvalidBatchSize() {
sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
"abcd");
- assertThrows(ConfigException.class, () ->
connector.start(sourceProperties));
+ assertThrows(ConfigException.class, () -> new
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
}
}
diff --git a/docs/connect.html b/docs/connect.html
index d13d25d313..57e9bb4809 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -403,12 +403,11 @@ errors.tolerance=all</pre>
<h5><a id="connect_connectorexample"
href="#connect_connectorexample">Connector Example</a></h5>
- <p>We'll cover the <code>SourceConnector</code> as a simple example.
<code>SinkConnector</code> implementations are very similar. Start by creating
the class that inherits from <code>SourceConnector</code> and add a couple of
fields that will store parsed configuration information (the filename to read
from and the topic to send data to):</p>
+ <p>We'll cover the <code>SourceConnector</code> as a simple example.
<code>SinkConnector</code> implementations are very similar. Start by creating
the class that inherits from <code>SourceConnector</code> and add a field that
will store the configuration information to be propagated to the task(s) (the
topic to send data to and, optionally, the filename to read from and the
maximum batch size):</p>
<pre class="brush: java;">
public class FileStreamSourceConnector extends SourceConnector {
- private String filename;
- private String topic;</pre>
+ private Map<String, String> props;</pre>
<p>The easiest method to fill in is <code>taskClass()</code>, which
defines the class that should be instantiated in worker processes to actually
read the data:</p>
@@ -423,9 +422,14 @@ public Class<? extends Task> taskClass() {
<pre class="brush: java;">
@Override
public void start(Map<String, String> props) {
- // The complete version includes error handling as well.
- filename = props.get(FILE_CONFIG);
- topic = props.get(TOPIC_CONFIG);
+ // Initialization logic and setting up of resources can take place in this
method.
+ // This connector doesn't need to do any of that, but we do log a helpful
message to the user.
+
+ this.props = props;
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+ String filename = config.getString(FILE_CONFIG);
+ filename = (filename == null || filename.isEmpty()) ? "standard input" :
config.getString(FILE_CONFIG);
+ log.info("Starting file source connector reading from {}", filename);
}
@Override
@@ -440,19 +444,14 @@ public void stop() {
<pre class="brush: java;">
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+ // Note that the task configs could contain configs additional to or
different from the connector configs if needed (for instance,
+ // if different tasks have different responsibilities, or if different
tasks are meant to process different subsets of the source data stream).
ArrayList<Map<String, String>> configs = new
ArrayList<>();
// Only one input stream makes sense.
- Map<String, String> config = new HashMap<>();
- if (filename != null)
- config.put(FILE_CONFIG, filename);
- config.put(TOPIC_CONFIG, topic);
- configs.add(config);
+ configs.add(props);
return configs;
}</pre>
- <p>Although not used in the example, <code>SourceTask</code> also provides
two APIs to commit offsets in the source system: <code>commit</code> and
<code>commitRecord</code>. The APIs are provided for source systems which have
an acknowledgement mechanism for messages. Overriding these methods allows the
source connector to acknowledge messages in the source system, either in bulk
or individually, once they have been written to Kafka.
- The <code>commit</code> API stores the offsets in the source system, up to
the offsets that have been returned by <code>poll</code>. The implementation of
this API should block until the commit is complete. The
<code>commitRecord</code> API saves the offset in the source system for each
<code>SourceRecord</code> after it is written to Kafka. As Kafka Connect will
record offsets automatically, <code>SourceTask</code>s are not required to
implement them. In cases where a connector does [...]
-
<p>Even with multiple tasks, this method implementation is usually pretty
simple. It just has to determine the number of input tasks, which may require
contacting the remote service it is pulling data from, and then divvy them up.
Because some patterns for splitting work among tasks are so common, some
utilities are provided in <code>ConnectorUtils</code> to simplify these
cases.</p>
<p>Note that this simple example does not include dynamic input. See the
discussion in the next section for how to trigger updates to task configs.</p>
@@ -466,15 +465,17 @@ public List<Map<String, String>>
taskConfigs(int maxTasks) {
<pre class="brush: java;">
public class FileStreamSourceTask extends SourceTask {
- String filename;
- InputStream stream;
- String topic;
+ private String filename;
+ private InputStream stream;
+ private String topic;
+ private int batchSize;
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+ batchSize =
props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
}
@Override
@@ -497,6 +498,9 @@ public List<SourceRecord> poll() throws
InterruptedException {
Map<String, Object> sourcePartition =
Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset =
Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset,
topic, Schema.STRING_SCHEMA, line));
+ if (records.size() >= batchSize) {
+ return records;
+ }
} else {
Thread.sleep(1);
}
@@ -513,6 +517,9 @@ public List<SourceRecord> poll() throws
InterruptedException {
<p>Note that this implementation uses the normal Java
<code>InputStream</code> interface and may sleep if data is not available. This
is acceptable because Kafka Connect provides each task with a dedicated thread.
While task implementations have to conform to the basic <code>poll()</code>
interface, they have a lot of flexibility in how they are implemented. In this
case, an NIO-based implementation would be more efficient, but this simple
approach works, is quick to implement, and i [...]
+ <p>Although not used in the example, <code>SourceTask</code> also provides
two APIs to commit offsets in the source system: <code>commit</code> and
<code>commitRecord</code>. The APIs are provided for source systems which have
an acknowledgement mechanism for messages. Overriding these methods allows the
source connector to acknowledge messages in the source system, either in bulk
or individually, once they have been written to Kafka.
+ The <code>commit</code> API stores the offsets in the source system,
up to the offsets that have been returned by <code>poll</code>. The
implementation of this API should block until the commit is complete. The
<code>commitRecord</code> API saves the offset in the source system for each
<code>SourceRecord</code> after it is written to Kafka. As Kafka Connect will
record offsets automatically, <code>SourceTask</code>s are not required to
implement them. In cases where a connector [...]
+
<h5><a id="connect_sinktasks" href="#connect_sinktasks">Sink Tasks</a></h5>
<p>The previous section described how to implement a simple
<code>SourceTask</code>. Unlike <code>SourceConnector</code> and
<code>SinkConnector</code>, <code>SourceTask</code> and <code>SinkTask</code>
have very different interfaces because <code>SourceTask</code> uses a pull
interface and <code>SinkTask</code> uses a push interface. Both share the
common lifecycle methods, but the <code>SinkTask</code> interface is quite
different:</p>
@@ -609,9 +616,11 @@ if (inputsChanged())
<p>The following code in <code>FileStreamSourceConnector</code> defines
the configuration and exposes it to the framework.</p>
<pre class="brush: java;">
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
- .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish
data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename.
If not specified, the standard input will be used")
+ .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
+ .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE,
Importance.LOW,
+ "The maximum number of records the source task can read from the file
each time it is polled");
public ConfigDef config() {
return CONFIG_DEF;