This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18142bff4b KAFKA-13809: Propagate full connector configuration to 
tasks in FileStream connectors (#12450)
18142bff4b is described below

commit 18142bff4b313e0bfb9d26c70a5b4b469cbc4005
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Aug 16 00:55:29 2022 +0530

    KAFKA-13809: Propagate full connector configuration to tasks in FileStream 
connectors (#12450)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../connect/file/FileStreamSinkConnector.java      | 23 +++++-----
 .../kafka/connect/file/FileStreamSinkTask.java     |  6 ++-
 .../connect/file/FileStreamSourceConnector.java    | 39 +++++++----------
 .../kafka/connect/file/FileStreamSourceTask.java   | 14 +++----
 .../connect/file/FileStreamSinkConnectorTest.java  | 18 ++++++--
 .../file/FileStreamSourceConnectorTest.java        | 36 ++++++++++------
 docs/connect.html                                  | 49 +++++++++++++---------
 7 files changed, 104 insertions(+), 81 deletions(-)

diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 136e8998c2..fbe9cfff1d 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -23,23 +23,24 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple sink connector that works with stdout or a file.
  */
 public class FileStreamSinkConnector extends SinkConnector {
 
+    private static final Logger log = 
LoggerFactory.getLogger(FileStreamSinkConnector.class);
     public static final String FILE_CONFIG = "file";
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination 
filename. If not specified, the standard output will be used");
 
-    private String filename;
+    private Map<String, String> props;
 
     @Override
     public String version() {
@@ -48,8 +49,11 @@ public class FileStreamSinkConnector extends SinkConnector {
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
-        filename = parsedConfig.getString(FILE_CONFIG);
+        this.props = props;
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+        String filename = config.getString(FILE_CONFIG);
+        filename = (filename == null || filename.isEmpty()) ? "standard 
output" : filename;
+        log.info("Starting file sink connector writing to {}", filename);
     }
 
     @Override
@@ -61,10 +65,7 @@ public class FileStreamSinkConnector extends SinkConnector {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         ArrayList<Map<String, String>> configs = new ArrayList<>();
         for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> config = new HashMap<>();
-            if (filename != null)
-                config.put(FILE_CONFIG, filename);
-            configs.add(config);
+            configs.add(props);
         }
         return configs;
     }
diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 3d1d2b8543..cb19c01e60 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.file;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -58,8 +59,9 @@ public class FileStreamSinkTask extends SinkTask {
 
     @Override
     public void start(Map<String, String> props) {
-        filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
-        if (filename == null) {
+        AbstractConfig config = new 
AbstractConfig(FileStreamSinkConnector.CONFIG_DEF, props);
+        filename = config.getString(FileStreamSinkConnector.FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
             outputStream = System.out;
         } else {
             try {
diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 74b5f7c09e..61908c61b9 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -20,36 +20,35 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
+
+    private static final Logger log = 
LoggerFactory.getLogger(FileStreamSourceConnector.class);
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
     public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
     public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source 
filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to 
publish data to")
+        .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new 
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
         .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, 
Importance.LOW,
-                "The maximum number of records the Source task can read from 
file one time");
+                "The maximum number of records the source task can read from 
the file each time it is polled");
 
-    private String filename;
-    private String topic;
-    private int batchSize;
+    private Map<String, String> props;
 
     @Override
     public String version() {
@@ -58,14 +57,11 @@ public class FileStreamSourceConnector extends 
SourceConnector {
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
-        filename = parsedConfig.getString(FILE_CONFIG);
-        List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
-        if (topics.size() != 1) {
-            throw new ConfigException("'topic' in FileStreamSourceConnector 
configuration requires definition of a single topic");
-        }
-        topic = topics.get(0);
-        batchSize = parsedConfig.getInt(TASK_BATCH_SIZE_CONFIG);
+        this.props = props;
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+        String filename = config.getString(FILE_CONFIG);
+        filename = (filename == null || filename.isEmpty()) ? "standard input" 
: filename;
+        log.info("Starting file source connector reading from {}", filename);
     }
 
     @Override
@@ -77,12 +73,7 @@ public class FileStreamSourceConnector extends 
SourceConnector {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         ArrayList<Map<String, String>> configs = new ArrayList<>();
         // Only one input stream makes sense.
-        Map<String, String> config = new HashMap<>();
-        if (filename != null)
-            config.put(FILE_CONFIG, filename);
-        config.put(TOPIC_CONFIG, topic);
-        config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
-        configs.add(config);
+        configs.add(props);
         return configs;
     }
 
diff --git 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8e3fb8976a..cda58cf2d0 100644
--- 
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ 
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -50,8 +51,8 @@ public class FileStreamSourceTask extends SourceTask {
     private BufferedReader reader = null;
     private char[] buffer;
     private int offset = 0;
-    private String topic = null;
-    private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
+    private String topic;
+    private int batchSize;
 
     private Long streamOffset;
 
@@ -71,17 +72,16 @@ public class FileStreamSourceTask extends SourceTask {
 
     @Override
     public void start(Map<String, String> props) {
-        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        AbstractConfig config = new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);
+        filename = config.getString(FileStreamSourceConnector.FILE_CONFIG);
         if (filename == null || filename.isEmpty()) {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
             streamOffset = null;
             reader = new BufferedReader(new InputStreamReader(stream, 
StandardCharsets.UTF_8));
         }
-        // Missing topic or parsing error is not possible because we've parsed 
the config in the
-        // Connector
-        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
-        batchSize = 
Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+        topic = config.getString(FileStreamSourceConnector.TOPIC_CONFIG);
+        batchSize = 
config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
     }
 
     @Override
diff --git 
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
 
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index 548e388c41..0f1ab8e6e1 100644
--- 
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ 
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -36,13 +36,12 @@ public class FileStreamSinkConnectorTest {
     private static final String FILENAME = "/afilename";
 
     private FileStreamSinkConnector connector;
-    private ConnectorContext ctx;
     private Map<String, String> sinkProperties;
 
     @BeforeEach
     public void setup() {
         connector = new FileStreamSinkConnector();
-        ctx = mock(ConnectorContext.class);
+        ConnectorContext ctx = mock(ConnectorContext.class);
         connector.initialize(ctx);
 
         sinkProperties = new HashMap<>();
@@ -74,11 +73,11 @@ public class FileStreamSinkConnectorTest {
 
     @Test
     public void testSinkTasksStdout() {
-        sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
         connector.start(sinkProperties);
         List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
-        
assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+        
assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
     }
 
     @Test
@@ -86,4 +85,15 @@ public class FileStreamSinkConnectorTest {
         connector.start(sinkProperties);
         assertEquals(FileStreamSinkTask.class, connector.taskClass());
     }
+
+    @Test
+    public void testConnectorConfigsPropagateToTaskConfigs() {
+        // This is required so that updates in transforms/converters/clients 
configs get reflected
+        // in tasks without manual restarts of the tasks (see 
https://issues.apache.org/jira/browse/KAFKA-13809)
+        sinkProperties.put("transforms", "insert");
+        connector.start(sinkProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertEquals("insert", taskConfigs.get(0).get("transforms"));
+    }
 }
diff --git 
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
 
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index 8e4661d13d..0ade9ba79b 100644
--- 
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++ 
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -34,17 +35,15 @@ import java.util.Map;
 public class FileStreamSourceConnectorTest {
 
     private static final String SINGLE_TOPIC = "test";
-    private static final String MULTIPLE_TOPICS = "test1,test2";
     private static final String FILENAME = "/somefilename";
 
     private FileStreamSourceConnector connector;
-    private ConnectorContext ctx;
     private Map<String, String> sourceProperties;
 
     @BeforeEach
     public void setup() {
         connector = new FileStreamSourceConnector();
-        ctx = mock(ConnectorContext.class);
+        ConnectorContext ctx = mock(ConnectorContext.class);
         connector.initialize(ctx);
 
         sourceProperties = new HashMap<>();
@@ -89,33 +88,44 @@ public class FileStreamSourceConnectorTest {
     }
 
     @Test
-    public void testMultipleSourcesInvalid() {
-        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, 
MULTIPLE_TOPICS);
-        assertThrows(ConfigException.class, () -> 
connector.start(sourceProperties));
+    public void testTaskClass() {
+        connector.start(sourceProperties);
+        assertEquals(FileStreamSourceTask.class, connector.taskClass());
     }
 
     @Test
-    public void testTaskClass() {
+    public void testConnectorConfigsPropagateToTaskConfigs() {
+        // This is required so that updates in transforms/converters/clients 
configs get reflected
+        // in tasks without manual restarts of the tasks (see 
https://issues.apache.org/jira/browse/KAFKA-13809)
+        sourceProperties.put("transforms", "insert");
         connector.start(sourceProperties);
-        assertEquals(FileStreamSourceTask.class, connector.taskClass());
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertEquals("insert", taskConfigs.get(0).get("transforms"));
+    }
+
+    @Test
+    public void testValidConfigsAndDefaults() {
+        AbstractConfig config = new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties);
+        assertEquals(SINGLE_TOPIC, 
config.getString(FileStreamSourceConnector.TOPIC_CONFIG));
+        assertEquals(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE, 
config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
     }
 
     @Test
     public void testMissingTopic() {
         sourceProperties.remove(FileStreamSourceConnector.TOPIC_CONFIG);
-        assertThrows(ConfigException.class, () -> 
connector.start(sourceProperties));
+        assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
 
     @Test
     public void testBlankTopic() {
-        // Because of trimming this tests is same as testing for empty string.
-        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, "     ");
-        assertThrows(ConfigException.class, () -> 
connector.start(sourceProperties));
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, "   ");
+        assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
 
     @Test
     public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
-        assertThrows(ConfigException.class, () -> 
connector.start(sourceProperties));
+        assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
 }
diff --git a/docs/connect.html b/docs/connect.html
index d13d25d313..57e9bb4809 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -403,12 +403,11 @@ errors.tolerance=all</pre>
 
     <h5><a id="connect_connectorexample" 
href="#connect_connectorexample">Connector Example</a></h5>
 
-    <p>We'll cover the <code>SourceConnector</code> as a simple example. 
<code>SinkConnector</code> implementations are very similar. Start by creating 
the class that inherits from <code>SourceConnector</code> and add a couple of 
fields that will store parsed configuration information (the filename to read 
from and the topic to send data to):</p>
+    <p>We'll cover the <code>SourceConnector</code> as a simple example. 
<code>SinkConnector</code> implementations are very similar. Start by creating 
the class that inherits from <code>SourceConnector</code> and add a field that 
will store the configuration information to be propagated to the task(s) (the 
topic to send data to and, optionally, the filename to read from and the 
maximum batch size):</p>
 
     <pre class="brush: java;">
 public class FileStreamSourceConnector extends SourceConnector {
-    private String filename;
-    private String topic;</pre>
+    private Map&lt;String, String&gt; props;</pre>
 
     <p>The easiest method to fill in is <code>taskClass()</code>, which 
defines the class that should be instantiated in worker processes to actually 
read the data:</p>
 
@@ -423,9 +422,14 @@ public Class&lt;? extends Task&gt; taskClass() {
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    // Initialization logic and setting up of resources can take place in this 
method.
+    // This connector doesn't need to do any of that, but we do log a helpful 
message to the user.
+
+    this.props = props;
+    AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+    String filename = config.getString(FILE_CONFIG);
+    filename = (filename == null || filename.isEmpty()) ? "standard input" : 
config.getString(FILE_CONFIG);
+    log.info("Starting file source connector reading from {}", filename);
 }
 
 @Override
@@ -440,19 +444,14 @@ public void stop() {
     <pre class="brush: java;">
 @Override
 public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
+    // Note that the task configs could contain configs additional to or 
different from the connector configs if needed. For instance,
+    // if different tasks have different responsibilities, or if different 
tasks are meant to process different subsets of the source data stream.
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new 
ArrayList&lt;&gt;();
     // Only one input stream makes sense.
-    Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
-    if (filename != null)
-        config.put(FILE_CONFIG, filename);
-    config.put(TOPIC_CONFIG, topic);
-    configs.add(config);
+    configs.add(props);
     return configs;
 }</pre>
 
-    <p>Although not used in the example, <code>SourceTask</code> also provides 
two APIs to commit offsets in the source system: <code>commit</code> and 
<code>commitRecord</code>. The APIs are provided for source systems which have 
an acknowledgement mechanism for messages. Overriding these methods allows the 
source connector to acknowledge messages in the source system, either in bulk 
or individually, once they have been written to Kafka.
-    The <code>commit</code> API stores the offsets in the source system, up to 
the offsets that have been returned by <code>poll</code>. The implementation of 
this API should block until the commit is complete. The 
<code>commitRecord</code> API saves the offset in the source system for each 
<code>SourceRecord</code> after it is written to Kafka. As Kafka Connect will 
record offsets automatically, <code>SourceTask</code>s are not required to 
implement them. In cases where a connector does [...]
-
     <p>Even with multiple tasks, this method implementation is usually pretty 
simple. It just has to determine the number of input tasks, which may require 
contacting the remote service it is pulling data from, and then divvy them up. 
Because some patterns for splitting work among tasks are so common, some 
utilities are provided in <code>ConnectorUtils</code> to simplify these 
cases.</p>
 
     <p>Note that this simple example does not include dynamic input. See the 
discussion in the next section for how to trigger updates to task configs.</p>
@@ -466,15 +465,17 @@ public List&lt;Map&lt;String, String&gt;&gt; 
taskConfigs(int maxTasks) {
 
     <pre class="brush: java;">
 public class FileStreamSourceTask extends SourceTask {
-    String filename;
-    InputStream stream;
-    String topic;
+    private String filename;
+    private InputStream stream;
+    private String topic;
+    private int batchSize;
 
     @Override
     public void start(Map&lt;String, String&gt; props) {
         filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
         stream = openOrThrowError(filename);
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+        batchSize = 
props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
     }
 
     @Override
@@ -497,6 +498,9 @@ public List&lt;SourceRecord&gt; poll() throws 
InterruptedException {
                 Map&lt;String, Object&gt; sourcePartition = 
Collections.singletonMap("filename", filename);
                 Map&lt;String, Object&gt; sourceOffset = 
Collections.singletonMap("position", streamOffset);
                 records.add(new SourceRecord(sourcePartition, sourceOffset, 
topic, Schema.STRING_SCHEMA, line));
+                if (records.size() >= batchSize) {
+                    return records;
+                }
             } else {
                 Thread.sleep(1);
             }
@@ -513,6 +517,9 @@ public List&lt;SourceRecord&gt; poll() throws 
InterruptedException {
 
     <p>Note that this implementation uses the normal Java 
<code>InputStream</code> interface and may sleep if data is not available. This 
is acceptable because Kafka Connect provides each task with a dedicated thread. 
While task implementations have to conform to the basic <code>poll()</code> 
interface, they have a lot of flexibility in how they are implemented. In this 
case, an NIO-based implementation would be more efficient, but this simple 
approach works, is quick to implement, and i [...]
 
+    <p>Although not used in the example, <code>SourceTask</code> also provides 
two APIs to commit offsets in the source system: <code>commit</code> and 
<code>commitRecord</code>. The APIs are provided for source systems which have 
an acknowledgement mechanism for messages. Overriding these methods allows the 
source connector to acknowledge messages in the source system, either in bulk 
or individually, once they have been written to Kafka.
+        The <code>commit</code> API stores the offsets in the source system, 
up to the offsets that have been returned by <code>poll</code>. The 
implementation of this API should block until the commit is complete. The 
<code>commitRecord</code> API saves the offset in the source system for each 
<code>SourceRecord</code> after it is written to Kafka. As Kafka Connect will 
record offsets automatically, <code>SourceTask</code>s are not required to 
implement them. In cases where a connector  [...]
+
     <h5><a id="connect_sinktasks" href="#connect_sinktasks">Sink Tasks</a></h5>
 
     <p>The previous section described how to implement a simple 
<code>SourceTask</code>. Unlike <code>SourceConnector</code> and 
<code>SinkConnector</code>, <code>SourceTask</code> and <code>SinkTask</code> 
have very different interfaces because <code>SourceTask</code> uses a pull 
interface and <code>SinkTask</code> uses a push interface. Both share the 
common lifecycle methods, but the <code>SinkTask</code> interface is quite 
different:</p>
@@ -609,9 +616,11 @@ if (inputsChanged())
     <p>The following code in <code>FileStreamSourceConnector</code> defines 
the configuration and exposes it to the framework.</p>
 
     <pre class="brush: java;">
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-    .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish 
data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+    .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. 
If not specified, the standard input will be used")
+    .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new 
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
+    .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, 
Importance.LOW,
+        "The maximum number of records the source task can read from the file 
each time it is polled");
 
 public ConfigDef config() {
     return CONFIG_DEF;

Reply via email to