Repository: tajo
Updated Branches:
  refs/heads/master 72948b63a -> 633109ac7


TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner.

Closes #489


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/633109ac
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/633109ac
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/633109ac

Branch: refs/heads/master
Commit: 633109ac75bc8036e49eba8ea48c025fc0f342da
Parents: 72948b6
Author: Jinho Kim <[email protected]>
Authored: Tue Apr 7 15:32:58 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Tue Apr 7 15:32:58 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/LocalTajoTestingUtility.java    |   2 +-
 .../org/apache/tajo/storage/BufferPool.java     |   2 +-
 .../tajo/storage/ByteBufInputChannel.java       |  38 +----
 .../apache/tajo/storage/FSDataInputChannel.java |  67 ++++++++
 .../org/apache/tajo/storage/InputChannel.java   |  36 +++++
 .../tajo/storage/LocalFileInputChannel.java     |  51 ++++++
 .../apache/tajo/storage/SeekableChannel.java    |  27 ++++
 .../org/apache/tajo/storage/FileAppender.java   |   4 +
 .../tajo/storage/text/ByteBufLineReader.java    |  28 +++-
 .../tajo/storage/text/DelimitedLineReader.java  |  64 ++++++--
 .../tajo/storage/text/DelimitedTextFile.java    |  19 ++-
 .../tajo/storage/text/LineSplitProcessor.java   |   4 +
 .../thirdparty/parquet/CodecFactory.java        |   2 +-
 .../tajo/storage/TestByteBufLineReader.java     | 160 +++++++++++++++++++
 .../org/apache/tajo/storage/TestLineReader.java | 113 ++++++++++++-
 .../org/apache/tajo/storage/TestStorages.java   |   2 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |   3 +-
 18 files changed, 565 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 90fa245..c2016ab 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,9 @@ Release 0.11.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner.
+    (jinho)
+
     TAJO-921: Add STDDEV_SAMP and STDDEV_POP window functions. (Keuntae Park)
 
     TAJO-1135: Implement queryable virtual table for cluster information.

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java 
b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 801c71f..5407ff5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -116,7 +116,7 @@ public class LocalTajoTestingUtility {
       fs.mkdirs(tablePath);
       Path dfsPath = new Path(tablePath, localPath.getName());
       fs.copyFromLocalFile(localPath, dfsPath);
-      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, 
option);
+      TableMeta meta = 
CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE, option);
 
       // Add fake table statistic data to tables.
       // It gives more various situations to unit tests.

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 85c79fa..e4f9072 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -44,7 +44,7 @@ public class BufferPool {
   }
 
 
