TAJO-1152: RawFile ByteBuffer should be reused. (jinho) Closes #224
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/55084a8a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/55084a8a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/55084a8a Branch: refs/heads/index_support Commit: 55084a8ae29a3650409c1f1dea68223d6f1c340d Parents: 0055568 Author: jhkim <[email protected]> Authored: Wed Nov 12 11:31:33 2014 +0900 Committer: jhkim <[email protected]> Committed: Wed Nov 12 11:31:33 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/storage/RawFile.java | 184 ++++++++++--------- .../org/apache/tajo/storage/TestStorages.java | 16 +- 3 files changed, 108 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1337f87..7f5ec76 100644 --- a/CHANGES +++ b/CHANGES @@ -75,6 +75,8 @@ Release 0.9.1 - unreleased SUB TASKS + TAJO-1152: RawFile ByteBuffer should be reuse. (jinho) + TAJO-1149: Implement direct read of DelimitedTextFile. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index c8ac3a2..2fae243 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -19,7 +19,7 @@ package org.apache.tajo.storage; import com.google.protobuf.Message; - +import io.netty.buffer.ByteBuf; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -51,17 +51,20 @@ public class RawFile { private DataType[] columnTypes; private ByteBuffer buffer; - private int bufferSize; + private ByteBuf buf; private Tuple tuple; private int headerSize = 0; // Header size of a tuple private BitArray nullFlags; private static final int RECORD_SIZE = 4; - private boolean eof = false; - private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size - private long numBytesRead; + private boolean eos = false; + private long startOffset; + private long endOffset; private FileInputStream fis; private long recordCount; + private long totalReadBytes; + private long filePosition; + private boolean forceFillBuffer; public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { super(conf, schema, meta, fragment); @@ -81,22 +84,16 @@ public class RawFile { fis = new FileInputStream(file); channel = fis.getChannel(); - fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize + filePosition = startOffset = fragment.getStartKey(); + endOffset = fragment.getStartKey() + fragment.getEndKey(); - if (tableStats != null) { - 
tableStats.setNumBytes(fragment.getEndKey()); - } if (LOG.isDebugEnabled()) { - LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size() - + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit); + LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size() + + ", fragment length :" + fragment.getEndKey()); } - if (fragment.getEndKey() < 64 * StorageUnit.KB) { - bufferSize = fragment.getEndKey().intValue(); - } else { - bufferSize = 64 * StorageUnit.KB; - } - buffer = ByteBuffer.allocateDirect(bufferSize); + buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buffer = buf.nioBuffer(0, buf.capacity()); columnTypes = new DataType[schema.size()]; for (int i = 0; i < schema.size(); i++) { @@ -107,58 +104,53 @@ public class RawFile { nullFlags = new BitArray(schema.size()); headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize - // initial read + // initial set position if (fragment.getStartKey() > 0) { - channel.position(fragment.getStartKey()); + channel.position(fragment.getStartKey()); } - numBytesRead = channel.read(buffer); - buffer.flip(); + forceFillBuffer = true; super.init(); } @Override public long getNextOffset() throws IOException { - return channel.position() - buffer.remaining(); + return filePosition - (forceFillBuffer ? 0 : buffer.remaining()); } @Override public void seek(long offset) throws IOException { - long currentPos = channel.position(); - if(currentPos < offset && offset < currentPos + buffer.limit()){ - buffer.position((int)(offset - currentPos)); + eos = false; + filePosition = channel.position(); + + // do not fill the buffer if the offset is already included in the buffer. 
+ if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){ + buffer.position((int)(offset - (filePosition - buffer.limit()))); } else { - buffer.clear(); + if(offset < startOffset || offset > startOffset + fragment.getEndKey()){ + throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d", + startOffset, startOffset + fragment.getEndKey(), offset)); + } channel.position(offset); - int bytesRead = channel.read(buffer); - numBytesRead = bytesRead; - buffer.flip(); - eof = false; + filePosition = offset; + buffer.clear(); + forceFillBuffer = true; + fillBuffer(); } } private boolean fillBuffer() throws IOException { - if (numBytesRead >= fragment.getEndKey()) { - eof = true; - return false; - } - int currentDataSize = buffer.remaining(); - buffer.compact(); + if(!forceFillBuffer) buffer.compact(); + int bytesRead = channel.read(buffer); + forceFillBuffer = false; if (bytesRead == -1) { - eof = true; + eos = true; return false; } else { - buffer.flip(); - long realRemaining = fragment.getEndKey() - numBytesRead; - numBytesRead += bytesRead; - if (realRemaining < bufferSize) { - int newLimit = currentDataSize + (int) realRemaining; - if(newLimit > bufferSize) { - newLimit = bufferSize; - } - buffer.limit(newLimit); - } + buffer.flip(); //The limit is set to the current filePosition and then the filePosition is set to zero + filePosition += bytesRead; + totalReadBytes += bytesRead; return true; } } @@ -247,9 +239,9 @@ public class RawFile { @Override public Tuple next() throws IOException { - if(eof) return null; + if(eos) return null; - if (buffer.remaining() < headerSize) { + if (forceFillBuffer || buffer.remaining() < headerSize) { if (!fillBuffer()) { return null; } @@ -264,15 +256,16 @@ public class RawFile { nullFlags.fromByteBuffer(buffer); // restore the start of record contents buffer.limit(bufferLimit); - //buffer.position(recordOffset + headerSize); if (buffer.remaining() < (recordSize - headerSize)) { + + 
//if the buffer reaches the writable size, the buffer increase the record size + reSizeBuffer(recordSize); + if (!fillBuffer()) { return null; } } - recordCount++; - for (int i = 0; i < columnTypes.length; i++) { // check if the i'th column is null if (nullFlags.get(i)) { @@ -320,7 +313,7 @@ public class RawFile { int len = readRawVarint32(); byte [] strBytes = new byte[len]; buffer.get(strBytes); - tuple.put(i, DatumFactory.createText(new String(strBytes))); + tuple.put(i, DatumFactory.createText(strBytes)); break; } @@ -377,31 +370,45 @@ public class RawFile { } } - if(!buffer.hasRemaining() && channel.position() == fileLimit){ - eof = true; + recordCount++; + + if(filePosition - buffer.remaining() >= endOffset){ + eos = true; } return new VTuple(tuple); } + private void reSizeBuffer(int writableBytes){ + if (buffer.capacity() - buffer.remaining() < writableBytes) { + buf.setIndex(buffer.position(), buffer.limit()); + buf.markReaderIndex(); + buf.discardSomeReadBytes(); + buf.ensureWritable(writableBytes); + buffer = buf.nioBuffer(0, buf.capacity()); + buffer.limit(buf.writerIndex()); + } + } + @Override public void reset() throws IOException { - // clear the buffer + // reset the buffer buffer.clear(); - // reload initial buffer - channel.position(fragment.getStartKey()); - numBytesRead = channel.read(buffer); - buffer.flip(); - eof = false; + forceFillBuffer = true; + filePosition = fragment.getStartKey(); + channel.position(filePosition); + eos = false; } @Override public void close() throws IOException { - if (tableStats != null) { - tableStats.setReadBytes(fragment.getEndKey()); - tableStats.setNumRows(recordCount); + if(buf != null){ + buffer.clear(); + buffer = null; + + buf.release(); + buf = null; } - StorageUtil.closeBuffer(buffer); IOUtils.cleanup(LOG, channel, fis); } @@ -421,28 +428,25 @@ public class RawFile { } @Override - public float getProgress() { - try { + public TableStats getInputStats() { + if(tableStats != null){ 
tableStats.setNumRows(recordCount); - long filePos = 0; - if (channel != null) { - filePos = channel.position(); - tableStats.setReadBytes(filePos); - } + tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n) + tableStats.setNumBytes(fragment.getEndKey()); + } + return tableStats; + } - if(eof || channel == null) { - tableStats.setReadBytes(fragment.getEndKey()); - return 1.0f; - } + @Override + public float getProgress() { + if(eos) { + return 1.0f; + } - if (filePos == 0) { - return 0.0f; - } else { - return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue())); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + if (filePosition - startOffset == 0) { return 0.0f; + } else { + return Math.min(1.0f, ((float) filePosition / endOffset)); } } } @@ -453,6 +457,7 @@ public class RawFile { private DataType[] columnTypes; private ByteBuffer buffer; + private ByteBuf buf; private BitArray nullFlags; private int headerSize = 0; private static final int RECORD_SIZE = 4; @@ -485,7 +490,8 @@ public class RawFile { columnTypes[i] = schema.getColumn(i).getDataType(); } - buffer = ByteBuffer.allocateDirect(64 * 1024); + buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buffer = buf.nioBuffer(0, buf.capacity()); // comput the number of bytes, representing the null flags @@ -505,7 +511,6 @@ public class RawFile { } private void flushBuffer() throws IOException { - buffer.limit(buffer.position()); buffer.flip(); channel.write(buffer); buffer.clear(); @@ -743,7 +748,14 @@ public class RawFile { LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); } - StorageUtil.closeBuffer(buffer); + if(buf != null){ + buffer.clear(); + buffer = null; + + buf.release(); + buf = null; + } + IOUtils.cleanup(LOG, channel, randomAccessFile); } http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 56cef77..a3f80cf 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -123,7 +123,7 @@ public class TestStorages { return Arrays.asList(new Object[][] { //type, splitable, statsable, seekable {StoreType.CSV, true, true, true}, - {StoreType.RAW, false, false, true}, + {StoreType.RAW, false, true, true}, {StoreType.RCFILE, true, true, false}, {StoreType.PARQUET, false, false, false}, {StoreType.SEQUENCEFILE, true, true, false}, @@ -792,7 +792,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Seekable.data"); FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); + tablePath); appender.enableStats(); appender.init(); int tupleNum = 100000; @@ -804,12 +804,12 @@ public class TestStorages { vTuple = new VTuple(3); vTuple.put(0, DatumFactory.createInt4(i + 1)); vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("test")); + vTuple.put(2, DatumFactory.createText("test" + i)); appender.addTuple(vTuple); // find a seek position if (i % (tupleNum / 3) == 0) { - offsets.add(appender.getOffset()); + offsets.add(appender.getOffset()); } } @@ -834,17 +834,17 @@ public class TestStorages { long readRows = 0; for (long offset : offsets) { scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, - new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); scanner.init(); while (scanner.next() != null) { - tupleCnt++; + tupleCnt++; } scanner.close(); if (statsable) { - 
readBytes += scanner.getInputStats().getNumBytes(); - readRows += scanner.getInputStats().getNumRows(); + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); } prevOffset = offset; }
