This is an automated email from the ASF dual-hosted git repository.
tombentley pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 2ceb8cf KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed (#9735)
2ceb8cf is described below
commit 2ceb8cfb3fd09ee8b8f30458c80f9dd4f460ad6f
Author: Tom Bentley <[email protected]>
AuthorDate: Fri Dec 18 04:01:38 2020 +0000
KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed (#9735)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/connect/file/FileStreamSourceTask.java | 31 +++++++++---
.../connect/file/FileStreamSourceTaskTest.java | 59 ++++++++++++++++++++--
2 files changed, 78 insertions(+), 12 deletions(-)
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 582889b..8e3fb89 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -48,13 +48,22 @@ public class FileStreamSourceTask extends SourceTask {
private String filename;
private InputStream stream;
private BufferedReader reader = null;
- private char[] buffer = new char[1024];
+ private char[] buffer;
private int offset = 0;
private String topic = null;
private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
private Long streamOffset;
+ public FileStreamSourceTask() {
+ this(1024);
+ }
+
+ /* visible for testing */
+ FileStreamSourceTask(int initialBufferSize) {
+ buffer = new char[initialBufferSize];
+ }
+
@Override
public String version() {
return new FileStreamSourceConnector().version();
@@ -137,16 +146,12 @@ public class FileStreamSourceTask extends SourceTask {
if (nread > 0) {
offset += nread;
- if (offset == buffer.length) {
- char[] newbuf = new char[buffer.length * 2];
- System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
- buffer = newbuf;
- }
-
String line;
+ boolean foundOneLine = false;
do {
line = extractLine();
if (line != null) {
+ foundOneLine = true;
log.trace("Read a line from {}", logFilename());
if (records == null)
records = new ArrayList<>();
@@ -158,6 +163,13 @@ public class FileStreamSourceTask extends SourceTask {
}
}
} while (line != null);
+
+ if (!foundOneLine && offset == buffer.length) {
+ char[] newbuf = new char[buffer.length * 2];
+ System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+ log.info("Increased buffer from {} to {}", buffer.length, newbuf.length);
+ buffer = newbuf;
+ }
}
}
@@ -231,4 +243,9 @@ public class FileStreamSourceTask extends SourceTask {
private String logFilename() {
return filename == null ? "stdin" : filename;
}
+
+ /* visible for testing */
+ int bufferSize() {
+ return buffer.length;
+ }
}
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index feacf8f..d81c621 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -30,6 +30,7 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -56,7 +57,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
- task = new FileStreamSourceTask();
+ task = new FileStreamSourceTask(2);
offsetStorageReader = createMock(OffsetStorageReader.class);
context = createMock(SourceTaskContext.class);
task.initialize(context);
@@ -137,21 +138,69 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
task.start(config);
OutputStream os = Files.newOutputStream(tempFile.toPath());
- for (int i = 0; i < 10_000; i++) {
- os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
- }
- os.flush();
+ writeTimesAndFlush(os, 10_000,
+ "Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes()
+ );
+ assertEquals(2, task.bufferSize());
List<SourceRecord> records = task.poll();
assertEquals(5000, records.size());
+ assertEquals(128, task.bufferSize());
records = task.poll();
assertEquals(5000, records.size());
+ assertEquals(128, task.bufferSize());
+
+ os.close();
+ task.stop();
+ }
+
+ @Test
+ public void testBufferResize() throws IOException, InterruptedException {
+ int batchSize = 1000;
+ expectOffsetLookupReturnNone();
+ replay();
+
+ config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, Integer.toString(batchSize));
+ task.start(config);
+
+ OutputStream os = Files.newOutputStream(tempFile.toPath());
+ assertEquals(2, task.bufferSize());
+ writeAndAssertBufferSize(batchSize, os, "1\n".getBytes(), 2);
+ writeAndAssertBufferSize(batchSize, os, "3 \n".getBytes(), 4);
+ writeAndAssertBufferSize(batchSize, os, "7 \n".getBytes(), 8);
+ writeAndAssertBufferSize(batchSize, os, "8 \n".getBytes(), 8);
+ writeAndAssertBufferSize(batchSize, os, "9 \n".getBytes(), 16);
+
+ byte[] bytes = new byte[1025];
+ Arrays.fill(bytes, (byte) '*');
+ bytes[bytes.length - 1] = '\n';
+ writeAndAssertBufferSize(batchSize, os, bytes, 2048);
+ writeAndAssertBufferSize(batchSize, os, "9 \n".getBytes(), 2048);
os.close();
task.stop();
}
+ private void writeAndAssertBufferSize(int batchSize, OutputStream os, byte[] bytes, int expectBufferSize)
+ throws IOException, InterruptedException {
+ writeTimesAndFlush(os, batchSize, bytes);
+ List<SourceRecord> records = task.poll();
+ assertEquals(batchSize, records.size());
+ String expectedLine = new String(bytes, 0, bytes.length - 1); // remove \n
+ for (SourceRecord record : records) {
+ assertEquals(expectedLine, record.value());
+ }
+ assertEquals(expectBufferSize, task.bufferSize());
+ }
+
+ private void writeTimesAndFlush(OutputStream os, int times, byte[] line) throws IOException {
+ for (int i = 0; i < times; i++) {
+ os.write(line);
+ }
+ os.flush();
+ }
+
@Test
public void testMissingFile() throws InterruptedException {
replay();