Repository: tajo Updated Branches: refs/heads/master 72948b63a -> 633109ac7
TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner. Closes #489 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/633109ac Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/633109ac Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/633109ac Branch: refs/heads/master Commit: 633109ac75bc8036e49eba8ea48c025fc0f342da Parents: 72948b6 Author: Jinho Kim <[email protected]> Authored: Tue Apr 7 15:32:58 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue Apr 7 15:32:58 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/LocalTajoTestingUtility.java | 2 +- .../org/apache/tajo/storage/BufferPool.java | 2 +- .../tajo/storage/ByteBufInputChannel.java | 38 +---- .../apache/tajo/storage/FSDataInputChannel.java | 67 ++++++++ .../org/apache/tajo/storage/InputChannel.java | 36 +++++ .../tajo/storage/LocalFileInputChannel.java | 51 ++++++ .../apache/tajo/storage/SeekableChannel.java | 27 ++++ .../org/apache/tajo/storage/FileAppender.java | 4 + .../tajo/storage/text/ByteBufLineReader.java | 28 +++- .../tajo/storage/text/DelimitedLineReader.java | 64 ++++++-- .../tajo/storage/text/DelimitedTextFile.java | 19 ++- .../tajo/storage/text/LineSplitProcessor.java | 4 + .../thirdparty/parquet/CodecFactory.java | 2 +- .../tajo/storage/TestByteBufLineReader.java | 160 +++++++++++++++++++ .../org/apache/tajo/storage/TestLineReader.java | 113 ++++++++++++- .../org/apache/tajo/storage/TestStorages.java | 2 +- .../apache/tajo/storage/index/TestBSTIndex.java | 3 +- 18 files changed, 565 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 90fa245..c2016ab 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,9 @@ Release 0.11.0 - unreleased NEW FEATURES + TAJO-1494: Add SeekableScanner support to DelimitedTextFileScanner. + (jinho) + TAJO-921: Add STDDEV_SAMP and STDDEV_POP window functions. (Keuntae Park) TAJO-1135: Implement queryable virtual table for cluster information. http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 801c71f..5407ff5 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -116,7 +116,7 @@ public class LocalTajoTestingUtility { fs.mkdirs(tablePath); Path dfsPath = new Path(tablePath, localPath.getName()); fs.copyFromLocalFile(localPath, dfsPath); - TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.TEXTFILE, option); // Add fake table statistic data to tables. // It gives more various situations to unit tests. http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 85c79fa..e4f9072 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -44,7 +44,7 @@ public class BufferPool { } - public synchronized static ByteBuf directBuffer(int size) { + public static ByteBuf directBuffer(int size) { return allocator.directBuffer(size); } http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java index 45fb1d8..bdfec91 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -18,8 +18,6 @@ package org.apache.tajo.storage; -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.io.IOUtils; import java.io.IOException; @@ -27,42 +25,22 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import java.nio.channels.ScatteringByteChannel; -import java.nio.channels.spi.AbstractInterruptibleChannel; -public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { - - ByteBufferReadable byteBufferReadable; - ReadableByteChannel channel; - InputStream inputStream; +/** + * ByteBufInputChannel is a NIO channel wrapper from input stream + */ +public class ByteBufInputChannel extends InputChannel { + private ReadableByteChannel channel; + private InputStream inputStream; public ByteBufInputChannel(InputStream inputStream) { - if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) { - this.byteBufferReadable = (ByteBufferReadable) inputStream; - } else { - this.channel = Channels.newChannel(inputStream); - } - + this.channel = Channels.newChannel(inputStream); this.inputStream = inputStream; } @Override - public long read(ByteBuffer[] dsts, int offset, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public long read(ByteBuffer[] dsts) { - return read(dsts, 0, dsts.length); - } - - @Override public int read(ByteBuffer dst) throws IOException { - if (byteBufferReadable != null) { - return byteBufferReadable.read(dst); - } else { - return channel.read(dst); - } + return channel.read(dst); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java new file mode 100644 index 0000000..ed84d24 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +/** + * FSDataInputChannel is a NIO channel implementation of direct read ability to read from HDFS + */ +public final class FSDataInputChannel extends InputChannel implements SeekableChannel { + + private ReadableByteChannel channel; + private FSDataInputStream inputStream; + private boolean isDirectRead; + + public FSDataInputChannel(FSDataInputStream inputStream) { + if (inputStream.getWrappedStream() instanceof ByteBufferReadable) { + this.isDirectRead = true; + } else { + /* LocalFileSystem, S3 does not support ByteBufferReadable */ + this.channel = Channels.newChannel(inputStream); + } + this.inputStream = inputStream; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (isDirectRead) { + return inputStream.read(dst); + } else { + return channel.read(dst); + } + } + + @Override + public void seek(long offset) throws IOException { + inputStream.seek(offset); + } + + @Override + protected void implCloseChannel() throws IOException { + IOUtils.cleanup(null, channel, inputStream); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java new file mode 100644 index 0000000..ad778a6 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/InputChannel.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import java.nio.ByteBuffer; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public abstract class InputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java new file mode 100644 index 0000000..bd7d668 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** + * LocalFileInputChannel is a FileChannel wrapper of seek ability + */ +public final class LocalFileInputChannel extends InputChannel implements SeekableChannel { + private FileChannel channel; + + public LocalFileInputChannel(FileChannel channel) { + this.channel = channel; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return channel.read(dst); + } + + @Override + public void seek(long offset) throws IOException { + this.channel.position(offset); + } + + @Override + protected void implCloseChannel() throws IOException { + IOUtils.cleanup(null, channel); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java new file mode 100644 index 0000000..e788099 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import java.io.IOException; + +public interface SeekableChannel { + + public abstract void seek(long offset) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index b208a71..3daed96 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -53,6 +53,10 @@ public abstract class FileAppender implements Appender { try { if (taskAttemptId != null) { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be an instance of TajoConf"); + } + this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf)) .getAppenderFilePath(taskAttemptId, workDir); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 2f742c6..e23e8f8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -21,7 +21,8 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.ByteBufInputChannel; +import org.apache.tajo.storage.InputChannel; +import org.apache.tajo.storage.SeekableChannel; import java.io.Closeable; import java.io.IOException; @@ -35,19 +36,25 @@ public class ByteBufLineReader implements Closeable { private int startIndex; private boolean eof = false; private ByteBuf buffer; - private final ByteBufInputChannel channel; + private final InputChannel channel; + private final SeekableChannel seekableChannel; private final AtomicInteger lineReadBytes = new AtomicInteger(); private final LineSplitProcessor processor = new LineSplitProcessor(); - public ByteBufLineReader(ByteBufInputChannel channel) { + public ByteBufLineReader(InputChannel channel) { this(channel, BufferPool.directBuffer(DEFAULT_BUFFER)); } - public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) { + public ByteBufLineReader(InputChannel channel, ByteBuf buf) { this.readBytes = 0; this.channel = channel; this.buffer = buf; this.bufferSize = buf.capacity(); + if (channel instanceof SeekableChannel) { + seekableChannel = (SeekableChannel) channel; + } else { + seekableChannel = null; + } } public long readBytes() { @@ -62,6 +69,19 @@ public class ByteBufLineReader implements Closeable { this.channel.close(); } + public void seek(long offset) throws IOException { + if(seekableChannel != null) { + seekableChannel.seek(offset); + this.readBytes = 0; + this.startIndex = 0; + this.eof = false; + this.buffer.clear(); + this.processor.reset(); + } else { + throw new IllegalArgumentException("Channel is not an instance of SeekableChannel"); + } + } + public String readLine() throws IOException { ByteBuf buf = readLineBuf(lineReadBytes); if (buf != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index 8b33858..f15861c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; @@ -31,17 +32,14 @@ import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.ByteBufInputChannel; -import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.unit.StorageUnit; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; +import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicInteger; public class DelimitedLineReader implements Closeable { @@ -73,36 +71,68 @@ public class DelimitedLineReader implements Closeable { this.codec = factory.getCodec(fragment.getPath()); this.bufferSize = bufferSize; if (this.codec instanceof SplittableCompressionCodec) { - throw new NotImplementedException(); // bzip2 does not support multi-thread model + // bzip2 does not support multi-thread model + throw new NotImplementedException(this.getClass() + " does not support " + this.codec.getDefaultExtension()); } } public void init() throws IOException { + if (is != null) { + throw new IOException(this.getClass() + " was already initialized."); + } + if (fs == null) { fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); } - if (fis == null) fis = fs.open(fragment.getPath()); + pos = startOffset = fragment.getStartKey(); end = startOffset + fragment.getLength(); if (codec != null) { + fis = fs.open(fragment.getPath()); + decompressor = CodecPool.getDecompressor(codec); is = new DataInputStream(codec.createInputStream(fis, decompressor)); - ByteBufInputChannel channel = new ByteBufInputChannel(is); ByteBuf buf = BufferPool.directBuffer(bufferSize); - lineReader = new ByteBufLineReader(channel, buf); + lineReader = new ByteBufLineReader(new ByteBufInputChannel(is), buf); } else { - fis.seek(startOffset); - is = fis; - - ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, - BufferPool.directBuffer((int) Math.min(bufferSize, end))); + if (fs instanceof LocalFileSystem) { + File file; + try { + if (fragment.getPath().toUri().getScheme() != null) { + file = new File(fragment.getPath().toUri()); + } else { + file = new File(fragment.getPath().toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + FileInputStream inputStream = new FileInputStream(file); + FileChannel channel = inputStream.getChannel(); + channel.position(startOffset); + is = inputStream; + lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel), + BufferPool.directBuffer((int) Math.min(bufferSize, end))); + } else { + fis = fs.open(fragment.getPath()); + fis.seek(startOffset); + is = fis; + lineReader = new ByteBufLineReader(new FSDataInputChannel(fis), + BufferPool.directBuffer((int) Math.min(bufferSize, end))); + } } eof = false; } + public void seek(long offset) throws IOException { + if (isCompressed()) throw new UnsupportedException(); + + lineReader.seek(offset); + pos = offset; + eof = false; + } + public long getCompressedPosition() throws IOException { long retVal; if (isCompressed()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 4c9234e..5e7bd94 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -268,7 +268,7 @@ public class DelimitedTextFile { } } - public static class DelimitedTextFileScanner extends FileScanner { + public static class DelimitedTextFileScanner extends FileScanner implements SeekableScanner { private boolean splittable = false; private final long startOffset; @@ -309,6 +309,10 @@ public class DelimitedTextFile { reader.close(); } + if(deserializer != null) { + deserializer.release(); + } + reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB)); reader.init(); recordCount = 0; @@ -372,7 +376,7 @@ public class DelimitedTextFile { // this loop will continue until one tuple is build or EOS (end of stream). do { - + long offset = reader.getUnCompressedPosition(); ByteBuf buf = reader.readLine(); // if no more line, then return EOT (end of tuple) @@ -388,6 +392,7 @@ public class DelimitedTextFile { } tuple = new VTuple(schema.size()); + tuple.setOffset(offset); try { deserializer.deserialize(buf, tuple); @@ -478,5 +483,15 @@ public class DelimitedTextFile { } return tableStats; } + + @Override + public long getNextOffset() throws IOException { + return reader.getUnCompressedPosition(); + } + + @Override + public void seek(long offset) throws IOException { + reader.seek(offset); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java index a130527..8b840dd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java @@ -42,4 +42,8 @@ public class LineSplitProcessor implements ByteBufProcessor { public boolean isPrevCharCR() { return prevCharCR; } + + public void reset() { + prevCharCR = false; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java index f76593e..4ba47c1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java @@ -34,7 +34,7 @@ import java.util.Map; class CodecFactory { - public class BytesDecompressor { + public static class BytesDecompressor { private final CompressionCodec codec; private final Decompressor decompressor; http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java new file mode 100644 index 0000000..d127a9e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.text.ByteBufLineReader; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.util.UUID; + +import static org.junit.Assert.*; + +public class TestByteBufLineReader { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestByteBufLineReader"; + private Path testDir; + private FileSystem fs; + private static String LINE = "A big data warehouse system on Hadoop"; + + @Before + public void setUp() throws Exception { + conf = new TajoConf(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testReaderWithLocalFS() throws Exception { + Path tablePath = new Path(testDir, "testReaderWithLocalFS"); + Path filePath = new Path(tablePath, "data.dat"); + + FileSystem fileSystem = filePath.getFileSystem(conf); + assertTrue(fileSystem instanceof LocalFileSystem); + + FSDataOutputStream out = fs.create(filePath, true); + out.write(LINE.getBytes(Charset.defaultCharset())); + out.write('\n'); + out.close(); + + assertTrue(fs.exists(filePath)); + + FSDataInputStream inputStream = fs.open(filePath); + assertFalse(inputStream.getWrappedStream() instanceof ByteBufferReadable); + + ByteBufLineReader lineReader = new ByteBufLineReader(new FSDataInputChannel(inputStream)); + assertEquals(LINE, lineReader.readLine()); + lineReader.seek(0); + assertEquals(LINE, lineReader.readLine()); + assertNull(lineReader.readLine()); + + lineReader.close(); + fs.close(); + } + + @Test + public void testReaderWithDFS() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2).build(); + cluster.waitClusterUp(); + + TajoConf tajoConf = new TajoConf(conf); + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); + + Path tablePath = new Path("/testReaderWithDFS"); + Path filePath = new Path(tablePath, "data.dat"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + FSDataOutputStream out = fs.create(filePath, true); + out.write(LINE.getBytes(Charset.defaultCharset())); + out.write('\n'); + out.close(); + + assertTrue(fs.exists(filePath)); + FSDataInputStream inputStream = fs.open(filePath); + assertTrue(inputStream.getWrappedStream() instanceof ByteBufferReadable); + + ByteBufLineReader lineReader = new ByteBufLineReader(new FSDataInputChannel(inputStream)); + assertEquals(LINE, lineReader.readLine()); + lineReader.seek(0); + assertEquals(LINE, lineReader.readLine()); + assertNull(lineReader.readLine()); + + lineReader.close(); + fs.close(); + } finally { + cluster.shutdown(true); + } + } + + @Test + public void testReaderWithNIO() throws Exception { + Path tablePath = new Path(testDir, "testReaderWithNIO"); + Path filePath = new Path(tablePath, "data.dat"); + + FileSystem fileSystem = filePath.getFileSystem(conf); + assertTrue(fileSystem instanceof LocalFileSystem); + + FSDataOutputStream out = fs.create(filePath, true); + out.write(LINE.getBytes(Charset.defaultCharset())); + out.write('\n'); + out.close(); + + File file = new File(filePath.toUri()); + assertTrue(file.exists()); + + FileInputStream inputStream = new FileInputStream(file); + FileChannel channel = inputStream.getChannel(); + + ByteBufLineReader lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel)); + + assertEquals(LINE, lineReader.readLine()); + lineReader.seek(0); + assertEquals(LINE, lineReader.readLine()); + assertNull(lineReader.readLine()); + + lineReader.close(); + channel.close(); + inputStream.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 07e8dd7..f405734 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -104,7 +104,7 @@ public class TestLineReader { } @Test - public void testLineDelimitedReader() throws IOException { + public void testLineDelimitedReaderWithCompression() throws IOException { TajoConf conf = new TajoConf(); Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); FileSystem fs = testDir.getFileSystem(conf); @@ -118,7 +118,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); - Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName()); + Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName()); FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); @@ -160,7 +160,55 @@ public class TestLineReader { IOUtils.cleanup(null, reader, fs); assertEquals(tupleNum, i); + } + + @Test + public void testLineDelimitedReader() throws IOException { + TajoConf conf = new TajoConf(); + Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); + FileSystem fs = testDir.getFileSystem(conf); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + schema.addColumn("comment2", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); + + Path tablePath = new Path(testDir, "testLineDelimitedReader"); + FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + null, null, meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(4); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("emiya muljomdao")); + vTuple.put(3, NullDatum.get()); + appender.addTuple(vTuple); + } + appender.close(); + + FileFragment fragment = new FileFragment("table", tablePath, 0, appender.getOffset()); + DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); + assertFalse(reader.isReadable()); + reader.init(); + assertTrue(reader.isReadable()); + + int i = 0; + while(reader.isReadable()){ + ByteBuf buf = reader.readLine(); + if(buf == null) break; + i++; + } + assertEquals(tupleNum, i); + IOUtils.cleanup(null, reader, fs); } @Test @@ -217,4 +265,65 @@ public class TestLineReader { assertEquals(status.getLen(), totalRead); assertEquals(status.getLen(), reader.readBytes()); } + + @Test + public void testSeekableByteBufLineReader() throws IOException { + TajoConf conf = new TajoConf(); + Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); + FileSystem fs = testDir.getFileSystem(conf); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + schema.addColumn("comment2", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); + Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data"); + FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + null, null, meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(4); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("emiya muljomdao")); + vTuple.put(3, NullDatum.get()); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + + AtomicInteger bytes = new AtomicInteger(); + + InputChannel channel = new FSDataInputChannel(fs.open(tablePath)); + ByteBufLineReader reader = new ByteBufLineReader(channel); + + //seek to end of file + reader.seek(status.getLen()); + assertNull(reader.readLineBuf(bytes)); + assertEquals(0, bytes.get()); + + reader.seek(0); + long totalRead = 0; + int i = 0; + + for(;;){ + ByteBuf buf = reader.readLineBuf(bytes); + totalRead += bytes.get(); + if(buf == null) break; + i++; + } + + IOUtils.cleanup(null, reader, channel, fs); + + assertEquals(tupleNum, i); + assertEquals(status.getLen(), totalRead); + assertEquals(status.getLen(), reader.readBytes()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 9577e3d..790ac4a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -141,7 +141,7 @@ public class TestStorages { {StoreType.PARQUET, false, false, false}, {StoreType.SEQUENCEFILE, true, true, false}, {StoreType.AVRO, false, false, false}, - {StoreType.TEXTFILE, true, true, false}, + {StoreType.TEXTFILE, true, true, true}, {StoreType.JSON, true, true, false}, }); } http://git-wip-us.apache.org/repos/asf/tajo/blob/633109ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 383740d..068f726 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -74,7 +74,8 @@ public class TestBSTIndex { public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][]{ {StoreType.CSV}, - {StoreType.RAW} + {StoreType.RAW}, + {StoreType.TEXTFILE} }); }
