Repository: tajo Updated Branches: refs/heads/master cd38dffb9 -> bf68b770e
TAJO-1210: ByteBufLineReader does not handle the end of file, if newline is not appeared. (jinho) Closes #272 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bf68b770 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bf68b770 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bf68b770 Branch: refs/heads/master Commit: bf68b770e6abbb4c63d696e264a348bb1ddb5982 Parents: cd38dff Author: jhkim <[email protected]> Authored: Mon Dec 1 11:31:23 2014 +0900 Committer: jhkim <[email protected]> Committed: Mon Dec 1 11:31:23 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ tajo-storage/pom.xml | 3 +- .../tajo/storage/text/ByteBufLineReader.java | 22 ++++++++++++-- .../org/apache/tajo/storage/TestLineReader.java | 32 +++++++++++++++++++- .../org/apache/tajo/storage/TestStorages.java | 2 +- .../apache/tajo/storage/avro/TestAvroUtil.java | 2 +- .../src/test/resources/dataset/testLineText.txt | 2 ++ .../resources/dataset/testVariousTypes.avsc | 20 ++++++++++++ .../src/test/resources/testVariousTypes.avsc | 20 ------------ 9 files changed, 78 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 025ae88..33983bc 100644 --- a/CHANGES +++ b/CHANGES @@ -79,6 +79,9 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1210: ByteBufLineReader does not handle the end of file, + if newline is not appeared. (jinho) + TAJO-1119: JDBC driver should support TIMESTAMP type. (jaehwa) TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index ef26a32..7ede2e1 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -71,8 +71,7 @@ </executions> <configuration> <excludes> - <exclude>src/test/resources/testVariousTypes.avsc</exclude> - <exclude>src/test/resources/dataset/TestJsonSerDe/*.json</exclude> + <exclude>src/test/resources/dataset/**</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 1448885..86319e1 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -32,6 +32,7 @@ public class ByteBufLineReader implements Closeable { private int bufferSize; private long readBytes; + private boolean eof = false; private ByteBuf buffer; private final ByteBufInputChannel channel; private final AtomicInteger tempReadBytes = new AtomicInteger(); @@ -92,6 +93,10 @@ public class ByteBufLineReader implements Closeable { for (; ; ) { int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes); if (localReadBytes < 0) { + if (tailBytes == readBytes) { + // no more bytes are in the channel + eof = true; + } break; } readBytes += localReadBytes; @@ -101,7 +106,9 @@ public class ByteBufLineReader implements Closeable { } this.readBytes += (readBytes - tailBytes); release = false; - this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) + if (!eof) { + this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) + } } finally { if (release) { buffer.release(); @@ -113,6 +120,8 @@ public class ByteBufLineReader implements Closeable { * Read a line terminated by one of CR, LF, or CRLF. */ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { + if(eof) return null; + int startIndex = buffer.readerIndex(); int readBytes; int readable; @@ -127,14 +136,21 @@ public class ByteBufLineReader implements Closeable { if (!buffer.isReadable()) { return null; } else { - startIndex = 0; // reset the line start position + if (!eof) startIndex = 0; // reset the line start position + else startIndex = buffer.readerIndex(); } readable = buffer.readableBytes(); } int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor); if (endIndex < 0) { - buffer.readerIndex(buffer.writerIndex()); + //does not appeared terminating newline + buffer.readerIndex(buffer.writerIndex()); // set to end buffer + if(eof){ + readBytes = buffer.readerIndex() - startIndex; + newlineLength = 0; + break loop; + } } else { buffer.readerIndex(endIndex + 1); readBytes = buffer.readerIndex() - startIndex; http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java index ef6efdf..4512d00 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -34,11 +34,14 @@ import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.text.ByteBufLineReader; -import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.storage.text.DelimitedLineReader; +import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; import org.junit.Test; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -160,4 +163,31 @@ public class TestLineReader { assertEquals(tupleNum, i); } + + @Test + public void testByteBufLineReaderWithoutTerminating() throws IOException { + String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile(); + File file = new File(path); + String data = FileUtil.readTextFile(file); + + ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file)); + + assertEquals(file.length(), channel.available()); + ByteBufLineReader reader = new ByteBufLineReader(channel); + assertEquals(file.length(), reader.available()); + + long totalRead = 0; + int i = 0; + AtomicInteger bytes = new AtomicInteger(); + for(;;){ + ByteBuf buf = reader.readLineBuf(bytes); + if(buf == null) break; + totalRead += bytes.get(); + i++; + } + IOUtils.cleanup(null, reader); + assertEquals(file.length(), totalRead); + assertEquals(file.length(), reader.readBytes()); + assertEquals(data.split("\n").length, i); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index c581926..bd1a1f9 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -320,7 +320,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType, options); meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); if (storeType == StoreType.AVRO) { - String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString(); + String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); } http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java index 6186e9e..a79e8ab 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -49,7 +49,7 @@ public class TestAvroUtil { @Before public void setUp() throws Exception { - schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc"); + schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc"); assertNotNull(schemaUrl); File file = new File(schemaUrl.getPath()); http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testLineText.txt ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/dataset/testLineText.txt b/tajo-storage/src/test/resources/dataset/testLineText.txt new file mode 100644 index 0000000..7403c26 --- /dev/null +++ b/tajo-storage/src/test/resources/dataset/testLineText.txt @@ -0,0 +1,2 @@ +1|25|emiya muljomdao +2|25|emiya muljomdao \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc new file mode 100644 index 0000000..d4250a9 --- /dev/null +++ b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc @@ -0,0 +1,20 @@ +{ + "type": "record", + "namespace": "org.apache.tajo", + "name": "testVariousTypes", + "fields": [ + { "name": "col1", "type": "boolean" }, + { "name": "col2", "type": "string" }, + { "name": "col3", "type": "int" }, + { "name": "col4", "type": "int" }, + { "name": "col5", "type": "long" }, + { "name": "col6", "type": "float" }, + { "name": "col7", "type": "double" }, + { "name": "col8", "type": "string" }, + { "name": "col9", "type": "bytes" }, + { "name": "col10", "type": "bytes" }, + { "name": "col11", "type": "null" }, + { "name": "col12", "type": "bytes" } + ] +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc deleted file mode 100644 index d4250a9..0000000 --- a/tajo-storage/src/test/resources/testVariousTypes.avsc +++ /dev/null @@ -1,20 +0,0 @@ -{ - "type": "record", - "namespace": "org.apache.tajo", - "name": "testVariousTypes", - "fields": [ - { "name": "col1", "type": "boolean" }, - { "name": "col2", "type": "string" }, - { "name": "col3", "type": "int" }, - { "name": "col4", "type": "int" }, - { "name": "col5", "type": "long" }, - { "name": "col6", "type": "float" }, - { "name": "col7", "type": "double" }, - { "name": "col8", "type": "string" }, - { "name": "col9", "type": "bytes" }, - { "name": "col10", "type": "bytes" }, - { "name": "col11", "type": "null" }, - { "name": "col12", "type": "bytes" } - ] -} -
