Repository: tajo Updated Branches: refs/heads/master a4c348423 -> cf66a3900
TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. (jinho) Closes #303 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cf66a390 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cf66a390 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cf66a390 Branch: refs/heads/master Commit: cf66a390060c79ba757097886703e30f93d31401 Parents: a4c3484 Author: jhkim <[email protected]> Authored: Mon Dec 22 16:28:41 2014 +0900 Committer: jhkim <[email protected]> Committed: Mon Dec 22 16:28:41 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/storage/StorageManager.java | 13 ++++ .../src/main/resources/storage-default.xml | 22 ++++++ .../src/test/resources/storage-default.xml | 22 ++++++ .../java/org/apache/tajo/storage/RawFile.java | 20 +++-- .../tajo/storage/text/DelimitedLineReader.java | 14 +++- .../tajo/storage/text/DelimitedTextFile.java | 16 ++-- .../org/apache/tajo/storage/TestStorages.java | 77 ++++++++++++++++++++ .../src/test/resources/storage-default.xml | 22 ++++++ 9 files changed, 195 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 36cff8a..245918e 100644 --- a/CHANGES +++ b/CHANGES @@ -118,6 +118,9 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1250: RawFileAppender occasionally causes BufferOverflowException. + (jinho) + TAJO-1259: A title in catalog configuration document is different from others. (Jongyoung Park via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index 07a51ba..609ca20 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -173,6 +174,18 @@ public abstract class StorageManager { */ public abstract void closeStorageManager(); + + /** + * Clear all class cache + */ + @VisibleForTesting + protected synchronized static void clearCache() { + CONSTRUCTOR_CACHE.clear(); + SCANNER_HANDLER_CACHE.clear(); + APPENDER_HANDLER_CACHE.clear(); + storageManagers.clear(); + } + /** * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER. * In general Repartitioner determines the partition range using previous output statistics data. http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 67033ed..abea9de 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -195,4 +195,26 @@ <name>tajo.storage.appender-handler.hfile.class</name> <value>org.apache.tajo.storage.hbase.HFileAppender</value> </property> + + <!--- Storage buffer --> + <property> + <name>tajo.storage.text.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.text.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index d1c561b..712f664 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -161,4 +161,26 @@ <name>tajo.storage.appender-handler.avro.class</name> <value>org.apache.tajo.storage.avro.AvroAppender</value> </property> + + <!--- Storage buffer --> + <property> + <name>tajo.storage.text.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.text.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 45e07d3..5213ba0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -46,6 +46,9 @@ import java.nio.channels.FileChannel; public class RawFile { private static final Log LOG = LogFactory.getLog(RawFile.class); + public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes"; + public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes"; + public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB; public static class RawFileScanner extends FileScanner implements SeekableScanner { private FileChannel channel; @@ -92,7 +95,7 @@ public class RawFile { + ", fragment length :" + fragment.getLength()); } - buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); buffer = buf.nioBuffer(0, buf.capacity()); columnTypes = new DataType[schema.size()]; @@ -382,7 +385,7 @@ public class RawFile { if (buffer.capacity() - buffer.remaining() < writableBytes) { buf.setIndex(buffer.position(), buffer.limit()); buf.markReaderIndex(); - buf.discardSomeReadBytes(); + buf.discardReadBytes(); buf.ensureWritable(writableBytes); buffer = buf.nioBuffer(0, buf.capacity()); buffer.limit(buf.writerIndex()); @@ -491,7 +494,7 @@ public class RawFile { columnTypes[i] = schema.getColumn(i).getDataType(); } - buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); buffer = buf.nioBuffer(0, buf.capacity()); // comput the number of bytes, representing the null flags @@ -532,6 +535,13 @@ public class RawFile { buffer.limit(limit); buffer.compact(); + //increase the write-buffer + if(buffer.remaining() < sizeToBeWritten) { + buf.setIndex(buffer.position(), buffer.limit()); + buf.ensureWritable(sizeToBeWritten); + buffer = buf.nioBuffer(0, buf.capacity()); + buffer.position(buf.readerIndex()); + } return true; } else { return false; @@ -632,8 +642,8 @@ public class RawFile { continue; } - // 8 is the maximum bytes size of all types - if (flushBufferAndReplace(recordOffset, 8)) { + // 10 is the maximum bytes size of all types + if (flushBufferAndReplace(recordOffset, 10)) { recordOffset = 0; } http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index 1b433b5..8b33858 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -36,6 +36,7 @@ import org.apache.tajo.storage.ByteBufInputChannel; import org.apache.tajo.storage.FileScanner; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.unit.StorageUnit; import java.io.Closeable; import java.io.DataInputStream; @@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class DelimitedLineReader implements Closeable { private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class); - private final static int DEFAULT_PAGE_SIZE = 128 * 1024; private FileSystem fs; private FSDataInputStream fis; @@ -60,12 +60,18 @@ public class DelimitedLineReader implements Closeable { private AtomicInteger lineReadBytes = new AtomicInteger(); private FileFragment fragment; private Configuration conf; + private int bufferSize; public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { + this(conf, fragment, 128 * StorageUnit.KB); + } + + public DelimitedLineReader(Configuration conf, final FileFragment fragment, int bufferSize) throws IOException { this.fragment = fragment; this.conf = conf; this.factory = new CompressionCodecFactory(conf); this.codec = factory.getCodec(fragment.getPath()); + this.bufferSize = bufferSize; if (this.codec instanceof SplittableCompressionCodec) { throw new NotImplementedException(); // bzip2 does not support multi-thread model } @@ -83,14 +89,16 @@ public class DelimitedLineReader implements Closeable { decompressor = CodecPool.getDecompressor(codec); is = new DataInputStream(codec.createInputStream(fis, decompressor)); ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE)); + + ByteBuf buf = BufferPool.directBuffer(bufferSize); + lineReader = new ByteBufLineReader(channel, buf); } else { fis.seek(startOffset); is = fis; ByteBufInputChannel channel = new ByteBufInputChannel(is); lineReader = new ByteBufLineReader(channel, - BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end))); + BufferPool.directBuffer((int) Math.min(bufferSize, end))); } eof = false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 59129d1..15db4c3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -39,6 +39,7 @@ import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.ReflectionUtil; import java.io.BufferedOutputStream; @@ -55,6 +56,9 @@ import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXN public class DelimitedTextFile { public static final byte LF = '\n'; + public static final String READ_BUFFER_SIZE = "tajo.storage.text.io.read-buffer.bytes"; + public static final String WRITE_BUFFER_SIZE = "tajo.storage.text.io.write-buffer.bytes"; + public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB; private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); @@ -105,8 +109,7 @@ public class DelimitedTextFile { private CompressionCodecFactory codecFactory; private CompressionCodec codec; private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; + private int bufferSize; private int bufferedBytes = 0; private long pos = 0; @@ -165,8 +168,9 @@ public class DelimitedTextFile { serializer = getLineSerde().createSerializer(schema, meta); serializer.init(); + bufferSize = conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); if (os == null) { - os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); + os = new NonSyncByteArrayOutputStream(bufferSize); } os.reset(); @@ -189,7 +193,7 @@ public class DelimitedTextFile { bufferedBytes += rowBytes; // refill buffer if necessary - if (bufferedBytes > BUFFER_SIZE) { + if (bufferedBytes > bufferSize) { flushBuffer(); } // Statistical section @@ -288,7 +292,7 @@ public class DelimitedTextFile { final Fragment fragment) throws IOException { super(conf, schema, meta, fragment); - reader = new DelimitedLineReader(conf, this.fragment); + reader = new DelimitedLineReader(conf, this.fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB)); if (!reader.isCompressed()) { splittable = true; } @@ -307,7 +311,7 @@ public class DelimitedTextFile { reader.close(); } - reader = new DelimitedLineReader(conf, fragment); + reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB)); reader.init(); recordCount = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 15998f2..9577e3d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -94,6 +94,20 @@ public class TestStorages { " ]\n" + "}\n"; + private static String TEST_MAX_VALUE_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testMaxValue\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col4\", \"type\": \"float\" },\n" + + " { \"name\": \"col5\", \"type\": \"double\" },\n" + + " { \"name\": \"col1\", \"type\": \"int\" },\n" + + " { \"name\": \"col2\", \"type\": \"int\" },\n" + + " { \"name\": \"col3\", \"type\": \"long\" }\n" + + " ]\n" + + "}\n"; + private StoreType storeType; private boolean splitable; private boolean statsable; @@ -875,4 +889,67 @@ public class TestStorages { assertEquals(appender.getStats().getNumRows().longValue(), readRows); } } + + @Test + public void testMaxValue() throws IOException { + + Schema schema = new Schema(); + schema.addColumn("col1", Type.FLOAT4); + schema.addColumn("col2", Type.FLOAT8); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA); + } + + if (storeType == StoreType.RAW) { + StorageManager.clearCache(); + /* TAJO-1250 reproduce BufferOverflow of RAWFile */ + int headerSize = 4 + 2 + 1; //Integer record length + Short null-flag length + 1 byte null flags + /* max varint32: 5 bytes, max varint64: 10 bytes */ + int record = 4 + 8 + 2 + 5 + 8; // required size is 27 + conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize); + } + + FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf); + Path tablePath = new Path(testDir, "testMaxValue.data"); + Appender appender = sm.getAppender(meta, schema, tablePath); + + appender.init(); + + Tuple tuple = new VTuple(5); + tuple.put(new Datum[]{ + DatumFactory.createFloat4(Float.MAX_VALUE), + DatumFactory.createFloat8(Double.MAX_VALUE), + DatumFactory.createInt2(Short.MAX_VALUE), + DatumFactory.createInt4(Integer.MAX_VALUE), + DatumFactory.createInt8(Long.MAX_VALUE) + }); + + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = sm.getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + + + if (storeType == StoreType.RAW){ + StorageManager.clearCache(); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/cf66a390/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index 737284b..adddf66 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -175,4 +175,26 @@ <name>tajo.storage.appender-handler.avro.class</name> <value>org.apache.tajo.storage.avro.AvroAppender</value> </property> + + <!--- Storage buffer --> + <property> + <name>tajo.storage.text.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.text.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.read-buffer.bytes</name> + <value>131072</value> + <description>128KB read buffer</description> + </property> + <property> + <name>tajo.storage.raw.io.write-buffer.bytes</name> + <value>131072</value> + <description>128KB write buffer</description> + </property> </configuration>
