This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 2ceb8cf  KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed (#9735)
2ceb8cf is described below

commit 2ceb8cfb3fd09ee8b8f30458c80f9dd4f460ad6f
Author: Tom Bentley <[email protected]>
AuthorDate: Fri Dec 18 04:01:38 2020 +0000

    KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed (#9735)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/connect/file/FileStreamSourceTask.java   | 31 +++++++++---
 .../connect/file/FileStreamSourceTaskTest.java     | 59 ++++++++++++++++++++--
 2 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 582889b..8e3fb89 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -48,13 +48,22 @@ public class FileStreamSourceTask extends SourceTask {
     private String filename;
     private InputStream stream;
     private BufferedReader reader = null;
-    private char[] buffer = new char[1024];
+    private char[] buffer;
     private int offset = 0;
     private String topic = null;
     private int batchSize = FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE;
 
     private Long streamOffset;
 
+    public FileStreamSourceTask() {
+        this(1024);
+    }
+
+    /* visible for testing */
+    FileStreamSourceTask(int initialBufferSize) {
+        buffer = new char[initialBufferSize];
+    }
+
     @Override
     public String version() {
         return new FileStreamSourceConnector().version();
@@ -137,16 +146,12 @@ public class FileStreamSourceTask extends SourceTask {
 
                 if (nread > 0) {
                     offset += nread;
-                    if (offset == buffer.length) {
-                        char[] newbuf = new char[buffer.length * 2];
-                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
-                        buffer = newbuf;
-                    }
-
                     String line;
+                    boolean foundOneLine = false;
                     do {
                         line = extractLine();
                         if (line != null) {
+                            foundOneLine = true;
                             log.trace("Read a line from {}", logFilename());
                             if (records == null)
                                 records = new ArrayList<>();
@@ -158,6 +163,13 @@ public class FileStreamSourceTask extends SourceTask {
                             }
                         }
                     } while (line != null);
+
+                    if (!foundOneLine && offset == buffer.length) {
+                        char[] newbuf = new char[buffer.length * 2];
+                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+                        log.info("Increased buffer from {} to {}", buffer.length, newbuf.length);
+                        buffer = newbuf;
+                    }
                 }
             }
 
@@ -231,4 +243,9 @@ public class FileStreamSourceTask extends SourceTask {
     private String logFilename() {
         return filename == null ? "stdin" : filename;
     }
+
+    /* visible for testing */
+    int bufferSize() {
+        return buffer.length;
+    }
 }
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index feacf8f..d81c621 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -30,6 +30,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,7 +57,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
         config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
         config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
-        task = new FileStreamSourceTask();
+        task = new FileStreamSourceTask(2);
         offsetStorageReader = createMock(OffsetStorageReader.class);
         context = createMock(SourceTaskContext.class);
         task.initialize(context);
@@ -137,21 +138,69 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         task.start(config);
 
         OutputStream os = Files.newOutputStream(tempFile.toPath());
-        for (int i = 0; i < 10_000; i++) {
-            os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
-        }
-        os.flush();
+        writeTimesAndFlush(os, 10_000,
+                "Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes()
+        );
 
+        assertEquals(2, task.bufferSize());
         List<SourceRecord> records = task.poll();
         assertEquals(5000, records.size());
+        assertEquals(128, task.bufferSize());
 
         records = task.poll();
         assertEquals(5000, records.size());
+        assertEquals(128, task.bufferSize());
+
+        os.close();
+        task.stop();
+    }
+
+    @Test
+    public void testBufferResize() throws IOException, InterruptedException {
+        int batchSize = 1000;
+        expectOffsetLookupReturnNone();
+        replay();
+
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, Integer.toString(batchSize));
+        task.start(config);
+
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
 
+        assertEquals(2, task.bufferSize());
+        writeAndAssertBufferSize(batchSize, os, "1\n".getBytes(), 2);
+        writeAndAssertBufferSize(batchSize, os, "3 \n".getBytes(), 4);
+        writeAndAssertBufferSize(batchSize, os, "7     \n".getBytes(), 8);
+        writeAndAssertBufferSize(batchSize, os, "8      \n".getBytes(), 8);
+        writeAndAssertBufferSize(batchSize, os, "9       \n".getBytes(), 16);
+
+        byte[] bytes = new byte[1025];
+        Arrays.fill(bytes, (byte) '*');
+        bytes[bytes.length - 1] = '\n';
+        writeAndAssertBufferSize(batchSize, os, bytes, 2048);
+        writeAndAssertBufferSize(batchSize, os, "9       \n".getBytes(), 2048);
         os.close();
         task.stop();
     }
 
+    private void writeAndAssertBufferSize(int batchSize, OutputStream os, byte[] bytes, int expectBufferSize)
+            throws IOException, InterruptedException {
+        writeTimesAndFlush(os, batchSize, bytes);
+        List<SourceRecord> records = task.poll();
+        assertEquals(batchSize, records.size());
+        String expectedLine = new String(bytes, 0, bytes.length - 1); // remove \n
+        for (SourceRecord record : records) {
+            assertEquals(expectedLine, record.value());
+        }
+        assertEquals(expectBufferSize, task.bufferSize());
+    }
+
+    private void writeTimesAndFlush(OutputStream os, int times, byte[] line) throws IOException {
+        for (int i = 0; i < times; i++) {
+            os.write(line);
+        }
+        os.flush();
+    }
+
     @Test
     public void testMissingFile() throws InterruptedException {
         replay();
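For readers skimming the patch: the old poll() loop doubled the read buffer whenever a read happened to fill it, even while complete lines were still being extracted, so the buffer could keep growing indefinitely. The patched loop grows the buffer only when it is completely full and not a single line could be extracted from it, which is what testBufferResize checks (2 -> 4 -> 8 -> 16 -> 2048). The following standalone sketch illustrates the same grow-only-when-needed idea in isolation; the class and method names are illustrative and are not part of Kafka or of this patch.

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch: grow the read buffer only when a read fills it
// without producing a single complete line.
public class GrowOnDemandLineReader {

    private char[] buffer;
    private int offset = 0;

    GrowOnDemandLineReader(int initialBufferSize) {
        buffer = new char[initialBufferSize];
    }

    int bufferSize() {
        return buffer.length;
    }

    List<String> readLines(Reader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        int nread;
        while ((nread = reader.read(buffer, offset, buffer.length - offset)) > 0) {
            offset += nread;

            // Extract every complete line currently held in the buffer.
            boolean foundOneLine = false;
            int start = 0;
            for (int i = 0; i < offset; i++) {
                if (buffer[i] == '\n') {
                    lines.add(new String(buffer, start, i - start));
                    start = i + 1;
                    foundOneLine = true;
                }
            }

            // Keep any trailing partial line at the front of the buffer.
            System.arraycopy(buffer, start, buffer, 0, offset - start);
            offset -= start;

            // The key point: double the buffer only if it is completely full
            // and still does not hold one whole line.
            if (!foundOneLine && offset == buffer.length) {
                buffer = Arrays.copyOf(buffer, buffer.length * 2);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        StringBuilder input = new StringBuilder("short\n");
        for (int i = 0; i < 100; i++) {
            input.append('*');
        }
        input.append('\n');

        GrowOnDemandLineReader r = new GrowOnDemandLineReader(2);
        List<String> lines = r.readLines(new StringReader(input.toString()));
        System.out.println(lines.size() + " lines, buffer ends at " + r.bufferSize() + " chars");
        // Prints "2 lines, buffer ends at 128 chars": the buffer grew past its
        // initial size only because the second, 100-char line required it.
    }
}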
