TAJO-1235: ByteBufLineReader cannot read text lines terminated by CRLF. (jinho)

Closes #289


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bebc7801
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bebc7801
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bebc7801

Branch: refs/heads/index_support
Commit: bebc78011798cd2fd691e8901141e94f1d445d6e
Parents: 72256fc
Author: jhkim <[email protected]>
Authored: Wed Dec 10 11:25:03 2014 +0900
Committer: jhkim <[email protected]>
Committed: Wed Dec 10 11:25:03 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../tajo/storage/ByteBufInputChannel.java       |  4 --
 .../tajo/storage/text/ByteBufLineReader.java    | 68 ++++++++++++--------
 .../tajo/storage/text/DelimitedLineReader.java  |  7 +-
 .../org/apache/tajo/storage/TestLineReader.java | 43 ++++++++++---
 5 files changed, 81 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b2985ba..710f463 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1235: ByteBufLineReader can not read text line with CRLF.
+    (jinho)
+
     TAJO-1237: Fix missing maven-module for pullserver. (jinho)
 
     TAJO-1196: Unit test hangs occasionally and randomly. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
index b1b6d65..45fb1d8 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -69,8 +69,4 @@ public class ByteBufInputChannel extends AbstractInterruptibleChannel implements
   protected void implCloseChannel() throws IOException {
     IOUtils.cleanup(null, channel, inputStream);
   }
