This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ecf0ab4 KAFKA-4335: Add batch.size to FileStreamSource connector to
prevent OOM
ecf0ab4 is described below
commit ecf0ab42c1591be2a3784b6e107567ad883e0330
Author: Study <[email protected]>
AuthorDate: Fri Jan 5 18:31:53 2018 -0800
KAFKA-4335: Add batch.size to FileStreamSource connector to prevent OOM
When the source file of `FileStreamSource` is large,
`FileStreamSourceTask.poll()` accumulates every available line into a single
batch and can run out of memory (OOM). This pull request adds a `batch.size`
parameter that caps the number of records returned by each poll.
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Study <[email protected]>
Reviewers: Ewen Cheslack-Postava <[email protected]>
Closes #4356 from phstudy/KAFKA-4335
---
.../connect/file/FileStreamSourceConnector.java | 16 ++++++++++++++-
.../kafka/connect/file/FileStreamSourceTask.java | 13 ++++++++++++
.../connect/file/FileStreamSourceTaskTest.java | 24 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 1 deletion(-)
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 335fe92..59006da 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -36,13 +36,18 @@ import java.util.Map;
public class FileStreamSourceConnector extends SourceConnector {
public static final String TOPIC_CONFIG = "topic";
public static final String FILE_CONFIG = "file";
+ public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
+
+ public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source
filename. If not specified, the standard input will be used")
- .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to
publish data to");
+ .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to
publish data to")
+ .define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum
number of records the Source task can read from file one time");
private String filename;
private String topic;
+ private int batchSize = DEFAULT_TASK_BATCH_SIZE;
@Override
public String version() {
@@ -57,6 +62,14 @@ public class FileStreamSourceConnector extends
SourceConnector {
throw new ConnectException("FileStreamSourceConnector
configuration must include 'topic' setting");
if (topic.contains(","))
throw new ConnectException("FileStreamSourceConnector should only
have a single topic when used as a source.");
+
+ if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
+ try {
+ batchSize =
Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
+ } catch (NumberFormatException e) {
+ throw new ConnectException("Invalid FileStreamSourceConnector
configuration", e);
+ }
+ }
}
@Override
@@ -72,6 +85,7 @@ public class FileStreamSourceConnector extends
SourceConnector {
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
+ config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
configs.add(config);
return configs;
}
diff --git
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 8edf385..482102f 100644
---
a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++
b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -50,6 +50,7 @@ public class FileStreamSourceTask extends SourceTask {
private char[] buffer = new char[1024];
private int offset = 0;
private String topic = null;
+ private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
private Long streamOffset;
@@ -70,6 +71,14 @@ public class FileStreamSourceTask extends SourceTask {
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new ConnectException("FileStreamSourceTask config missing
topic setting");
+
+ if
(props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
+ try {
+ batchSize =
Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
+ } catch (NumberFormatException e) {
+ throw new ConnectException("Invalid FileStreamSourceTask
configuration", e);
+ }
+ }
}
@Override
@@ -146,6 +155,10 @@ public class FileStreamSourceTask extends SourceTask {
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename),
offsetValue(streamOffset), topic, null,
null, null, VALUE_SCHEMA, line,
System.currentTimeMillis()));
+
+ if (records.size() >= batchSize) {
+ return records;
+ }
}
} while (line != null);
}
diff --git
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index cde6c43..3cb7128 100644
---
a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++
b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -127,6 +127,30 @@ public class FileStreamSourceTaskTest extends
EasyMockSupport {
task.stop();
}
+ @Test
+ public void testBatchSize() throws IOException, InterruptedException {
+ expectOffsetLookupReturnNone();
+ replay();
+
+ config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "5000");
+ task.start(config);
+
+ FileOutputStream os = new FileOutputStream(tempFile);
+ for (int i = 0; i < 10_000; i++) {
+ os.write("Neque porro quisquam est qui dolorem ipsum quia dolor
sit amet, consectetur, adipisci velit...\n".getBytes());
+ }
+ os.flush();
+
+ List<SourceRecord> records = task.poll();
+ assertEquals(5000, records.size());
+
+ records = task.poll();
+ assertEquals(5000, records.size());
+
+ os.close();
+ task.stop();
+ }
+
@Test(expected = ConnectException.class)
public void testMissingTopic() throws InterruptedException {
replay();
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.