-  public synchronized static ByteBuf directBuffer(int size) {
+  public static ByteBuf directBuffer(int size) {
     return allocator.directBuffer(size);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
index 45fb1d8..bdfec91 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.io.IOUtils;
 
 import java.io.IOException;
@@ -27,42 +25,22 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
 
-public class ByteBufInputChannel extends AbstractInterruptibleChannel 
implements ScatteringByteChannel {
-
-  ByteBufferReadable byteBufferReadable;
-  ReadableByteChannel channel;
-  InputStream inputStream;
+/**
+ * ByteBufInputChannel is an NIO channel wrapper around an input stream
+ */
+public class ByteBufInputChannel extends InputChannel {
+  private ReadableByteChannel channel;
+  private InputStream inputStream;
 
   public ByteBufInputChannel(InputStream inputStream) {
-    if (inputStream instanceof DFSInputStream && inputStream instanceof 
ByteBufferReadable) {
-      this.byteBufferReadable = (ByteBufferReadable) inputStream;
-    } else {
-      this.channel = Channels.newChannel(inputStream);
-    }
-
+    this.channel = Channels.newChannel(inputStream);
     this.inputStream = inputStream;
   }
 
   @Override
-  public long read(ByteBuffer[] dsts, int offset, int length) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long read(ByteBuffer[] dsts) {
-    return read(dsts, 0, dsts.length);
-  }
-
-  @Override
   public int read(ByteBuffer dst) throws IOException {
-    if (byteBufferReadable != null) {
-      return byteBufferReadable.read(dst);
-    } else {
-      return channel.read(dst);
-    }
+    return channel.read(dst);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
new file mode 100644
index 0000000..ed84d24
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * FSDataInputChannel is an NIO channel implementation that can read directly 
from HDFS
+ */
+public final class FSDataInputChannel extends InputChannel implements 
SeekableChannel {
+
+  private ReadableByteChannel channel;
+  private FSDataInputStream inputStream;
+  private boolean isDirectRead;
+
+  public FSDataInputChannel(FSDataInputStream inputStream) {
+    if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
+      this.isDirectRead = true;
+    } else {
+      /* LocalFileSystem and S3 do not support ByteBufferReadable */
+      this.channel = Channels.newChannel(inputStream);
+    }
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    if (isDirectRead) {
+      return inputStream.read(dst);
+    } else {
+      return channel.read(dst);
+    }
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    inputStream.seek(offset);
+  }
+
+  @Override
+  protected void implCloseChannel() throws IOException {
+    IOUtils.cleanup(null, channel, inputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java
new file mode 100644
index 0000000..ad778a6
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+
+public abstract class InputChannel extends AbstractInterruptibleChannel 
implements ScatteringByteChannel {
+
+  @Override
+  public long read(ByteBuffer[] dsts, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts) {
+    return read(dsts, 0, dsts.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
new file mode 100644
index 0000000..bd7d668
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * LocalFileInputChannel is a FileChannel wrapper that adds seek support
+ */
+public final class LocalFileInputChannel extends InputChannel implements 
SeekableChannel {
+  private FileChannel channel;
+
+  public LocalFileInputChannel(FileChannel channel) {
+    this.channel = channel;
+  }
+
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    return channel.read(dst);
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    this.channel.position(offset);
+  }
+
+  @Override
+  protected void implCloseChannel() throws IOException {
+    IOUtils.cleanup(null, channel);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
new file mode 100644
index 0000000..e788099
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.io.IOException;
+
+public interface SeekableChannel {
+
+  public abstract void seek(long offset) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index b208a71..3daed96 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -53,6 +53,10 @@ public abstract class FileAppender implements Appender {
 
     try {
       if (taskAttemptId != null) {
+        if (!(conf instanceof TajoConf)) {
+          throw new IllegalArgumentException("Configuration must be an 
instance of TajoConf");
+        }
+
         this.path = 
((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
             .getAppenderFilePath(taskAttemptId, workDir);
       } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 2f742c6..e23e8f8 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -21,7 +21,8 @@ package org.apache.tajo.storage.text;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.CharsetUtil;
 import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.InputChannel;
+import org.apache.tajo.storage.SeekableChannel;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -35,19 +36,25 @@ public class ByteBufLineReader implements Closeable {
   private int startIndex;
   private boolean eof = false;
   private ByteBuf buffer;
-  private final ByteBufInputChannel channel;
+  private final InputChannel channel;
+  private final SeekableChannel seekableChannel;
   private final AtomicInteger lineReadBytes = new AtomicInteger();
   private final LineSplitProcessor processor = new LineSplitProcessor();
 
-  public ByteBufLineReader(ByteBufInputChannel channel) {
+  public ByteBufLineReader(InputChannel channel) {
     this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
   }
 
-  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
+  public ByteBufLineReader(InputChannel channel, ByteBuf buf) {
     this.readBytes = 0;
     this.channel = channel;
     this.buffer = buf;
     this.bufferSize = buf.capacity();
+    if (channel instanceof SeekableChannel) {
+      seekableChannel = (SeekableChannel) channel;
+    } else {
+      seekableChannel = null;
+    }
   }
 
   public long readBytes() {
@@ -62,6 +69,19 @@ public class ByteBufLineReader implements Closeable {
     this.channel.close();
   }
 
+  public void seek(long offset) throws IOException {
+    if(seekableChannel != null) {
+      seekableChannel.seek(offset);
+      this.readBytes = 0;
+      this.startIndex = 0;
+      this.eof = false;
+      this.buffer.clear();
+      this.processor.reset();
+    } else {
+      throw new IllegalArgumentException("Channel is not an instance of 
SeekableChannel");
+    }
+  }
+
   public String readLine() throws IOException {
     ByteBuf buf = readLineBuf(lineReadBytes);
     if (buf != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 8b33858..f15861c 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -31,17 +32,14 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.BufferPool;
-import org.apache.tajo.storage.ByteBufInputChannel;
-import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.unit.StorageUnit;
 
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class DelimitedLineReader implements Closeable {
@@ -73,36 +71,68 @@ public class DelimitedLineReader implements Closeable {
     this.codec = factory.getCodec(fragment.getPath());
     this.bufferSize = bufferSize;
     if (this.codec instanceof SplittableCompressionCodec) {
-      throw new NotImplementedException(); // bzip2 does not support 
multi-thread model
+      // bzip2 does not support multi-thread model
+      throw new NotImplementedException(this.getClass() + " does not support " 
+ this.codec.getDefaultExtension());
     }
   }
 
   public void init() throws IOException {
+    if (is != null) {
+      throw new IOException(this.getClass() + " was already initialized.");
+    }
+
     if (fs == null) {
       fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
     }
-    if (fis == null) fis = fs.open(fragment.getPath());
+
     pos = startOffset = fragment.getStartKey();
     end = startOffset + fragment.getLength();
 
     if (codec != null) {
+      fis = fs.open(fragment.getPath());
+
       decompressor = CodecPool.getDecompressor(codec);
       is = new DataInputStream(codec.createInputStream(fis, decompressor));
-      ByteBufInputChannel channel = new ByteBufInputChannel(is);
 
       ByteBuf buf = BufferPool.directBuffer(bufferSize);
-      lineReader = new ByteBufLineReader(channel, buf);
+      lineReader = new ByteBufLineReader(new ByteBufInputChannel(is), buf);
     } else {
-      fis.seek(startOffset);
-      is = fis;
-
-      ByteBufInputChannel channel = new ByteBufInputChannel(is);
-      lineReader = new ByteBufLineReader(channel,
-          BufferPool.directBuffer((int) Math.min(bufferSize, end)));
+      if (fs instanceof LocalFileSystem) {
+        File file;
+        try {
+          if (fragment.getPath().toUri().getScheme() != null) {
+            file = new File(fragment.getPath().toUri());
+          } else {
+            file = new File(fragment.getPath().toString());
+          }
+        } catch (IllegalArgumentException iae) {
+          throw new IOException(iae);
+        }
+        FileInputStream inputStream = new FileInputStream(file);
+        FileChannel channel = inputStream.getChannel();
+        channel.position(startOffset);
+        is = inputStream;
+        lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel),
+            BufferPool.directBuffer((int) Math.min(bufferSize, end)));
+      } else {
+        fis = fs.open(fragment.getPath());
+        fis.seek(startOffset);
+        is = fis;
+        lineReader = new ByteBufLineReader(new FSDataInputChannel(fis),
+            BufferPool.directBuffer((int) Math.min(bufferSize, end)));
+      }
     }
     eof = false;
   }
 
+  public void seek(long offset) throws IOException {
+    if (isCompressed()) throw new UnsupportedException();
+
+    lineReader.seek(offset);
+    pos = offset;
+    eof = false;
+  }
+
   public long getCompressedPosition() throws IOException {
     long retVal;
     if (isCompressed()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 4c9234e..5e7bd94 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -268,7 +268,7 @@ public class DelimitedTextFile {
     }
   }
 
-  public static class DelimitedTextFileScanner extends FileScanner {
+  public static class DelimitedTextFileScanner extends FileScanner implements 
SeekableScanner {
     private boolean splittable = false;
     private final long startOffset;
 
@@ -309,6 +309,10 @@ public class DelimitedTextFile {
         reader.close();
       }
 
+      if(deserializer != null) {
+        deserializer.release();
+      }
+
       reader = new DelimitedLineReader(conf, fragment, 
conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
       reader.init();
       recordCount = 0;
@@ -372,7 +376,7 @@ public class DelimitedTextFile {
 
         // this loop will continue until one tuple is build or EOS (end of 
stream).
         do {
-
+          long offset = reader.getUnCompressedPosition();
           ByteBuf buf = reader.readLine();
 
           // if no more line, then return EOT (end of tuple)
@@ -388,6 +392,7 @@ public class DelimitedTextFile {
           }
 
           tuple = new VTuple(schema.size());
+          tuple.setOffset(offset);
 
           try {
             deserializer.deserialize(buf, tuple);
@@ -478,5 +483,15 @@ public class DelimitedTextFile {
       }
       return tableStats;
     }
+
+    @Override
+    public long getNextOffset() throws IOException {
+      return reader.getUnCompressedPosition();
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+        reader.seek(offset);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
index a130527..8b840dd 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -42,4 +42,8 @@ public class LineSplitProcessor implements ByteBufProcessor {
   public boolean isPrevCharCR() {
     return prevCharCR;
   }
+
+  public void reset() {
+    prevCharCR = false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
index f76593e..4ba47c1 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
@@ -34,7 +34,7 @@ import java.util.Map;
 
 class CodecFactory {
 
-  public class BytesDecompressor {
+  public static class BytesDecompressor {
 
     private final CompressionCodec codec;
     private final Decompressor decompressor;

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
new file mode 100644
index 0000000..d127a9e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestByteBufLineReader {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/TestByteBufLineReader";
+  private Path testDir;
+  private FileSystem fs;
+  private static String LINE = "A big data warehouse system on Hadoop";
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new TajoConf();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testReaderWithLocalFS() throws Exception {
+    Path tablePath = new Path(testDir, "testReaderWithLocalFS");
+    Path filePath = new Path(tablePath, "data.dat");
+
+    FileSystem fileSystem = filePath.getFileSystem(conf);
+    assertTrue(fileSystem instanceof LocalFileSystem);
+
+    FSDataOutputStream out = fs.create(filePath, true);
+    out.write(LINE.getBytes(Charset.defaultCharset()));
+    out.write('\n');
+    out.close();
+
+    assertTrue(fs.exists(filePath));
+
+    FSDataInputStream inputStream = fs.open(filePath);
+    assertFalse(inputStream.getWrappedStream() instanceof ByteBufferReadable);
+
+    ByteBufLineReader lineReader = new ByteBufLineReader(new 
FSDataInputChannel(inputStream));
+    assertEquals(LINE, lineReader.readLine());
+    lineReader.seek(0);
+    assertEquals(LINE, lineReader.readLine());
+    assertNull(lineReader.readLine());
+
+    lineReader.close();
+    fs.close();
+  }
+
+  @Test
+  public void testReaderWithDFS() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, 
cluster.getFileSystem().getUri() + "/tajo");
+
+    Path tablePath = new Path("/testReaderWithDFS");
+    Path filePath = new Path(tablePath, "data.dat");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true);
+      out.write(LINE.getBytes(Charset.defaultCharset()));
+      out.write('\n');
+      out.close();
+
+      assertTrue(fs.exists(filePath));
+      FSDataInputStream inputStream = fs.open(filePath);
+      assertTrue(inputStream.getWrappedStream() instanceof ByteBufferReadable);
+
+      ByteBufLineReader lineReader = new ByteBufLineReader(new 
FSDataInputChannel(inputStream));
+      assertEquals(LINE, lineReader.readLine());
+      lineReader.seek(0);
+      assertEquals(LINE, lineReader.readLine());
+      assertNull(lineReader.readLine());
+
+      lineReader.close();
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
+  public void testReaderWithNIO() throws Exception {
+    Path tablePath = new Path(testDir, "testReaderWithNIO");
+    Path filePath = new Path(tablePath, "data.dat");
+
+    FileSystem fileSystem = filePath.getFileSystem(conf);
+    assertTrue(fileSystem instanceof LocalFileSystem);
+
+    FSDataOutputStream out = fs.create(filePath, true);
+    out.write(LINE.getBytes(Charset.defaultCharset()));
+    out.write('\n');
+    out.close();
+
+    File file = new File(filePath.toUri());
+    assertTrue(file.exists());
+
+    FileInputStream inputStream = new FileInputStream(file);
+    FileChannel channel = inputStream.getChannel();
+
+    ByteBufLineReader lineReader = new ByteBufLineReader(new 
LocalFileInputChannel(channel));
+
+    assertEquals(LINE, lineReader.readLine());
+    lineReader.seek(0);
+    assertEquals(LINE, lineReader.readLine());
+    assertNull(lineReader.readLine());
+
+    lineReader.close();
+    channel.close();
+    inputStream.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 07e8dd7..f405734 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -104,7 +104,7 @@ public class TestLineReader {
   }
 
   @Test
-  public void testLineDelimitedReader() throws IOException {
+  public void testLineDelimitedReaderWithCompression() throws IOException {
     TajoConf conf = new TajoConf();
     Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     FileSystem fs = testDir.getFileSystem(conf);
@@ -118,7 +118,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
-    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
+    Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
     FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
@@ -160,7 +160,55 @@ public class TestLineReader {
 
     IOUtils.cleanup(null, reader, fs);
     assertEquals(tupleNum, i);
+  }
+
+  @Test
+  public void testLineDelimitedReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+
+    Path tablePath = new Path(testDir, "testLineDelimitedReader");
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, appender.getOffset());
+    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
+    assertFalse(reader.isReadable());
+    reader.init();
+    assertTrue(reader.isReadable());
+
 
+    int i = 0;
+    while(reader.isReadable()){
+      ByteBuf buf = reader.readLine();
+      if(buf == null) break;
+      i++;
+    }
+    assertEquals(tupleNum, i);
+    IOUtils.cleanup(null, reader, fs);
   }
 
   @Test
@@ -217,4 +265,65 @@ public class TestLineReader {
     assertEquals(status.getLen(), totalRead);
     assertEquals(status.getLen(), reader.readBytes());
   }
+
+  @Test
+  public void testSeekableByteBufLineReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    AtomicInteger bytes = new AtomicInteger();
+
+    InputChannel channel = new FSDataInputChannel(fs.open(tablePath));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    //seek to end of file
+    reader.seek(status.getLen());
+    assertNull(reader.readLineBuf(bytes));
+    assertEquals(0, bytes.get());
+
+    reader.seek(0);
+    long totalRead = 0;
+    int i = 0;
+
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      i++;
+    }
+
+    IOUtils.cleanup(null, reader, channel, fs);
+
+    assertEquals(tupleNum, i);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 9577e3d..790ac4a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -141,7 +141,7 @@ public class TestStorages {
         {StoreType.PARQUET, false, false, false},
         {StoreType.SEQUENCEFILE, true, true, false},
         {StoreType.AVRO, false, false, false},
-        {StoreType.TEXTFILE, true, true, false},
+        {StoreType.TEXTFILE, true, true, true},
         {StoreType.JSON, true, true, false},
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 383740d..068f726 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -74,7 +74,8 @@ public class TestBSTIndex {
   public static Collection<Object[]> generateParameters() {
     return Arrays.asList(new Object[][]{
         {StoreType.CSV},
-        {StoreType.RAW}
+        {StoreType.RAW},
+        {StoreType.TEXTFILE}
     });
   }
 

Reply via email to