Repository: parquet-mr Updated Branches: refs/heads/master 5a45ae3b1 -> 6b605a4ea
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 0a0b316..2eab54a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -42,6 +42,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.ByteBufferAllocator; class ColumnChunkPageWriteStore implements PageWriteStore { private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); @@ -65,10 +66,14 @@ class ColumnChunkPageWriteStore implements PageWriteStore { private Set<Encoding> encodings = new HashSet<Encoding>(); private Statistics totalStatistics; + private final ByteBufferAllocator allocator; - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) { + private ColumnChunkPageWriter(ColumnDescriptor path, + BytesCompressor compressor, + ByteBufferAllocator allocator) { this.path = path; this.compressor = compressor; + this.allocator = allocator; this.buf = new ConcatenatingByteArrayCollector(); this.totalStatistics = getStatsBasedOnType(this.path.getType()); } @@ -84,14 +89,14 @@ class ColumnChunkPageWriteStore implements PageWriteStore { if (uncompressedSize > Integer.MAX_VALUE) { throw new ParquetEncodingException( "Cannot write page larger than Integer.MAX_VALUE bytes: " + - uncompressedSize); + uncompressedSize); } BytesInput compressedBytes = 
compressor.compress(bytes); long compressedSize = compressedBytes.size(); if (compressedSize > Integer.MAX_VALUE) { throw new ParquetEncodingException( "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " - + compressedSize); + + compressedSize); } tempOutputStream.reset(); parquetMetadataConverter.writeDataPageHeader( @@ -151,10 +156,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore { // we only allocate one buffer to copy into instead of multiple. buf.collect( BytesInput.concat( - BytesInput.from(tempOutputStream), - repetitionLevels, - definitionLevels, - compressedData) + BytesInput.from(tempOutputStream), + repetitionLevels, + definitionLevels, + compressedData) ); encodings.add(dataEncoding); } @@ -163,7 +168,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore { if (size > Integer.MAX_VALUE) { throw new ParquetEncodingException( "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + - size); + size); } return (int)size; } @@ -186,10 +191,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore { String.format( "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); + + (dictionaryPage != null ? 
String.format( + ", dic { %,d entries, %,dB raw, %,dB comp}", + dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) + : "")); } encodings.clear(); pageCount = 0; @@ -215,15 +220,16 @@ class ColumnChunkPageWriteStore implements PageWriteStore { public String memUsageString(String prefix) { return buf.memUsageString(prefix + " ColumnChunkPageWriter"); } + } private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); private final MessageType schema; - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) { + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) { this.schema = schema; for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator)); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java new file mode 100644 index 0000000..bb711da --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.Log;
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.Preconditions;
+
+/**
+ * Factory to produce compressors and decompressors that operate on java
+ * direct memory, without requiring a copy into heap memory (where possible).
+ *
+ * <p>Snappy is compressed and decompressed with snappy-java on direct buffers.
+ * For other codecs, decompression uses Hadoop's direct decompressor when the
+ * Hadoop version on the classpath provides one for the codec, and otherwise
+ * falls back to a wrapper that copies through heap byte arrays; compression of
+ * other codecs currently always goes through {@code HeapBytesCompressor}.
+ */
+class DirectCodecFactory extends CodecFactory implements AutoCloseable {
+  private static final Log LOG = Log.getLog(DirectCodecFactory.class);
+
+  /** Allocator for the scratch buffers handed out by ensure(); checked to be direct in the constructor. */
+  private final ByteBufferAllocator allocator;
+
+  // The direct decompression API only exists in newer Hadoop releases, so it is
+  // resolved reflectively to keep this class loadable against Hadoop 1.x.
+  // Any of these can be null depending on the version of hadoop on the classpath.
+
+  /** {@code org.apache.hadoop.io.compress.DirectDecompressionCodec}, or null if unavailable. */
+  private static final Class<?> DIRECT_DECOMPRESSION_CODEC_CLASS;
+  /** {@code DirectDecompressor#decompress(ByteBuffer, ByteBuffer)}, or null if unavailable. */
+  private static final Method DECOMPRESS_METHOD;
+  /** {@code DirectDecompressionCodec#createDirectDecompressor()}, or null if unavailable. */
+  private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD;
+
+  static {
+    Class<?> tempClass = null;
+    Method tempCreateMethod = null;
+    Method tempDecompressMethod = null;
+    try {
+      tempClass = Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec");
+      tempCreateMethod = tempClass.getMethod("createDirectDecompressor");
+      // decompress(ByteBuffer, ByteBuffer) is declared on the decompressor returned by
+      // createDirectDecompressor(), not on the codec interface, so look it up there.
+      Class<?> tempDecompressorClass =
+          Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
+      tempDecompressMethod =
+          tempDecompressorClass.getMethod("decompress", ByteBuffer.class, ByteBuffer.class);
+    } catch (ClassNotFoundException e) {
+      // do nothing, whatever could not be resolved will just be assigned null
+    } catch (NoSuchMethodException e) {
+      // do nothing, whatever could not be resolved will just be assigned null
+    }
+    DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass;
+    CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod;
+    DECOMPRESS_METHOD = tempDecompressMethod;
+  }
+
+  /**
+   * See docs on CodecFactory#createDirectCodecFactory which is how this class is
+   * exposed publicly and is just a pass-through factory method for this constructor
+   * to hide the rest of this class from public access.
+   *
+   * @param config configuration passed through to {@link CodecFactory}
+   * @param allocator allocator for scratch buffers; must be non-null and direct
+   *                  (both enforced with {@link Preconditions})
+   * @param pageSize page size passed through to {@link CodecFactory}
+   */
+  DirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    super(config, pageSize);
+    Preconditions.checkNotNull(allocator, "allocator");
+    Preconditions.checkState(allocator.isDirect(),
+        "A %s requires a direct buffer allocator be provided.",
+        getClass().getSimpleName());
+    this.allocator = allocator;
+  }
+
+  /**
+   * Returns a cleared buffer with room for at least {@code size} bytes. {@code buffer}
+   * is reused when its capacity suffices; otherwise it is released to the allocator
+   * (when non-null) and replaced by a newly allocated buffer.
+   */
+  private ByteBuffer ensure(ByteBuffer buffer, int size) {
+    if (buffer == null) {
+      buffer = allocator.allocate(size);
+    } else if (buffer.capacity() >= size) {
+      buffer.clear();
+    } else {
+      release(buffer);
+      buffer = allocator.allocate(size);
+    }
+    return buffer;
+  }
+
+  /**
+   * Returns {@code buffer} to the allocator when it is non-null.
+   *
+   * @return always {@code null}, so callers can drop their reference in the same assignment
+   */
+  ByteBuffer release(ByteBuffer buffer) {
+    if (buffer != null) {
+      allocator.release(buffer);
+    }
+    return null;
+  }
+
+  @Override
+  protected BytesCompressor createCompressor(final CompressionCodecName codecName) {
+
+    CompressionCodec codec = getCodec(codecName);
+    if (codec == null) {
+      return new NoopCompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY) {
+      // avoid using the default Snappy codec since it allocates direct buffers at awkward spots.
+      return new SnappyCompressor();
+    } else {
+      // todo: create class similar to the SnappyCompressor for zlib and exclude it as
+      // snappy is above since it also generates allocateDirect calls.
+      return new HeapBytesCompressor(codecName);
+    }
+  }
+
+  @Override
+  protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) {
+    CompressionCodec codec = getCodec(codecName);
+    if (codec == null) {
+      return new NoopDecompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY) {
+      return new SnappyDecompressor();
+    } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
+      return new FullDirectDecompressor(codecName);
+    } else {
+      return new IndirectDecompressor(codec);
+    }
+  }
+
+  /** Delegates to the no-argument {@code release()} inherited from {@link CodecFactory}. */
+  @Override
+  public void close() {
+    release();
+  }
+
+  /**
+   * Wrapper around legacy hadoop compressors that do not implement a direct memory
+   * based version of the decompression algorithm. Data is copied through heap
+   * byte arrays on every call.
+   */
+  public class IndirectDecompressor extends BytesDecompressor {
+    private final Decompressor decompressor;
+
+    public IndirectDecompressor(CompressionCodec codec) {
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      decompressor.reset();
+      byte[] inputBytes = bytes.toByteArray();
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] output = new byte[uncompressedSize];
+      decompressor.decompress(output, 0, uncompressedSize);
+      return BytesInput.from(output);
+    }
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+
+      decompressor.reset();
+      byte[] inputBytes = new byte[compressedSize];
+      input.position(0);
+      input.get(inputBytes);
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] outputBytes = new byte[uncompressedSize];
+      decompressor.decompress(outputBytes, 0, uncompressedSize);
+      output.clear();
+      output.put(outputBytes);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+    }
+  }
+
+  /**
+   * Wrapper around new Hadoop compressors that implement a direct memory
+   * based version of a particular decompression algorithm. To maintain
+   * compatibility with Hadoop 1.x these classes that implement
+   * {@link org.apache.hadoop.io.compress.DirectDecompressionCodec}
+   * are currently retrieved and have their decompression method invoked
+   * with reflection.
+   */
+  public class FullDirectDecompressor extends BytesDecompressor {
+    private final Object decompressor;
+    // used for the BytesInput based API, which this class does not implement directly
+    private HeapBytesDecompressor extraDecompressor;
+    public FullDirectDecompressor(CompressionCodecName codecName){
+      CompressionCodec codec = getCodec(codecName);
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+      this.extraDecompressor = new HeapBytesDecompressor(codecName);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException {
+      return extraDecompressor.decompress(compressedBytes, uncompressedSize);
+    }
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      output.clear();
+      // The decompressor is pooled and reused across pages: clear any state left from the
+      // previous stream, as IndirectDecompressor does for its Decompressor.
+      if (decompressor instanceof Decompressor) {
+        ((Decompressor) decompressor).reset();
+      }
+      try {
+        DECOMPRESS_METHOD.invoke(decompressor, (ByteBuffer) input.limit(compressedSize), (ByteBuffer) output.limit(uncompressedSize));
+      } catch (IllegalAccessException e) {
+        throw new DirectCodecPool.ParquetCompressionCodecException(e);
+      } catch (InvocationTargetException e) {
+        throw new DirectCodecPool.ParquetCompressionCodecException(e);
+      }
+      output.position(uncompressedSize);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor);
+      extraDecompressor.release();
+    }
+
+  }
+
+  /** Pass-through "decompressor" for uncompressed data. */
+  public class NoopDecompressor extends BytesDecompressor {
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      Preconditions.checkArgument(compressedSize == uncompressedSize,
+          "Non-compressed data did not have matching compressed and uncompressed sizes.");
+      output.clear();
+      output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize));
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    protected void release() {}
+
+  }
+
+  /**
+   * Snappy decompressor that works directly on ByteBuffers via snappy-java; the
+   * BytesInput based API is delegated to a heap decompressor.
+   */
+  public class SnappyDecompressor extends BytesDecompressor {
+
+    private HeapBytesDecompressor extraDecompressor;
+    public SnappyDecompressor() {
+      this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return extraDecompressor.decompress(bytes, uncompressedSize);
+    }
+
+    @Override
+    public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize) throws IOException {
+      dst.clear();
+      int size = Snappy.uncompress(src, dst);
+      dst.limit(size);
+    }
+
+    @Override
+    protected void release() {}
+  }
+
+  /** Snappy compressor that compresses into a reusable direct buffer. */
+  public class SnappyCompressor extends BytesCompressor {
+
+    // TODO - this outgoing buffer might be better off not being shared, this seems to
+    // only work because of an extra copy currently happening where this interface is
+    // being consumed
+    private ByteBuffer incoming;
+    private ByteBuffer outgoing;
+
+    /**
+     * Compress a given buffer of bytes.
+     *
+     * @param bytes the uncompressed input
+     * @return a view of the compressed bytes held in this compressor's reusable output
+     *         buffer; it is overwritten by the next call to this method
+     * @throws IOException if snappy fails to compress the input
+     */
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
+      ByteBuffer bufferIn = bytes.toByteBuffer();
+      outgoing = ensure(outgoing, maxOutputSize);
+      final int size;
+      if (bufferIn.isDirect()) {
+        size = Snappy.compress(bufferIn, outgoing);
+      } else {
+        // Snappy library requires buffers be direct
+        this.incoming = ensure(this.incoming, (int) bytes.size());
+        this.incoming.put(bufferIn);
+        this.incoming.flip();
+        size = Snappy.compress(this.incoming, outgoing);
+      }
+
+      return BytesInput.from(outgoing, 0, (int) size);
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
+    }
+
+    @Override
+    protected void release() {
+      outgoing = DirectCodecFactory.this.release(outgoing);
+      incoming = DirectCodecFactory.this.release(incoming);
+    }
+
+  }
+
+  /** Pass-through "compressor" for the UNCOMPRESSED codec. */
+  public static class NoopCompressor extends BytesCompressor {
+
+    public NoopCompressor() {}
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.UNCOMPRESSED;
+    }
+
+    @Override
+    protected void release() {}
+  }
+
+  /**
+   * Process-wide pools of Hadoop compressors and decompressors, one {@link CodecPool}
+   * per codec instance. Returned objects are routed back to their pool by concrete class.
+   */
+  static class DirectCodecPool {
+
+    public static final DirectCodecPool INSTANCE = new DirectCodecPool();
+
+    private final Map<CompressionCodec, CodecPool> codecs =
+        Collections.synchronizedMap(new HashMap<CompressionCodec, CodecPool>());
+    private final Map<Class<?>, GenericObjectPool> directDePools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+    private final Map<Class<?>, GenericObjectPool> dePools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+    private final Map<Class<?>, GenericObjectPool> cPools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+
+    private DirectCodecPool() {}
+
+    /** The compressor, decompressor and (optional) direct decompressor pools of one codec. */
+    public class CodecPool {
+      private final GenericObjectPool compressorPool;
+      private final GenericObjectPool decompressorPool;
+      private final GenericObjectPool directDecompressorPool;
+      private final boolean supportDirectDecompressor;
+      private static final String BYTE_BUF_IMPL_NOT_FOUND_MSG =
+          "Unable to find ByteBuffer based %s for codec %s, will use a byte array based implementation instead.";
+
+      private CodecPool(final CompressionCodec codec){
+        try {
+          // Direct decompression needs the codec to implement DirectDecompressionCodec (an
+          // interface, so an exact class comparison can never match) and both reflective
+          // methods to have been resolved.
+          boolean supportDirectDecompressor = DIRECT_DECOMPRESSION_CODEC_CLASS != null
+              && CREATE_DIRECT_DECOMPRESSOR_METHOD != null
+              && DECOMPRESS_METHOD != null
+              && DIRECT_DECOMPRESSION_CODEC_CLASS.isInstance(codec);
+          compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return codec.createCompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          // Borrow one instance to learn its concrete class, which is the key used to
+          // route returned objects back to this pool.
+          Object com = compressorPool.borrowObject();
+          if (com != null) {
+            cPools.put(com.getClass(), compressorPool);
+            compressorPool.returnObject(com);
+          } else {
+            if (Log.DEBUG) {
+              LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "compressor", codec.getClass().getName()));
+            }
+          }
+
+          decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return codec.createDecompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          Object decom = decompressorPool.borrowObject();
+          if (decom != null) {
+            dePools.put(decom.getClass(), decompressorPool);
+            decompressorPool.returnObject(decom);
+          } else {
+            if (Log.DEBUG) {
+              LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "decompressor", codec.getClass().getName()));
+            }
+          }
+
+          if (supportDirectDecompressor) {
+            directDecompressorPool = new GenericObjectPool(
+                new BasePoolableObjectFactory() {
+                  public Object makeObject() throws Exception {
+                    // createDirectDecompressor() is an instance method: invoke it on the codec
+                    return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(codec);
+                  }
+                }, Integer.MAX_VALUE);
+
+            Object ddecom = directDecompressorPool.borrowObject();
+            if (ddecom != null) {
+              directDePools.put(ddecom.getClass(), directDecompressorPool);
+              directDecompressorPool.returnObject(ddecom);
+
+            } else {
+              // the codec produced no direct decompressor, use the byte array path instead
+              supportDirectDecompressor = false;
+              if (Log.DEBUG) {
+                LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "direct decompressor", codec.getClass().getName()));
+              }
+            }
+
+          } else {
+            directDecompressorPool = null;
+          }
+
+          this.supportDirectDecompressor = supportDirectDecompressor;
+        } catch (Exception e) {
+          throw new ParquetCompressionCodecException("Error creating compression codec pool.", e);
+        }
+      }
+
+      public Object borrowDirectDecompressor(){
+        Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec.");
+        try {
+          return directDecompressorPool.borrowObject();
+        } catch (Exception e) {
+          throw new ParquetCompressionCodecException(e);
+        }
+      }
+
+      public boolean supportsDirectDecompression() {
+        return supportDirectDecompressor;
+      }
+
+      public Decompressor borrowDecompressor(){
+        return borrow(decompressorPool);
+      }
+
+      public Compressor borrowCompressor(){
+        return borrow(compressorPool);
+      }
+    }
+
+    /** Returns the pools for {@code codec}, creating them on first use. */
+    public CodecPool codec(CompressionCodec codec){
+      CodecPool pools = codecs.get(codec);
+      if(pools == null){
+        synchronized(this){
+          pools = codecs.get(codec);
+          if(pools == null){
+            pools = new CodecPool(codec);
+            codecs.put(codec, pools);
+          }
+        }
+      }
+      return pools;
+    }
+
+    private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) {
+      try {
+        GenericObjectPool pool = pools.get(obj.getClass());
+        if (pool == null) {
+          throw new IllegalStateException("Received unexpected compressor or decompressor, " +
+              "cannot be returned to any available pool: " + obj.getClass().getSimpleName());
+        }
+        pool.returnObject(obj);
+      } catch (Exception e) {
+        throw new ParquetCompressionCodecException(e);
+      }
+    }
+
+    /**
+     * Borrow an object from a pool.
+     *
+     * @param pool - the pool to borrow from, must not be null
+     * @return - an object from the pool
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T borrow(GenericObjectPool pool) {
+      try {
+        return (T) pool.borrowObject();
+      } catch (Exception e) {
+        throw new ParquetCompressionCodecException(e);
+      }
+
+    }
+
+    public void returnCompressor(Compressor compressor) {
+      returnToPool(compressor, cPools);
+    }
+
+    public void returnDecompressor(Decompressor decompressor) {
+      returnToPool(decompressor, dePools);
+    }
+
+    public void returnDirectDecompressor(Object decompressor) {
+      returnToPool(decompressor, directDePools);
+    }
+
+    public static class ParquetCompressionCodecException extends ParquetRuntimeException {
+
+      public ParquetCompressionCodecException() {
+        super();
+      }
+
+      public ParquetCompressionCodecException(String message, Throwable cause) {
+        super(message, cause);
+      }
+
+      public ParquetCompressionCodecException(String message) {
+        super(message);
+      }
+
+      public ParquetCompressionCodecException(Throwable cause) {
+        // keep the underlying failure attached instead of silently dropping it
+        super(cause);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index ab9cb3e..87b23a2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; @@ -86,7 +87,8 @@ class InternalParquetRecordWriter<T> { int dictionaryPageSize, boolean enableDictionary, boolean validating, - WriterVersion writerVersion) { + WriterVersion writerVersion, + ByteBufferAllocator allocator) { this.parquetFileWriter = parquetFileWriter; this.writeSupport = checkNotNull(writeSupport, "writeSupport"); this.schema = schema; @@ -97,16 +99,17 @@ class InternalParquetRecordWriter<T> { this.pageSize = pageSize; this.compressor = compressor; this.validating = validating; - this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary); + this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary, allocator); initStore(); } private void initStore() { - pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, parquetProperties.getAllocator()); columnStore = parquetProperties.newColumnWriteStore( schema, pageStore, - pageSize); + pageSize, + parquetProperties.getAllocator()); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); 
writeSupport.prepareForWrite(recordConsumer); @@ -150,7 +153,9 @@ class InternalParquetRecordWriter<T> { max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead ); - if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); + if (DEBUG) { + LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); + } } } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index f43e692..c54b2b2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,10 +27,10 @@ import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE; import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE; -import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.io.SequenceInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,6 +53,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; @@ -185,7 +190,9 @@ public class ParquetFileReader implements Closeable { if (toRead.size() > 0) { // read the footers of the files that did not have a summary file - if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers"); + if (Log.INFO) { + LOG.info("reading another " + toRead.size() + " footers"); + } result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups)); } @@ -297,7 +304,7 @@ public class ParquetFileReader implements Closeable { * Read the footers of all the files under that path (recursively) * using summary files if possible * @param configuration the configuration to access the FS - * @param fileStatus the root dir + * @param pathStatus the root dir * @return all the footers * 
@throws IOException */ @@ -342,7 +349,9 @@ public class ParquetFileReader implements Closeable { if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile); return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups)); } else if (fileSystem.exists(metadataFile)) { - if (Log.INFO) LOG.info("reading summary file: " + metadataFile); + if (Log.INFO) { + LOG.info("reading summary file: " + metadataFile); + } return readFooter(configuration, metadataFile, filter(skipRowGroups)); } else { return null; @@ -416,13 +425,17 @@ public class ParquetFileReader implements Closeable { FSDataInputStream f = fileSystem.open(file.getPath()); try { long l = file.getLen(); - if (Log.DEBUG) LOG.debug("File length " + l); + if (Log.DEBUG) { + LOG.debug("File length " + l); + } int FOOTER_LENGTH_SIZE = 4; if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)"); } long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length; - if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex); + if (Log.DEBUG) { + LOG.debug("reading footer index at " + footerLengthIndex); + } f.seek(footerLengthIndex); int footerLength = readIntLittleEndian(f); @@ -432,7 +445,9 @@ public class ParquetFileReader implements Closeable { throw new RuntimeException(file.getPath() + " is not a Parquet file. 
expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic)); } long footerIndex = footerLengthIndex - footerLength; - if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex); + if (Log.DEBUG) { + LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex); + } if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) { throw new RuntimeException("corrupted file: the footer index is not within the file"); } @@ -450,6 +465,7 @@ public class ParquetFileReader implements Closeable { private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>(); private final FileMetaData fileMetaData; private final String createdBy; + private final ByteBufferAllocator allocator; private int currentBlock = 0; @@ -480,7 +496,10 @@ public class ParquetFileReader implements Closeable { for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } - this.codecFactory = new CodecFactory(configuration); + // the page size parameter isn't meaningful when only using + // the codec factory to get decompressors + this.codecFactory = new CodecFactory(configuration, 0); + this.allocator = new HeapByteBufferAllocator(); } @@ -540,7 +559,7 @@ public class ParquetFileReader implements Closeable { * @author Julien Le Dem * */ - private class Chunk extends ByteArrayInputStream { + private class Chunk extends ByteBufferInputStream { private final ChunkDescriptor descriptor; @@ -550,10 +569,9 @@ public class ParquetFileReader implements Closeable { * @param data contains the chunk data at offset * @param offset where the chunk starts in offset */ - public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) { - super(data); + public Chunk(ChunkDescriptor descriptor, ByteBuffer data, int offset) { + super(data, offset, descriptor.size); this.descriptor = descriptor; - this.pos = offset; } protected PageHeader 
readPageHeader() throws IOException { @@ -626,7 +644,9 @@ public class ParquetFileReader implements Closeable { valuesCountReadSoFar += dataHeaderV2.getNum_values(); break; default: - if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize); + if (DEBUG) { + LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize); + } this.skip(compressedPageSize); break; } @@ -647,7 +667,7 @@ public class ParquetFileReader implements Closeable { * @return the current position in the chunk */ public int pos() { - return this.pos; + return this.byteBuf.position(); } /** @@ -656,8 +676,9 @@ public class ParquetFileReader implements Closeable { * @throws IOException */ public BytesInput readAsBytesInput(int size) throws IOException { - final BytesInput r = BytesInput.from(this.buf, this.pos, size); - this.pos += size; + int pos = this.byteBuf.position(); + final BytesInput r = BytesInput.from(this.byteBuf, pos, size); + this.byteBuf.position(pos + size); return r; } @@ -675,18 +696,18 @@ public class ParquetFileReader implements Closeable { /** * @param descriptor the descriptor of the chunk - * @param data contains the data of the chunk at offset + * @param byteBuf contains the data of the chunk at offset * @param offset where the chunk starts in data * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) { - super(descriptor, data, offset); + private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) { + super(descriptor, byteBuf, offset); this.f = f; } protected PageHeader readPageHeader() throws IOException { PageHeader pageHeader; - int initialPos = this.pos; + int initialPos = pos(); try { pageHeader = Util.readPageHeader(this); } catch (IOException e) { @@ -695,7 +716,7 @@ public class ParquetFileReader implements Closeable { // to allow 
reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing // if the last page is smaller than this, the page header itself is truncated in the buffer. - this.pos = initialPos; // resetting the buffer to the position before we got the error + this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error LOG.info("completing the column chunk to read the page header"); pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream. } @@ -703,12 +724,12 @@ public class ParquetFileReader implements Closeable { } public BytesInput readAsBytesInput(int size) throws IOException { - if (pos + size > count) { + if (pos() + size > initPos + count) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary // to allow reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing - int l1 = count - pos; + int l1 = initPos + count - pos(); int l2 = size - l1; LOG.info("completed the column chunk with " + l2 + " bytes"); return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2))); @@ -784,18 +805,18 @@ public class ParquetFileReader implements Closeable { public List<Chunk> readAll(FSDataInputStream f) throws IOException { List<Chunk> result = new ArrayList<Chunk>(chunks.size()); f.seek(offset); - byte[] chunksBytes = new byte[length]; - f.readFully(chunksBytes); + ByteBuffer chunksByteBuffer = allocator.allocate(length); + CompatibilityUtil.getBuf(f, chunksByteBuffer, length); // report in a counter the data we just scanned BenchmarkCounter.incrementBytesRead(length); int currentChunkOffset = 0; for (int i = 0; i < chunks.size(); i++) { ChunkDescriptor descriptor = chunks.get(i); if (i < chunks.size() - 1) { - result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset)); + result.add(new Chunk(descriptor, chunksByteBuffer,
currentChunkOffset)); } else { // because of a bug, the last chunk might be larger than descriptor.size - result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f)); + result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f)); } currentChunkOffset += descriptor.size; } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 664ee9d..8683a18 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -73,8 +73,9 @@ public class ParquetFileWriter { private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); public static final String PARQUET_METADATA_FILE = "_metadata"; + public static final String MAGIC_STR = "PAR1"; + public static final byte[] MAGIC = MAGIC_STR.getBytes(Charset.forName("ASCII")); public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); public static final int CURRENT_VERSION = 1; // need to supply a buffer size when setting block size. 
this is the default http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index ad6c034..562bffc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -341,7 +341,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { throws IOException, InterruptedException { final WriteSupport<T> writeSupport = getWriteSupport(conf); - CodecFactory codecFactory = new CodecFactory(conf); long blockSize = getLongBlockSize(conf); if (INFO) LOG.info("Parquet block size to " + blockSize); int pageSize = getPageSize(conf); @@ -357,6 +356,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { int maxPaddingSize = getMaxPaddingSize(conf); if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes"); + CodecFactory codecFactory = new CodecFactory(conf, pageSize); + WriteContext init = writeSupport.init(conf); ParquetFileWriter w = new ParquetFileWriter( conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize); @@ -379,7 +380,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { init.getSchema(), init.getExtraMetaData(), blockSize, pageSize, - codecFactory.getCompressor(codec, pageSize), + codecFactory.getCompressor(codec), dictionaryPageSize, enableDictionary, validating, http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java ---------------------------------------------------------------------- diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index 2449192..eefb257 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,9 +20,11 @@ package org.apache.parquet.hadoop; import java.io.IOException; import java.util.Map; + import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; @@ -70,7 +72,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> { WriterVersion writerVersion) { internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema, extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary, - validating, writerVersion); + validating, writerVersion, new HeapByteBufferAllocator()); } /** @@ -98,8 +100,8 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> { WriterVersion writerVersion, MemoryManager memoryManager) { internalWriter = new 
InternalParquetRecordWriter<T>(w, writeSupport, schema, - extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary, - validating, writerVersion); + extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary, + validating, writerVersion, new HeapByteBufferAllocator()); this.memoryManager = checkNotNull(memoryManager, "memoryManager"); memoryManager.addWriter(internalWriter, blockSize); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index e3b7953..e2521fb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -29,6 +29,7 @@ import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.HeapByteBufferAllocator; /** * Write records to a Parquet file. 
@@ -267,8 +268,8 @@ public class ParquetWriter<T> implements Closeable { conf, schema, file, mode, blockSize, maxPaddingSize); fileWriter.start(); - CodecFactory codecFactory = new CodecFactory(conf); - CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); + CodecFactory codecFactory = new CodecFactory(conf, pageSize); + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName); this.writer = new InternalParquetRecordWriter<T>( fileWriter, writeSupport, @@ -280,7 +281,8 @@ public class ParquetWriter<T> implements Closeable { dictionaryPageSize, enableDictionary, validating, - writerVersion); + writerVersion, + new HeapByteBufferAllocator()); } public void write(T object) throws IOException { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 8631267..66e3b81 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -147,4 +147,5 @@ public class SnappyDecompressor implements Decompressor { public void setDictionary(byte[] b, int off, int len) { // No-op } -} + +} //class SnappyDecompressor http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java new file mode 100644 index 
0000000..bacf222 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.ShouldNeverHappenException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class CompatibilityUtil { + + // Will be set to true if the implementation of FSDataInputSteam supports + // the 2.x APIs, in particular reading using a provided ByteBuffer + private static boolean useV21; + public static final V21FileAPI fileAPI; + + private static class V21FileAPI { + private final Method PROVIDE_BUF_READ_METHOD; + private final Class<?> FSDataInputStreamCls; + + private V21FileAPI() throws ReflectiveOperationException { + final String PACKAGE = "org.apache.hadoop"; + FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream"); + PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class); + } + } + + static { + // Test to see if a class from the Hadoop 2.x API is available + 
boolean v21 = true; + try { + Class.forName("org.apache.hadoop.io.compress.DirectDecompressor"); + } catch (ClassNotFoundException cnfe) { + v21 = false; + } + + useV21 = v21; + try { + if (v21) { + fileAPI = new V21FileAPI(); + } else { + fileAPI = null; + } + + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e); + } + } + + private static Object invoke(Method method, String errorMsg, Object instance, Object... args) { + try { + return method.invoke(instance, args); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException(errorMsg, e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException(errorMsg, e); + } + } + + public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException { + int res; + if (useV21) { + try { + res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof UnsupportedOperationException) { + // the FSDataInputStream docs say specifically that implementations + // can choose to throw UnsupportedOperationException, so this should + // be a reasonable check to make to see if the interface is + // present but not implemented and we should be falling back + useV21 = false; + return getBuf(f, readBuf, maxSize); + } else if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + // To handle any cases where a Runtime exception occurs and provide + // some additional context information. A stacktrace would just give + // a line number, this at least tells them we were using the version + // of the read method designed for using a ByteBuffer. 
+ throw new IOException("Error reading out of an FSDataInputStream " + + "using the Hadoop 2 ByteBuffer based read method.", e.getCause()); + } + } catch (IllegalAccessException e) { + // This method is public because it is defined in an interface, + // there should be no problems accessing it + throw new ShouldNeverHappenException(e); + } + } else { + byte[] buf = new byte[maxSize]; + res = f.read(buf); + if (res > 0) { readBuf.put(buf, 0, res); } // read() returns -1 at EOF; put() rejects a negative length + } + return res; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index 2c644b6..87574cd 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.Types; +import org.apache.parquet.bytes.HeapByteBufferAllocator; public class TestColumnChunkPageWriteStore { @@ -101,7 +102,7 @@ public class TestColumnChunkPageWriteStore { writer.start(); writer.startBlock(rowCount); { - ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , initialSize); + ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator()); PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, nullCount, valueCount, @@ -158,8 +159,10 @@ public class TestColumnChunkPageWriteStore { int fakeCount = 3; BinaryStatistics fakeStats = new
BinaryStatistics(); + // TODO - look back at this, an allocator was being passed here in the ByteBuffer changes + // see comment at this constructor ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( - compressor(UNCOMPRESSED), schema, initialSize); + compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator()); for (ColumnDescriptor col : schema.getColumns()) { PageWriter pageWriter = store.getPageWriter(col); @@ -176,6 +179,6 @@ public class TestColumnChunkPageWriteStore { } private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) { - return new CodecFactory(conf).getCompressor(codec, pageSize); + return new CodecFactory(conf, pageSize).getCompressor(codec); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java new file mode 100644 index 0000000..caf2ed6 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.hadoop; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +public class TestDirectCodecFactory { + + private static enum Decompression { + ON_HEAP, OFF_HEAP, OFF_HEAP_BYTES_INPUT + } + + private final int pageSize = 64 * 1024; + + private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) { + ByteBuffer rawBuf = null; + ByteBuffer outBuf = null; + ByteBufferAllocator allocator = null; + try { + allocator = new DirectByteBufferAllocator(); + final CodecFactory codecFactory = CodecFactory.createDirectCodecFactory(new Configuration(), allocator, pageSize); + rawBuf = allocator.allocate(size); + final byte[] rawArr = new byte[size]; + outBuf = allocator.allocate(size * 2); + final Random r = new Random(); + final byte[] random = new byte[1024]; + int pos = 0; + while (pos < size) { + r.nextBytes(random); + rawBuf.put(random); + System.arraycopy(random, 0, rawArr, pos, random.length); + pos += random.length; + } + rawBuf.flip(); + + final DirectCodecFactory.BytesCompressor c = codecFactory.getCompressor(codec); + final CodecFactory.BytesDecompressor d = codecFactory.getDecompressor(codec); + + final BytesInput compressed; + if (useOnHeapCompression) { + compressed = c.compress(BytesInput.from(rawArr)); + } else { + compressed = c.compress(BytesInput.from(rawBuf, 0, rawBuf.remaining())); + } + + switch (decomp) { + case OFF_HEAP: { + final ByteBuffer buf = compressed.toByteBuffer(); + final ByteBuffer 
b = allocator.allocate(buf.capacity()); + try { + b.put(buf); + b.flip(); + d.decompress(b, (int) compressed.size(), outBuf, size); + for (int i = 0; i < size; i++) { + Assert.assertTrue("Data didn't match at " + i, outBuf.get(i) == rawBuf.get(i)); + } + } finally { + allocator.release(b); + } + break; + } + + case OFF_HEAP_BYTES_INPUT: { + final ByteBuffer buf = compressed.toByteBuffer(); + final ByteBuffer b = allocator.allocate(buf.capacity()); + try { + b.put(buf); + b.flip(); + final BytesInput input = d.decompress(BytesInput.from(b, 0, b.capacity()), size); + Assert.assertArrayEquals( + String.format("While testing codec %s", codec), + input.toByteArray(), rawArr); + } finally { + allocator.release(b); + } + break; + } + case ON_HEAP: { + final byte[] buf = compressed.toByteArray(); + final BytesInput input = d.decompress(BytesInput.from(buf), size); + Assert.assertArrayEquals(input.toByteArray(), rawArr); + break; + } + } + } catch (Exception e) { + final String msg = String.format( + "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d", + codec.name(), + useOnHeapCompression, decomp.name(), size); + System.out.println(msg); + throw new RuntimeException(msg, e); + } finally { + if (rawBuf != null) { + allocator.release(rawBuf); + } + if (outBuf != null) { + allocator.release(outBuf); + } + } + } + + @Test + public void createDirectFactoryWithHeapAllocatorFails() { + String errorMsg = "Test failed, creation of a direct codec factory should have failed when passed a non-direct allocator."; + try { + CodecFactory.createDirectCodecFactory(new Configuration(), new HeapByteBufferAllocator(), 0); + throw new RuntimeException(errorMsg); + } catch (IllegalStateException ex) { + // indicates successful completion of the test + Assert.assertTrue("Missing expected error message.", + ex.getMessage() + .contains("A DirectCodecFactory requires a direct buffer allocator be provided.") + ); + } catch (Exception ex) { + throw
new RuntimeException(errorMsg + " Failed with the wrong error."); + } + } + + @Test + public void compressionCodecs() throws Exception { + final int[] sizes = { 4 * 1024, 1 * 1024 * 1024 }; + final boolean[] comp = { true, false }; + + for (final int size : sizes) { + for (final boolean useOnHeapComp : comp) { + for (final Decompression decomp : Decompression.values()) { + for (final CompressionCodecName codec : CompressionCodecName.values()) { + if (codec == CompressionCodecName.LZO) { + // not installed as gpl. + continue; + } + test(size, codec, useOnHeapComp, decomp); + } + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java ---------------------------------------------------------------------- diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java index 5e1f5af..c050922 100644 --- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java +++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Level; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.NonSpillableDataBag; @@ -59,7 +60,7 @@ public class TupleConsumerPerfTest { MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema)); MemPageStore memPageStore = new MemPageStore(0); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); write(memPageStore, columns, 
schema, pigSchema); columns.flush(); read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java ---------------------------------------------------------------------- diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java index f5f3ff1..f954e4c 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import thrift.test.OneOfEach; import org.apache.thrift.TBase; @@ -148,7 +149,8 @@ public class TestParquetReadProtocol { final MessageType schema = schemaConverter.convert(thriftClass); LOG.info(schema); final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); - final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0); + final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, + WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); final RecordConsumer recordWriter = columnIO.getRecordWriter(columns); final StructType thriftType = schemaConverter.toStructType(thriftClass); ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index dc27f4c..14feb9c 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ <scala.binary.version>2.10</scala.binary.version> 
<scala.maven.test.skip>false</scala.maven.test.skip> <pig.version>0.11.1</pig.version> - <pig.classifier /> + <pig.classifier/> <thrift.version>0.7.0</thrift.version> <fastutil.version>6.5.7</fastutil.version> <semver.api.version>0.9.33</semver.api.version> @@ -225,6 +225,7 @@ <exclude>org/apache/parquet/filter2/**</exclude> <exclude>org/apache/parquet/column/**</exclude> <exclude>org/apache/parquet/hadoop/ParquetInputSplit</exclude> + <exclude>org/apache/parquet/hadoop/CodecFactory**</exclude> <exclude>shaded/**</exclude> <!-- shaded by parquet --> <!-- temporary exclusions for false-positives --> <exclude>org/apache/parquet/Version</exclude>
