yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r938000827
##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
*/
package org.apache.kafka.connect.file;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
*/
public class FileStreamSourceConnector extends SourceConnector {
- public static final String TOPIC_CONFIG = "topic";
- public static final String FILE_CONFIG = "file";
- public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
- public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+ static final String TOPIC_CONFIG = "topic";
+ static final String FILE_CONFIG = "file";
+ static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final int DEFAULT_TASK_BATCH_SIZE = 2000;
Review Comment:
I didn't expect these constants to ever be needed outside of the
`org.apache.kafka.connect.file` package (even accidentally), so I reduced their
visibility. I can revert this if you'd prefer.
##########
docs/connect.html:
##########
@@ -403,12 +403,11 @@ <h4><a id="connect_developing" href="#connect_developing">Developing a Simple Co
<h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector Example</a></h5>
- <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):</p>
+ <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size):</p>
<pre class="brush: java;">
public class FileStreamSourceConnector extends SourceConnector {
- private String filename;
- private String topic;</pre>
+ private Map<String, String>;</pre>
Review Comment:
Whoops, thanks
##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
<pre class="brush: java;">
@Override
public void start(Map<String, String> props) {
- // The complete version includes error handling as well.
- filename = props.get(FILE_CONFIG);
- topic = props.get(TOPIC_CONFIG);
+ this.props = props;
Review Comment:
What about keeping the log line but also adding a comment (only in the docs)
explaining that initialization logic and resource setup typically go in
`start`, and that none is required for the `FileStream` connector?
And one more in `taskConfigs` explaining that the connector generates task
configs there, and that these can include configs additional to, or different
from, the connector configs when tasks have different responsibilities (again
with the disclaimer that this isn't required for the `FileStream` connector)?
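A minimal sketch of the two comments suggested above. It is written against a hypothetical standalone class (`SketchConnector`) rather than the real `SourceConnector` API, and the `task.index` key is an assumption made up purely for illustration; neither appears in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a connector; it does not extend the real SourceConnector.
class SketchConnector {
    // Hypothetical config key, used only to show a per-task addition.
    static final String TASK_INDEX_CONFIG = "task.index";

    private Map<String, String> props;

    public void start(Map<String, String> props) {
        // Initialization logic and resource setup typically go here.
        // This sketch needs none, so it only keeps a copy of the configs.
        this.props = new HashMap<>(props);
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Task configs are generated here. They may add to, or differ from,
        // the connector configs when tasks have different responsibilities.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(props);
            taskConfig.put(TASK_INDEX_CONFIG, Integer.toString(i));
            configs.add(taskConfig);
        }
        return configs;
    }
}
```

The `FileStream` connector would not need the loop, since only one input stream makes sense there; the per-task key is shown only to illustrate where task-specific configs would be added.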
##########
docs/connect.html:
##########
@@ -466,23 +460,26 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
<pre class="brush: java;">
public class FileStreamSourceTask extends SourceTask {
- String filename;
- InputStream stream;
- String topic;
+ private String filename;
+ private InputStream stream;
+ private String topic;
+ private int batchSize;
@Override
public void start(Map<String, String> props) {
- filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+ AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);
Review Comment:
Sure, makes sense, reverted this.
##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
- if (filename != null)
- config.put(FILE_CONFIG, filename);
- config.put(TOPIC_CONFIG, topic);
- configs.add(config);
+ configs.add(props);
return configs;
}</pre>
Review Comment:
Makes sense, it looks quite odd and out of place right now.
##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
Review Comment:
Thanks, my bad for missing it earlier.