Repository: tajo Updated Branches: refs/heads/master c46dc1a64 -> aa8969ac8
TAJO-1273: Merge DirectRawFile to master branch. (jinho) Closes #661 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa8969ac Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa8969ac Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa8969ac Branch: refs/heads/master Commit: aa8969ac810e3341fb87b41e521f196e425ad069 Parents: c46dc1a Author: Jinho Kim <[email protected]> Authored: Wed Jul 29 17:40:31 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed Jul 29 17:40:31 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/BuiltinStorages.java | 1 + .../apache/tajo/storage/FSDataInputChannel.java | 16 +- .../tajo/storage/LocalFileInputChannel.java | 23 +- .../apache/tajo/storage/SeekableChannel.java | 5 +- .../tajo/storage/SeekableInputChannel.java | 34 ++ .../apache/tajo/storage/TableStatistics.java | 4 + .../tajo/tuple/offheap/OffHeapRowBlock.java | 37 ++ .../apache/tajo/tuple/offheap/UnSafeTuple.java | 4 +- .../storage/rawfile/DirectRawFileScanner.java | 219 +++++++++ .../storage/rawfile/DirectRawFileWriter.java | 214 +++++++++ .../tajo/storage/text/DelimitedLineReader.java | 2 +- .../tajo/storage/TestByteBufLineReader.java | 6 +- .../tajo/storage/raw/TestDirectRawFile.java | 480 +++++++++++++++++++ 14 files changed, 1032 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1389b6f..91a8346 100644 --- a/CHANGES +++ b/CHANGES @@ -370,6 +370,8 @@ Release 0.11.0 - unreleased TASKS + TAJO-1273: Merge DirectRawFile to master branch. (jinho) + TAJO-1628: Add a documentation for join operation. (jihoon) TAJO-1687: sphinx-mavan-plugin version should be 1.0.3. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java index 6f1b7c6..11f0287 100644 --- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java +++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java @@ -22,6 +22,7 @@ public class BuiltinStorages { public static final String TEXT = "TEXT"; public static final String JSON = "JSON"; public static final String RAW = "RAW"; + public static final String DRAW = "DRAW"; public static final String RCFILE = "RCFILE"; public static final String ROW = "ROW"; public static final String PARQUET = "PARQUET"; http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java index ed84d24..3f638c0 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java @@ -30,13 +30,14 @@ import java.nio.channels.ReadableByteChannel; /** * FSDataInputChannel is a NIO channel implementation of direct read ability to read from HDFS */ -public final class FSDataInputChannel extends InputChannel implements SeekableChannel { +public final class FSDataInputChannel extends SeekableInputChannel { private ReadableByteChannel channel; private FSDataInputStream inputStream; private boolean isDirectRead; + private long size; - public 
FSDataInputChannel(FSDataInputStream inputStream) { + public FSDataInputChannel(FSDataInputStream inputStream) throws IOException { if (inputStream.getWrappedStream() instanceof ByteBufferReadable) { this.isDirectRead = true; } else { @@ -44,6 +45,7 @@ public final class FSDataInputChannel extends InputChannel implements SeekableCh this.channel = Channels.newChannel(inputStream); } this.inputStream = inputStream; + this.size = inputStream.getPos() + inputStream.available(); } @Override @@ -61,6 +63,16 @@ public final class FSDataInputChannel extends InputChannel implements SeekableCh } @Override + public long position() throws IOException { + return inputStream.getPos(); + } + + @Override + public long size() throws IOException { + return size; + } + + @Override protected void implCloseChannel() throws IOException { IOUtils.cleanup(null, channel, inputStream); } http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java index bd7d668..fbc4df0 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import org.apache.hadoop.io.IOUtils; +import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -27,11 +28,15 @@ import java.nio.channels.FileChannel; /** * LocalFileInputChannel is a FileChannel wrapper of seek ability */ -public final class LocalFileInputChannel extends InputChannel implements SeekableChannel { +public 
final class LocalFileInputChannel extends SeekableInputChannel { + private FileInputStream fileInputStream; private FileChannel channel; + private long size; - public LocalFileInputChannel(FileChannel channel) { - this.channel = channel; + public LocalFileInputChannel(FileInputStream fileInputStream) throws IOException { + this.fileInputStream = fileInputStream; + this.channel = fileInputStream.getChannel(); + this.size = channel.size(); } @Override @@ -45,7 +50,17 @@ public final class LocalFileInputChannel extends InputChannel implements Seekabl } @Override + public long position() throws IOException { + return channel.position(); + } + + @Override + public long size() throws IOException { + return size; + } + + @Override protected void implCloseChannel() throws IOException { - IOUtils.cleanup(null, channel); + IOUtils.cleanup(null, channel, fileInputStream); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java index e788099..61b39f2 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java @@ -22,6 +22,9 @@ import java.io.IOException; public interface SeekableChannel { - public abstract void seek(long offset) throws IOException; + void seek(long offset) throws IOException; + long position() throws IOException; + + long size() throws IOException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java new file mode 100644 index 0000000..bdbc8c0 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import java.nio.ByteBuffer; + +public abstract class SeekableInputChannel extends InputChannel implements SeekableChannel { + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java index c101b0b..aa33ea3 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -69,6 +69,10 @@ public class TableStatistics { numRows++; } + public void incrementRows(long num) { + numRows += num; + } + public long getNumRows() { return this.numRows; } http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java index 689efb7..90d4791 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Schema; 
import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.SeekableInputChannel; import org.apache.tajo.util.Deallocatable; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.SizeOf; @@ -131,6 +132,42 @@ public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable { this.rowNum = rowNum; } + + public boolean copyFromChannel(SeekableInputChannel channel, TableStats stats) throws IOException { + if (channel.position() < channel.size()) { + clear(); + + buffer.clear(); + channel.read(buffer); + memorySize = buffer.position(); + + while (position < memorySize) { + long recordPtr = address + position; + + if (remain() < SizeOf.SIZE_OF_INT) { + channel.seek(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + int recordSize = UNSAFE.getInt(recordPtr); + + if (remain() < recordSize) { + channel.seek(channel.position() - remain()); + memorySize = (int) (memorySize - remain()); + return true; + } + + position += recordSize; + rowNum++; + } + + return true; + } else { + return false; + } + } + public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException { if (channel.position() < channel.size()) { clear(); http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index 4ccba7b..fd427ca 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -100,7 +100,7 @@ public abstract class UnSafeTuple 
implements Tuple { } private int getFieldOffset(int fieldId) { - return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); + return UNSAFE.getInt(bb.address() + (long)(relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT))); } public long getFieldAddr(int fieldId) { @@ -278,7 +278,7 @@ public abstract class UnSafeTuple implements Tuple { public Datum getProtobufDatum(int fieldId) { byte [] bytes = getBytes(fieldId); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]); Message.Builder builder = factory.newBuilder(); try { builder.mergeFrom(bytes); http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java new file mode 100644 index 0000000..8ae9a26 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rawfile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +public class DirectRawFileScanner extends FileScanner implements SeekableScanner { + private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class); + + private SeekableInputChannel channel; + private TajoDataTypes.DataType[] columnTypes; + + private boolean eof = false; + private long fileSize; + private long recordCount; + + private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple(); + private OffHeapRowBlock tupleBuffer; + private OffHeapRowBlockReader reader; + + public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { + super(conf, schema, meta, fragment); + } + + public void 
init() throws IOException { + initChannel(); + + columnTypes = new TajoDataTypes.DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + reader = new OffHeapRowBlockReader(tupleBuffer); + + fetchNeeded = !next(tupleBuffer); + + super.init(); + } + + private void initChannel() throws IOException { + FileSystem fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); + + if (fs instanceof LocalFileSystem) { + File file; + try { + if (fragment.getPath().toUri().getScheme() != null) { + file = new File(fragment.getPath().toUri()); + } else { + file = new File(fragment.getPath().toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + + channel = new LocalFileInputChannel(new FileInputStream(file)); + channel.seek(fragment.getStartKey()); + fileSize = channel.size(); + } else { + channel = new FSDataInputChannel(fs.open(fragment.getPath())); + channel.seek(fragment.getStartKey()); + fileSize = channel.size(); + } + + if (tableStats != null) { + tableStats.setNumBytes(fileSize); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileScanner open:" + fragment.getPath() + ", offset :" + + fragment.getStartKey() + ", file size :" + fileSize); + } + } + + @Override + public long getNextOffset() throws IOException { + return channel.position() - reader.remainForRead(); + } + + @Override + public void seek(long offset) throws IOException { + channel.seek(offset); + fetchNeeded = true; + } + + public boolean next(OffHeapRowBlock rowblock) throws IOException { + return rowblock.copyFromChannel(channel, tableStats); + } + + private boolean fetchNeeded = true; + + @Override + public Tuple next() throws IOException { + if(eof) { + return null; + } + + while(true) { + if (fetchNeeded) { + if (!next(tupleBuffer)) { + return null; + } + reader.reset(); + } + + fetchNeeded = 
!reader.next(unSafeTuple); + + if (!fetchNeeded) { + recordCount++; + return unSafeTuple; + } + } + } + + @Override + public void reset() throws IOException { + // reload initial buffer + seek(0); + eof = false; + reader.reset(); + } + + @Override + public void close() throws IOException { + if (tableStats != null) { + tableStats.setReadBytes(fileSize); + tableStats.setNumRows(recordCount); + } + tupleBuffer.release(); + tupleBuffer = null; + reader = null; + + IOUtils.cleanup(LOG, channel); + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setFilter(EvalNode filter) { + + } + + @Override + public boolean isSplittable(){ + return false; + } + + @Override + public float getProgress() { + if(!inited) return 0.0f; + + try { + tableStats.setNumRows(recordCount); + long filePos = 0; + if (channel != null) { + filePos = channel.position(); + tableStats.setReadBytes(filePos); + } + + if(eof || channel == null) { + tableStats.setReadBytes(fileSize); + return 1.0f; + } + + if (filePos == 0) { + return 0.0f; + } else { + return Math.min(1.0f, ((float)filePos / (float)fileSize)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java new file mode 100644 index 0000000..bb81d6e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rawfile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.BaseTupleBuilder; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.UnSafeTuple; +import org.apache.tajo.unit.StorageUnit; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class DirectRawFileWriter extends FileAppender { + public static final String FILE_EXTENSION = "draw"; + private static final Log LOG = 
LogFactory.getLog(DirectRawFileWriter.class); + + private FileChannel channel; + private RandomAccessFile randomAccessFile; + private FSDataOutputStream fos; + private TajoDataTypes.DataType[] columnTypes; + private boolean isLocal; + private long pos; + + private TableStatistics stats; + + private BaseTupleBuilder builder; + + public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId, + final Schema schema, final TableMeta meta, final Path path) throws IOException { + super(conf, taskAttemptId, schema, meta, path); + } + + @Override + public void init() throws IOException { + File file; + FileSystem fs = path.getFileSystem(conf); + + if (fs instanceof LocalFileSystem) { + try { + if (path.toUri().getScheme() != null) { + file = new File(path.toUri()); + } else { + file = new File(path.toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + + randomAccessFile = new RandomAccessFile(file, "rw"); + channel = randomAccessFile.getChannel(); + isLocal = true; + } else { + fos = fs.create(path, true); + isLocal = false; + } + + pos = 0; + columnTypes = new TajoDataTypes.DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + builder = new BaseTupleBuilder(schema); + + super.init(); + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + private long getFilePosition() throws IOException { + if (isLocal) { + return channel.position(); + } else { + return fos.getPos(); + } + } + + public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException { + write(rowBlock.nioBuffer()); + if (enabledStats) { + stats.incrementRows(rowBlock.rows()); + } + + pos = getFilePosition(); + } + + private ByteBuffer buffer; + private void ensureSize(int size) throws IOException { + if (buffer.remaining() < size) { + + buffer.limit(buffer.position()); + 
buffer.flip(); + write(buffer); + + buffer.clear(); + } + } + + private void write(ByteBuffer buffer) throws IOException { + if(isLocal) { + channel.write(buffer); + } else { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + fos.write(bytes); + } + } + + @Override + public void addTuple(Tuple t) throws IOException { + if (enabledStats) { + for (int i = 0; i < schema.size(); i++) { + stats.analyzeField(i, t); + } + } + + if (buffer == null) { + buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB); + } + + UnSafeTuple unSafeTuple; + + if (!(t instanceof UnSafeTuple)) { + RowStoreUtil.convert(t, builder); + unSafeTuple = builder.buildToZeroCopyTuple(); + } else { + unSafeTuple = (UnSafeTuple) t; + } + + ByteBuffer bb = unSafeTuple.nioBuffer(); + ensureSize(bb.limit()); + buffer.put(bb); + + pos = getFilePosition() + (buffer.limit() - buffer.remaining()); + + if (enabledStats) { + stats.incrementRow(); + } + } + + @Override + public void flush() throws IOException { + if (buffer != null) { + buffer.limit(buffer.position()); + buffer.flip(); + write(buffer); + buffer.clear(); + } + } + + @Override + public void close() throws IOException { + flush(); + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); + } + + IOUtils.cleanup(LOG, channel, randomAccessFile, fos); + } + + @Override + public TableStats getStats() { + if (enabledStats) { + stats.setNumBytes(pos); + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index b73f96b..0443308 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -112,7 +112,7 @@ public class DelimitedLineReader implements Closeable { FileChannel channel = inputStream.getChannel(); channel.position(startOffset); is = inputStream; - lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel), + lineReader = new ByteBufLineReader(new LocalFileInputChannel(inputStream), BufferPool.directBuffer((int) Math.min(bufferSize, end))); } else { fis = fs.open(fragment.getPath()); http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java index d127a9e..b6f65df 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java @@ -34,7 +34,6 @@ import org.junit.Test; import java.io.File; import java.io.FileInputStream; -import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.util.UUID; @@ -144,9 +143,7 @@ public class TestByteBufLineReader { assertTrue(file.exists()); FileInputStream inputStream = new FileInputStream(file); - FileChannel channel = inputStream.getChannel(); - - ByteBufLineReader lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel)); + ByteBufLineReader lineReader = new ByteBufLineReader(new 
LocalFileInputChannel(inputStream)); assertEquals(LINE, lineReader.readLine()); lineReader.seek(0); @@ -154,7 +151,6 @@ public class TestByteBufLineReader { assertNull(lineReader.readLine()); lineReader.close(); - channel.close(); inputStream.close(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java new file mode 100644 index 0000000..46c0d6e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java @@ -0,0 +1,480 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.raw; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rawfile.DirectRawFileScanner; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.RowWriter; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.ProtoUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestDirectRawFile { + private static final Log LOG = LogFactory.getLog(TestDirectRawFile.class); + public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; + public static Schema schema; + + private static String TEST_PATH = "target/test-data/TestDirectRawFile"; + private static MiniDFSCluster cluster; + private static FileSystem dfs; + private static 
FileSystem localFs; + + private TajoConf tajoConf; + private Path testDir; + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() throws IOException { + return Arrays.asList(new Object[][]{ + {false}, + {true} + }); + } + + + public TestDirectRawFile(boolean isLocal) throws IOException { + FileSystem fs; + if (isLocal) { + fs = localFs; + } else { + fs = dfs; + } + + this.tajoConf = new TajoConf(fs.getConf()); + this.testDir = getTestDir(fs, TEST_PATH); + } + + @BeforeClass + public static void setUpClass() throws IOException, InterruptedException { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf)); + builder.numDataNodes(1); + builder.format(true); + builder.manageNameDfsDirs(true); + builder.manageDataDfsDirs(true); + builder.waitSafeMode(true); + cluster = builder.build(); + + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + localFs = FileSystem.getLocal(new TajoConf()); + } + + @AfterClass + public static void tearDownClass() throws InterruptedException { + cluster.shutdown(true); + } + + public Path getTestDir(FileSystem fs, String dir) throws IOException { + Path path = new Path(dir); + if(fs.exists(path)) + fs.delete(path, true); + + fs.mkdirs(path); + + return fs.makeQualified(path); + } + + static { + schema = new Schema(); + schema.addColumn("col0", TajoDataTypes.Type.BOOLEAN); + schema.addColumn("col1", TajoDataTypes.Type.INT2); + schema.addColumn("col2", TajoDataTypes.Type.INT4); + schema.addColumn("col3", TajoDataTypes.Type.INT8); + schema.addColumn("col4", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col5", TajoDataTypes.Type.FLOAT8); + 
schema.addColumn("col6", TajoDataTypes.Type.TEXT); + schema.addColumn("col7", TajoDataTypes.Type.TIMESTAMP); + schema.addColumn("col8", TajoDataTypes.Type.DATE); + schema.addColumn("col9", TajoDataTypes.Type.TIME); + schema.addColumn("col10", TajoDataTypes.Type.INTERVAL); + schema.addColumn("col11", TajoDataTypes.Type.INET4); + schema.addColumn("col12", + CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); + } + + public FileStatus writeRowBlock(TajoConf conf, TableMeta meta, OffHeapRowBlock rowBlock, Path outputFile) + throws IOException { + DirectRawFileWriter writer = new DirectRawFileWriter(conf, null, schema, meta, outputFile); + writer.init(); + writer.writeRowBlock(rowBlock); + writer.close(); + + FileStatus status = outputFile.getFileSystem(conf).getFileStatus(outputFile); + assertTrue(status.getLen() > 0); + LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false)); + return status; + } + + public FileStatus writeRowBlock(TajoConf conf, TableMeta meta, OffHeapRowBlock rowBlock) throws IOException { + Path outputDir = new Path(testDir, UUID.randomUUID() + ""); + outputDir.getFileSystem(conf).mkdirs(outputDir); + Path outputFile = new Path(outputDir, "output.draw"); + return writeRowBlock(conf, meta, rowBlock, outputFile); + } + + @Test + public void testRWForAllTypesWithNextTuple() throws IOException { + int rowNum = 10000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW); + FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock); + rowBlock.release(); + + FileFragment fragment = + new FileFragment("testRWForAllTypesWithNextTuple", outputFile.getPath(), 0, outputFile.getLen()); + DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, meta, fragment); + reader.init(); + + long readStart = System.currentTimeMillis(); + int j = 0; + Tuple tuple; + while ((tuple = reader.next()) != 
null) { + validateTupleResult(j, tuple); + j++; + } + + LOG.info("Total read rows: " + j); + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + reader.close(); + assertEquals(rowNum, j); + } + + @Test + public void testRepeatedScan() throws IOException { + int rowNum = 2; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW); + FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock); + + rowBlock.release(); + + FileFragment fragment = + new FileFragment("testRepeatedScan", outputFile.getPath(), 0, outputFile.getLen()); + DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, meta, fragment); + reader.init(); + + int j = 0; + while (reader.next() != null) { + j++; + } + assertEquals(rowNum, j); + + for (int i = 0; i < 5; i++) { + assertNull(reader.next()); + } + + reader.close(); + } + + @Test + public void testReset() throws IOException { + int rowNum = 2; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW); + FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock); + rowBlock.release(); + + FileFragment fragment = + new FileFragment("testReset", outputFile.getPath(), 0, outputFile.getLen()); + DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, meta, fragment); + reader.init(); + + int j = 0; + while (reader.next() != null) { + j++; + } + assertEquals(rowNum, j); + + for (int i = 0; i < 5; i++) { + assertNull(reader.next()); + } + + reader.reset(); + + j = 0; + while (reader.next() != null) { + j++; + } + assertEquals(rowNum, j); + + for (int i = 0; i < 5; i++) { + assertNull(reader.next()); + } + reader.close(); + } + + public static OffHeapRowBlock createRowBlock(int rowNum) { + long allocateStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); + long 
allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static void fillRow(int i, RowWriter builder) { + builder.startRow(); + builder.putBool(i % 1 == 0 ? true : false); // 0 + builder.putInt2((short) 1); // 1 + builder.putInt4(i); // 2 + builder.putInt8(i); // 3 + builder.putFloat4(i); // 4 + builder.putFloat8(i); // 5 + builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + builder.endRow(); + } + + public static void validateTupleResult(int j, Tuple t) { + assertTrue((j % 1 == 0) == t.getBool(0)); + assertTrue(1 == t.getInt2(1)); + assertEquals(j, t.getInt4(2)); + assertEquals(j, t.getInt8(3)); + assertTrue(j == t.getFloat4(4)); + assertTrue(j == t.getFloat8(5)); + assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); + 
assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); + } + + public static void fillRowBlockWithNull(int i, RowWriter writer) { + writer.startRow(); + + if (i == 0) { + writer.skipField(); + } else { + writer.putBool(i % 1 == 0 ? true : false); // 0 + } + if (i % 1 == 0) { + writer.skipField(); + } else { + writer.putInt2((short) 1); // 1 + } + + if (i % 2 == 0) { + writer.skipField(); + } else { + writer.putInt4(i); // 2 + } + + if (i % 3 == 0) { + writer.skipField(); + } else { + writer.putInt8(i); // 3 + } + + if (i % 4 == 0) { + writer.skipField(); + } else { + writer.putFloat4(i); // 4 + } + + if (i % 5 == 0) { + writer.skipField(); + } else { + writer.putFloat8(i); // 5 + } + + if (i % 6 == 0) { + writer.skipField(); + } else { + writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + } + + if (i % 7 == 0) { + writer.skipField(); + } else { + writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + } + + if (i % 8 == 0) { + writer.skipField(); + } else { + writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + } + + if (i % 9 == 0) { + writer.skipField(); + } else { + writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + } + + if (i % 10 == 0) { + writer.skipField(); + } else { + writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + } + + if (i % 11 == 0) { + writer.skipField(); + } else { + writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + } + + if (i % 12 == 0) { + writer.skipField(); + } else { + writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + } + + writer.endRow(); + } + + public static void validateNullity(int j, Tuple tuple) { + if (j == 0) { + tuple.isBlankOrNull(0); + } else { + assertTrue((j % 1 == 0) == tuple.getBool(0)); + } + + if (j % 1 == 0) { + 
tuple.isBlankOrNull(1); + } else { + assertTrue(1 == tuple.getInt2(1)); + } + + if (j % 2 == 0) { + tuple.isBlankOrNull(2); + } else { + assertEquals(j, tuple.getInt4(2)); + } + + if (j % 3 == 0) { + tuple.isBlankOrNull(3); + } else { + assertEquals(j, tuple.getInt8(3)); + } + + if (j % 4 == 0) { + tuple.isBlankOrNull(4); + } else { + assertTrue(j == tuple.getFloat4(4)); + } + + if (j % 5 == 0) { + tuple.isBlankOrNull(5); + } else { + assertTrue(j == tuple.getFloat8(5)); + } + + if (j % 6 == 0) { + tuple.isBlankOrNull(6); + } else { + assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); + } + + if (j % 7 == 0) { + tuple.isBlankOrNull(7); + } else { + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); + } + + if (j % 8 == 0) { + tuple.isBlankOrNull(8); + } else { + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); + } + + if (j % 9 == 0) { + tuple.isBlankOrNull(9); + } else { + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); + } + + if (j % 10 == 0) { + tuple.isBlankOrNull(10); + } else { + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); + } + + if (j % 11 == 0) { + tuple.isBlankOrNull(11); + } else { + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); + } + + if (j % 12 == 0) { + tuple.isBlankOrNull(12); + } else { + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); + } + } +}