-
-  public int available() throws IOException {
-    return inputStream.available();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 86319e1..2f742c6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -32,10 +32,11 @@ public class ByteBufLineReader implements Closeable {
 
   private int bufferSize;
   private long readBytes;
+  private int startIndex;
   private boolean eof = false;
   private ByteBuf buffer;
   private final ByteBufInputChannel channel;
-  private final AtomicInteger tempReadBytes = new AtomicInteger();
+  private final AtomicInteger lineReadBytes = new AtomicInteger();
   private final LineSplitProcessor processor = new LineSplitProcessor();
 
   public ByteBufLineReader(ByteBufInputChannel channel) {
@@ -53,10 +54,6 @@ public class ByteBufLineReader implements Closeable {
     return readBytes - buffer.readableBytes();
   }
 
-  public long available() throws IOException {
-    return channel.available() + buffer.readableBytes();
-  }
-
   @Override
   public void close() throws IOException {
     if (this.buffer.refCnt() > 0) {
@@ -66,7 +63,7 @@ public class ByteBufLineReader implements Closeable {
   }
 
   public String readLine() throws IOException {
-    ByteBuf buf = readLineBuf(tempReadBytes);
+    ByteBuf buf = readLineBuf(lineReadBytes);
     if (buf != null) {
       return buf.toString(CharsetUtil.UTF_8);
     }
@@ -77,24 +74,26 @@ public class ByteBufLineReader implements Closeable {
 
     int tailBytes = 0;
     if (this.readBytes > 0) {
+      //startIndex = 0, readIndex = tailBytes length, writable = (buffer 
capacity - tailBytes)
       this.buffer.markReaderIndex();
-      this.buffer.discardSomeReadBytes();  // compact the buffer
+      this.buffer.discardReadBytes();  // compact the buffer
       tailBytes = this.buffer.writerIndex();
       if (!this.buffer.isWritable()) {
         // a line bytes is large than the buffer
-        BufferPool.ensureWritable(buffer, bufferSize);
+        BufferPool.ensureWritable(buffer, bufferSize * 2);
         this.bufferSize = buffer.capacity();
       }
+      this.startIndex = 0;
     }
 
     boolean release = true;
     try {
       int readBytes = tailBytes;
       for (; ; ) {
-        int localReadBytes = buffer.writeBytes(channel, bufferSize - 
readBytes);
+        int localReadBytes = buffer.writeBytes(channel, this.bufferSize - 
readBytes);
         if (localReadBytes < 0) {
-          if (tailBytes == readBytes) {
-            // no more bytes are in the channel
+          if (buffer.isWritable()) {
+            //if read bytes is less than the buffer capacity,  there is no 
more bytes in the channel
             eof = true;
           }
           break;
@@ -106,9 +105,8 @@ public class ByteBufLineReader implements Closeable {
       }
       this.readBytes += (readBytes - tailBytes);
       release = false;
-      if (!eof) {
-        this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip 
past buffer (tail)
-      }
+
+      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip 
past buffer (tail)
     } finally {
       if (release) {
         buffer.release();
@@ -120,24 +118,36 @@ public class ByteBufLineReader implements Closeable {
    * Read a line terminated by one of CR, LF, or CRLF.
    */
   public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
-    if(eof) return null;
-
-    int startIndex = buffer.readerIndex();
-    int readBytes;
+    int readBytes = 0; // newline + text line bytes
+    int newlineLength = 0; //length of terminating newline
     int readable;
-    int newlineLength; //length of terminating newline
+
+    this.startIndex = buffer.readerIndex();
 
     loop:
     while (true) {
       readable = buffer.readableBytes();
       if (readable <= 0) {
-        buffer.readerIndex(startIndex);
+        buffer.readerIndex(this.startIndex);
         fillBuffer(); //compact and fill buffer
-        if (!buffer.isReadable()) {
+
+        //if buffer.writerIndex() is zero, there is no bytes in buffer
+        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
+          reads.set(0);
           return null;
         } else {
-          if (!eof) startIndex = 0; // reset the line start position
-          else startIndex = buffer.readerIndex();
+          //skip first newLine
+          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) 
== LineSplitProcessor.LF) {
+            buffer.skipBytes(1);
+            if(eof && !buffer.isReadable()) {
+              reads.set(1);
+              return null;
+            }
+
+            newlineLength++;
+            readBytes++;
+            startIndex = buffer.readerIndex();
+          }
         }
         readable = buffer.readableBytes();
       }
@@ -147,19 +157,21 @@ public class ByteBufLineReader implements Closeable {
         //does not appeared terminating newline
         buffer.readerIndex(buffer.writerIndex()); // set to end buffer
         if(eof){
-          readBytes = buffer.readerIndex() - startIndex;
-          newlineLength = 0;
+          readBytes += (buffer.readerIndex() - startIndex);
           break loop;
         }
       } else {
         buffer.readerIndex(endIndex + 1);
-        readBytes = buffer.readerIndex() - startIndex;
+        readBytes += (buffer.readerIndex() - startIndex); //past newline + 
text line
+
+        //appeared terminating CRLF
         if (processor.isPrevCharCR() && buffer.isReadable()
             && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
           buffer.skipBytes(1);
-          newlineLength = 2;
+          readBytes++;
+          newlineLength += 2;
         } else {
-          newlineLength = 1;
+          newlineLength += 1;
         }
         break loop;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index eb1929e..0efe030 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -57,7 +57,7 @@ public class DelimitedLineReader implements Closeable {
   private long startOffset, end, pos;
   private boolean eof = true;
   private ByteBufLineReader lineReader;
-  private AtomicInteger tempReadBytes = new AtomicInteger();
+  private AtomicInteger lineReadBytes = new AtomicInteger();
   private FileFragment fragment;
   private Configuration conf;
 
@@ -122,11 +122,10 @@ public class DelimitedLineReader implements Closeable {
       return null;
     }
 
-    ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+    pos += lineReadBytes.get();
     if (buf == null) {
       eof = true;
-    } else {
-      pos += tempReadBytes.get();
     }
 
     if (!isCompressed() && getCompressedPosition() > end) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 4512d00..bfaba04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
@@ -84,18 +86,15 @@ public class TestLineReader {
     FileStatus status = fs.getFileStatus(tablePath);
 
     ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
-    assertEquals(status.getLen(), channel.available());
     ByteBufLineReader reader = new ByteBufLineReader(channel);
-    assertEquals(status.getLen(), reader.available());
 
     long totalRead = 0;
     int i = 0;
     AtomicInteger bytes = new AtomicInteger();
     for(;;){
       ByteBuf buf = reader.readLineBuf(bytes);
-      if(buf == null) break;
-
       totalRead += bytes.get();
+      if(buf == null) break;
       i++;
     }
     IOUtils.cleanup(null, reader, channel, fs);
@@ -171,18 +170,15 @@ public class TestLineReader {
     String data = FileUtil.readTextFile(file);
 
     ByteBufInputChannel channel = new ByteBufInputChannel(new 
FileInputStream(file));
-
-    assertEquals(file.length(), channel.available());
     ByteBufLineReader reader = new ByteBufLineReader(channel);
-    assertEquals(file.length(), reader.available());
 
     long totalRead = 0;
     int i = 0;
     AtomicInteger bytes = new AtomicInteger();
     for(;;){
       ByteBuf buf = reader.readLineBuf(bytes);
-      if(buf == null) break;
       totalRead += bytes.get();
+      if(buf == null) break;
       i++;
     }
     IOUtils.cleanup(null, reader);
@@ -190,4 +186,35 @@ public class TestLineReader {
     assertEquals(file.length(), reader.readBytes());
     assertEquals(data.split("\n").length, i);
   }
+
+  @Test
+  public void testCRLFLine() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), 
"testCRLFLineText.txt");
+
+    FileSystem fs = testFile.getFileSystem(conf);
+    FSDataOutputStream outputStream = fs.create(testFile, true);
+    outputStream.write("0\r\n1\r\n".getBytes());
+    outputStream.flush();
+    IOUtils.closeStream(outputStream);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+    ByteBufLineReader reader = new ByteBufLineReader(channel, 
BufferPool.directBuffer(2));
+    FileStatus status = fs.getFileStatus(testFile);
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      String row  = buf.toString(Charset.defaultCharset());
+      assertEquals(i, Integer.parseInt(row));
+      i++;
+    }
+    IOUtils.cleanup(null, reader);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
 }

Reply via email to