Repository: parquet-mr Updated Branches: refs/heads/master c8d78b21b -> 898f3d0f6
PARQUET-400: Replace CompatibilityUtil with SeekableInputStream. This fixes PARQUET-400 by replacing `CompatibilityUtil` with `SeekableInputStream` that's implemented for hadoop-1 and hadoop-2. The benefit of this approach is that `SeekableInputStream` can be used for non-Hadoop file systems in the future. This also changes the default Hadoop version to Hadoop-2. The library is still compatible with Hadoop 1.x, but this makes building Hadoop-2 classes, like `H2SeekableInputStream`, much easier and removes the need for multiple hadoop versions during compilation. Author: Ryan Blue <[email protected]> Closes #349 from rdblue/PARQUET-400-byte-buffers and squashes the following commits: 1bcb8a8 [Ryan Blue] PARQUET-400: Fix review nits. 823ca00 [Ryan Blue] PARQUET-400: Add tests for Hadoop 2 readFully. 02d3709 [Ryan Blue] PARQUET-400: Remove unused property. b543013 [Ryan Blue] PARQUET-400: Fix logger for HadoopStreams. 2cb6934 [Ryan Blue] PARQUET-400: Remove H2SeekableInputStream tests. abaa695 [Ryan Blue] PARQUET-400: Fix review items. 5dc50a5 [Ryan Blue] PARQUET-400: Add tests for H1SeekableInputStream methods. 730a9e2 [Ryan Blue] PARQUET-400: Move SeekableInputStream to io package. 506a556 [Ryan Blue] PARQUET-400: Remove Hadoop dependencies from SeekableInputStream. c80580c [Ryan Blue] PARQUET-400: Handle UnsupportedOperationException from read(ByteBuffer). ba08b3f [Ryan Blue] PARQUET-400: Replace CompatibilityUtil with SeekableInputStream. 
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/898f3d0f Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/898f3d0f Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/898f3d0f Branch: refs/heads/master Commit: 898f3d0f652f313473c67fef32e22d94d8294d4f Parents: c8d78b2 Author: Ryan Blue <[email protected]> Authored: Tue Aug 16 10:12:00 2016 -0700 Committer: Ryan Blue <[email protected]> Committed: Tue Aug 16 10:12:00 2016 -0700 ---------------------------------------------------------------------- .travis.yml | 4 +- .../apache/parquet/io/SeekableInputStream.java | 106 +++ .../parquet/hadoop/ParquetFileReader.java | 31 +- .../parquet/hadoop/ParquetFileWriter.java | 17 +- .../parquet/hadoop/util/CompatibilityUtil.java | 114 --- .../hadoop/util/H1SeekableInputStream.java | 154 ++++ .../hadoop/util/H2SeekableInputStream.java | 107 +++ .../parquet/hadoop/util/HadoopStreams.java | 100 +++ .../parquet/hadoop/util/MockInputStream.java | 87 +++ .../hadoop/util/TestHadoop1ByteBufferReads.java | 761 +++++++++++++++++++ .../hadoop/util/TestHadoop2ByteBufferReads.java | 405 ++++++++++ pom.xml | 14 +- 12 files changed, 1761 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 890a372..ff9b356 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,8 +24,8 @@ before_install: - cd .. 
env: - - HADOOP_PROFILE=default TEST_CODECS=uncompressed - - HADOOP_PROFILE=hadoop-2 TEST_CODECS=gzip,snappy + - HADOOP_PROFILE=hadoop-1 TEST_CODECS=uncompressed + - HADOOP_PROFILE=default TEST_CODECS=gzip,snappy install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false) script: mvn test -P $HADOOP_PROFILE http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java new file mode 100644 index 0000000..7247817 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * {@code SeekableInputStream} is an abstract {@link InputStream} that declares + * the methods needed by Parquet to read data from a file or Hadoop data stream. + */ +public abstract class SeekableInputStream extends InputStream { + + /** + * Return the current position in the InputStream. + * + * @return current position in bytes from the start of the stream + * @throws IOException If the underlying stream throws IOException + */ + public abstract long getPos() throws IOException; + + /** + * Seek to a new position in the InputStream. + * + * @param newPos the new position to seek to + * @throws IOException If the underlying stream throws IOException + */ + public abstract void seek(long newPos) throws IOException; + + /** + * Read a byte array of data, from position 0 to the end of the array. + * <p> + * This method is equivalent to {@code readFully(bytes, 0, bytes.length)}. + * <p> + * This method will block until {@code bytes.length} bytes are available to + * copy into the array, or will throw {@link EOFException} if the stream ends + * before the array is full. + * + * @param bytes a byte array to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the array, {@code bytes.length} + */ + public abstract void readFully(byte[] bytes) throws IOException; + + /** + * Read {@code len} bytes of data into an array, at position {@code start}. + * <p> + * This method will block until {@code len} bytes are available to copy into + * the array, or will throw {@link EOFException} if the stream ends before + * {@code len} bytes have been read.
+ * + * @param bytes a byte array to fill with data from the stream + * @param start the index in {@code bytes} at which the first byte read is stored + * @param len the number of bytes to read + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer than {@code len} bytes left + */ + public abstract void readFully(byte[] bytes, int start, int len) throws IOException; + + /** + * Read up to {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + * <p> + * This method will copy available bytes into the buffer, reading at most + * {@code buf.remaining()} bytes. The number of bytes actually copied is + * returned by the method, or -1 is returned to signal that the end of the + * underlying stream has been reached. + * + * @param buf a byte buffer to fill with data from the stream + * @return the number of bytes read or -1 if the stream ended + * @throws IOException If the underlying stream throws IOException + */ + public abstract int read(ByteBuffer buf) throws IOException; + + /** + * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + * <p> + * This method will block until {@code buf.remaining()} bytes are available + * to copy into the buffer, or will throw {@link EOFException} if the stream + * ends before the buffer is full.
+ * + * @param buf a byte buffer to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the buffer, {@code buf.remaining()} + */ + public abstract void readFully(ByteBuffer buf) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 83542d5..59a7e46 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -54,7 +54,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -66,7 +65,6 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.RowGroupFilter; -import org.apache.parquet.hadoop.util.CompatibilityUtil; import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesInput; @@ -91,6 +89,8 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HiddenFileFilter; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.SeekableInputStream; import
org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.io.ParquetDecodingException; @@ -432,7 +432,7 @@ public class ParquetFileReader implements Closeable { */ public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException { FileSystem fileSystem = file.getPath().getFileSystem(configuration); - FSDataInputStream in = fileSystem.open(file.getPath()); + SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath())); try { return readFooter(file.getLen(), file.getPath().toString(), in, filter); } finally { @@ -449,7 +449,7 @@ public class ParquetFileReader implements Closeable { * @return the metadata blocks in the footer * @throws IOException if an error occurs while reading the file */ - public static final ParquetMetadata readFooter(long fileLen, String filePath, FSDataInputStream f, MetadataFilter filter) throws IOException { + public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException { if (Log.DEBUG) { LOG.debug("File length " + fileLen); } @@ -493,7 +493,7 @@ public class ParquetFileReader implements Closeable { } private final CodecFactory codecFactory; - private final FSDataInputStream f; + private final SeekableInputStream f; private final FileStatus fileStatus; private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>(); private final FileMetaData fileMetaData; // may be null @@ -531,7 +531,7 @@ public class ParquetFileReader implements Closeable { this.conf = configuration; this.fileMetaData = fileMetaData; FileSystem fs = filePath.getFileSystem(configuration); - this.f = fs.open(filePath); + this.f = HadoopStreams.wrap(fs.open(filePath)); this.fileStatus = fs.getFileStatus(filePath); this.blocks = blocks; for (ColumnDescriptor col : columns) { @@ -562,7 +562,7 @@ public class ParquetFileReader implements Closeable { 
this.conf = conf; FileSystem fs = file.getFileSystem(conf); this.fileStatus = fs.getFileStatus(file); - this.f = fs.open(file); + this.f = HadoopStreams.wrap(fs.open(file)); this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter); this.fileMetaData = footer.getFileMetaData(); this.blocks = footer.getBlocks(); @@ -585,7 +585,7 @@ public class ParquetFileReader implements Closeable { this.conf = conf; FileSystem fs = file.getFileSystem(conf); this.fileStatus = fs.getFileStatus(file); - this.f = fs.open(file); + this.f = HadoopStreams.wrap(fs.open(file)); this.footer = footer; this.fileMetaData = footer.getFileMetaData(); this.blocks = footer.getBlocks(); @@ -772,7 +772,7 @@ public class ParquetFileReader implements Closeable { } private static DictionaryPage readCompressedDictionary( - PageHeader pageHeader, FSDataInputStream fin) throws IOException { + PageHeader pageHeader, SeekableInputStream fin) throws IOException { DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); int uncompressedPageSize = pageHeader.getUncompressed_page_size(); @@ -940,7 +940,7 @@ public class ParquetFileReader implements Closeable { */ private class WorkaroundChunk extends Chunk { - private final FSDataInputStream f; + private final SeekableInputStream f; /** * @param descriptor the descriptor of the chunk @@ -948,7 +948,7 @@ public class ParquetFileReader implements Closeable { * @param offset where the chunk starts in data * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) { + private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) { super(descriptor, byteBuf, offset); this.f = f; } @@ -964,7 +964,7 @@ public class ParquetFileReader implements Closeable { // to allow reading older files (using dictionary) we need this. 
// usually 13 to 19 bytes are missing // if the last page is smaller than this, the page header itself is truncated in the buffer. - this.byteBuf.rewind(); // resetting the buffer to the position before we got the error + this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error LOG.info("completing the column chunk to read the page header"); pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream. } @@ -1050,11 +1050,14 @@ public class ParquetFileReader implements Closeable { * @return the chunks * @throws IOException */ - public List<Chunk> readAll(FSDataInputStream f) throws IOException { + public List<Chunk> readAll(SeekableInputStream f) throws IOException { List<Chunk> result = new ArrayList<Chunk>(chunks.size()); f.seek(offset); + + // Allocate the bytebuffer based on whether the FS can support it. ByteBuffer chunksByteBuffer = allocator.allocate(length); - CompatibilityUtil.getBuf(f, chunksByteBuffer, length); + f.readFully(chunksByteBuffer); + // report in a counter the data we just scanned BenchmarkCounter.incrementBytesRead(length); int currentChunkOffset = 0; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 523d01f..f0fa7f5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -61,6 +61,8 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.GlobalMetaData; import 
org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -495,6 +497,12 @@ public class ParquetFileWriter { public void appendRowGroups(FSDataInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns) throws IOException { + appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns); + } + + public void appendRowGroups(SeekableInputStream file, + List<BlockMetaData> rowGroups, + boolean dropColumns) throws IOException { for (BlockMetaData block : rowGroups) { appendRowGroup(file, block, dropColumns); } @@ -502,6 +510,11 @@ public class ParquetFileWriter { public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { + /* wrap first: passing the FSDataInputStream through unchanged re-selects this overload and recurses forever */ + appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns); + } + + public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, + boolean dropColumns) throws IOException { startBlock(rowGroup.getRowCount()); Map<String, ColumnChunkMetaData> columnsToCopy = @@ -596,8 +609,8 @@ public class ParquetFileWriter { * @param length the number of bytes to copy * @throws IOException */ - private static void copy(FSDataInputStream from, FSDataOutputStream to, - long start, long length) throws IOException{ + private static void copy(SeekableInputStream from, FSDataOutputStream to, + long start, long length) throws IOException{ if (DEBUG) LOG.debug( "Copying " + length + " bytes at " + start + " to " + to.getPos()); from.seek(start); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java ---------------------------------------------------------------------- diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java deleted file mode 100644 index bacf222..0000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.hadoop.util; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.ShouldNeverHappenException; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -public class CompatibilityUtil { - - // Will be set to true if the implementation of FSDataInputSteam supports - // the 2.x APIs, in particular reading using a provided ByteBuffer - private static boolean useV21; - public static final V21FileAPI fileAPI; - - private static class V21FileAPI { - private final Method PROVIDE_BUF_READ_METHOD; - private final Class<?> FSDataInputStreamCls; - - private V21FileAPI() throws ReflectiveOperationException { - final String PACKAGE = "org.apache.hadoop"; - FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream"); - PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class); - } - } - - static { - // Test to see if a class from the Hadoop 2.x API is available - boolean v21 = true; - try { - Class.forName("org.apache.hadoop.io.compress.DirectDecompressor"); - } catch (ClassNotFoundException cnfe) { - v21 = false; - } - - useV21 = v21; - try { - if (v21) { - fileAPI = new V21FileAPI(); - } else { - fileAPI = null; - } - - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e); - } - } - - private static Object invoke(Method method, String errorMsg, Object instance, Object... 
args) { - try { - return method.invoke(instance, args); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException(errorMsg, e); - } catch (InvocationTargetException e) { - throw new IllegalArgumentException(errorMsg, e); - } - } - - public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException { - int res; - if (useV21) { - try { - res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf); - } catch (InvocationTargetException e) { - if (e.getCause() instanceof UnsupportedOperationException) { - // the FSDataInputStream docs say specifically that implementations - // can choose to throw UnsupportedOperationException, so this should - // be a reasonable check to make to see if the interface is - // present but not implemented and we should be falling back - useV21 = false; - return getBuf(f, readBuf, maxSize); - } else if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - // To handle any cases where a Runtime exception occurs and provide - // some additional context information. A stacktrace would just give - // a line number, this at least tells them we were using the version - // of the read method designed for using a ByteBuffer. 
- throw new IOException("Error reading out of an FSDataInputStream " + - "using the Hadoop 2 ByteBuffer based read method.", e.getCause()); - } - } catch (IllegalAccessException e) { - // This method is public because it is defined in an interface, - // there should be no problems accessing it - throw new ShouldNeverHappenException(e); - } - } else { - byte[] buf = new byte[maxSize]; - res = f.read(buf); - readBuf.put(buf, 0, res); - } - return res; - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java new file mode 100644 index 0000000..4a03b1a --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.io.SeekableInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * SeekableInputStream implementation that implements read(ByteBuffer) for + * Hadoop 1 FSDataInputStream. + */ +class H1SeekableInputStream extends SeekableInputStream { + + private final int COPY_BUFFER_SIZE = 8192; + private final byte[] temp = new byte[COPY_BUFFER_SIZE]; + + private final FSDataInputStream stream; + + public H1SeekableInputStream(FSDataInputStream stream) { + this.stream = stream; + } + + @Override + public void close() throws IOException { + stream.close(); + } + + @Override + public long getPos() throws IOException { + return stream.getPos(); + } + + @Override + public void seek(long newPos) throws IOException { + stream.seek(newPos); + } + + @Override + public int read() throws IOException { + return stream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + stream.readFully(bytes, 0, bytes.length); + } + + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + /* honor the requested window; readFully(bytes) would fill the whole array from index 0 */ + stream.readFully(bytes, start, len); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (buf.hasArray()) { + return readHeapBuffer(stream, buf); + } else { + return readDirectBuffer(stream, buf, temp); + } + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + if (buf.hasArray()) { + readFullyHeapBuffer(stream, buf); + } else { + readFullyDirectBuffer(stream, buf, temp); + } + } + + // Visible for testing + static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException { + int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + if (bytesRead < 0) { + // if this
resulted in EOF, don't update position + return bytesRead; + } else { + buf.position(buf.position() + bytesRead); + return bytesRead; + } + } + + // Visible for testing + static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException { + f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + buf.position(buf.limit()); + } + + // Visible for testing + static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException { + // copy all the bytes that return immediately, stopping at the first + // read that doesn't return a full buffer. + int nextReadLength = Math.min(buf.remaining(), temp.length); + int totalBytesRead = 0; + int bytesRead; + + while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) { + buf.put(temp); + totalBytesRead += bytesRead; + nextReadLength = Math.min(buf.remaining(), temp.length); + } + + if (bytesRead < 0) { + // return -1 if nothing was read + return totalBytesRead == 0 ? -1 : totalBytesRead; + } else { + // copy the last partial buffer + buf.put(temp, 0, bytesRead); + totalBytesRead += bytesRead; + return totalBytesRead; + } + } + + // Visible for testing + static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException { + int nextReadLength = Math.min(buf.remaining(), temp.length); + int bytesRead = 0; + + while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) { + buf.put(temp, 0, bytesRead); + nextReadLength = Math.min(buf.remaining(), temp.length); + } + + if (bytesRead < 0 && buf.remaining() > 0) { + throw new EOFException( + "Reached the end of stream. 
Still have: " + buf.remaining() + " bytes left"); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java new file mode 100644 index 0000000..a706546 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.io.SeekableInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * SeekableInputStream implementation for FSDataInputStream that implements + * ByteBufferReadable in Hadoop 2. 
+ */ +class H2SeekableInputStream extends SeekableInputStream { + + // Visible for testing + interface Reader { + int read(ByteBuffer buf) throws IOException; + } + + private final FSDataInputStream stream; + private final Reader reader; + + public H2SeekableInputStream(FSDataInputStream stream) { + this.stream = stream; + this.reader = new H2Reader(); + } + + @Override + public void close() throws IOException { + /* release the wrapped Hadoop stream; the inherited InputStream.close() does nothing */ + stream.close(); + } + + @Override + public long getPos() throws IOException { + return stream.getPos(); + } + + @Override + public void seek(long newPos) throws IOException { + stream.seek(newPos); + } + + @Override + public int read() throws IOException { + return stream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + stream.readFully(bytes, 0, bytes.length); + } + + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + /* honor the requested window; readFully(bytes) would fill the whole array from index 0 */ + stream.readFully(bytes, start, len); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + return stream.read(buf); + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + readFully(reader, buf); + } + + private class H2Reader implements Reader { + @Override + public int read(ByteBuffer buf) throws IOException { + return stream.read(buf); + } + } + + public static void readFully(Reader reader, ByteBuffer buf) throws IOException { + // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read + // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we + // have to loop to ensure we read them. + while (buf.hasRemaining()) { + int readCount = reader.read(buf); + if (readCount == -1) { + // this is probably a bug in the ParquetReader. We shouldn't have called readFully with a buffer + // that has more remaining than the amount of data in the stream. + throw new EOFException("Reached the end of stream.
Still have: " + buf.remaining() + " bytes left"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java new file mode 100644 index 0000000..7c321cd --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.Log; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.SeekableInputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Convenience methods to get Parquet abstractions for Hadoop data streams. 
+ */
public class HadoopStreams {

  private static final Log LOG = Log.getLog(HadoopStreams.class);

  // Both lookups are reflective and yield null when the class (or its
  // constructor) cannot be found, in which case wrap() uses H1SeekableInputStream.
  private static final Class<?> byteBufferReadableClass = getReadableClass();
  static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();

  /**
   * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
   * implementation for Parquet readers.
   *
   * The H2 implementation is chosen only when both reflective lookups
   * succeeded and the stream's wrapped stream is an instance of
   * ByteBufferReadable; otherwise the H1 implementation is returned.
   *
   * @param stream a Hadoop FSDataInputStream
   * @return a SeekableInputStream
   * @throws ParquetDecodingException if the H2 constructor itself throws
   */
  public static SeekableInputStream wrap(FSDataInputStream stream) {
    boolean byteBufferCapable = byteBufferReadableClass != null
        && h2SeekableConstructor != null
        && byteBufferReadableClass.isInstance(stream.getWrappedStream());
    if (!byteBufferCapable) {
      return new H1SeekableInputStream(stream);
    }

    try {
      return h2SeekableConstructor.newInstance(stream);
    } catch (InstantiationException e) {
      LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
    } catch (IllegalAccessException e) {
      LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
    } catch (InvocationTargetException e) {
      throw new ParquetDecodingException(
          "Could not instantiate H2SeekableInputStream", e.getTargetException());
    }

    // reflection failed without the constructor throwing: use byte array reads
    return new H1SeekableInputStream(stream);
  }

  /**
   * Loads a class by name.
   *
   * @param className fully-qualified class name
   * @return the class, or null if it cannot be found or linked
   */
  private static Class<?> loadClassOrNull(String className) {
    try {
      return Class.forName(className);
    } catch (ClassNotFoundException e) {
      return null;
    } catch (NoClassDefFoundError e) {
      return null;
    }
  }

  /** Returns Hadoop's ByteBufferReadable class, or null when it is unavailable. */
  private static Class<?> getReadableClass() {
    return loadClassOrNull("org.apache.hadoop.fs.ByteBufferReadable");
  }

  /** Returns the H2SeekableInputStream class, or null when it is unavailable. */
  @SuppressWarnings("unchecked")
  private static Class<SeekableInputStream> getH2SeekableClass() {
    return (Class<SeekableInputStream>) loadClassOrNull(
        "org.apache.parquet.hadoop.util.H2SeekableInputStream");
  }

  /**
   * Returns the H2SeekableInputStream constructor taking an FSDataInputStream,
   * or null when the class or that constructor is unavailable.
   */
  private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
    Class<SeekableInputStream> h2Class = getH2SeekableClass();
    if (h2Class == null) {
      return null;
    }
    try {
      return h2Class.getConstructor(FSDataInputStream.class);
    } catch (NoSuchMethodException e) {
      return null;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java new file mode 100644 index 0000000..a112288 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import java.io.ByteArrayInputStream; +import java.io.IOException; +
/**
 * In-memory stream over {@link #TEST_ARRAY} for read tests.
 *
 * The constructor arguments cap how many bytes successive array reads may
 * return, which lets tests simulate short reads. Once every cap has been
 * used up, reads are unrestricted.
 */
class MockInputStream extends ByteArrayInputStream
    implements Seekable, PositionedReadable {
  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

  // remaining byte allowance per cap; entries are decremented in place
  private int[] lengths;
  // index of the cap that applies to the next read
  private int current = 0;

  MockInputStream(int... actualReadLengths) {
    super(TEST_ARRAY);
    this.lengths = actualReadLengths;
  }

  /**
   * Reads at most {@code len} bytes, further limited by the current cap.
   *
   * A request within the cap is served in full and deducted from the cap; a
   * larger request returns only what the cap allows and moves on to the
   * next cap.
   */
  @Override
  public synchronized int read(byte[] b, int off, int len) {
    if (current >= lengths.length) {
      // all caps consumed: behave like a plain ByteArrayInputStream
      return super.read(b, off, len);
    }

    int allowed = lengths[current];
    if (len > allowed) {
      int count = super.read(b, off, allowed);
      current += 1;
      return count;
    }

    // when len == allowed, the cap drops to 0 and the next read will be 0 bytes
    int count = super.read(b, off, len);
    lengths[current] -= count;
    return count;
  }

  /** Seeks to {@code position} and then performs a capped read. */
  @Override
  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
    seek(position);
    return read(buffer, offset, length);
  }

  @Override
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
    throw new UnsupportedOperationException("Not actually supported.");
  }

  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    throw new UnsupportedOperationException("Not actually supported.");
  }

  /** Moves the read position; the value is narrowed to int without range checks. */
  @Override
  public void seek(long pos) throws IOException {
    this.pos = (int) pos;
  }

  @Override
  public long getPos() throws IOException {
    return this.pos;
  }

  /** Seeks to {@code targetPos} and always reports success. */
  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    seek(targetPos);
    return true;
  }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java new file mode 100644 index 0000000..9e4e2a9 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; + +import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY; +
/**
 * Tests for the static ByteBuffer read helpers of H1SeekableInputStream
 * (readHeapBuffer, readDirectBuffer, readFullyHeapBuffer and
 * readFullyDirectBuffer).
 *
 * Every test reads from a {@link MockInputStream} over the 10-byte
 * TEST_ARRAY; constructor arguments such as (2, 3, 3) cap the size of
 * successive reads to simulate short reads.
 */
public class TestHadoop1ByteBufferReads {

  // scratch array handed to the direct-buffer helpers
  private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
    @Override
    protected byte[] initialValue() {
      return new byte[8192];
    }
  };

  // ---- readHeapBuffer ----

  @Test
  public void testHeapRead() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(20);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, len);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(-1, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapSmallBuffer() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(5);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(5, len);
    Assert.assertEquals(5, readBuffer.position());
    Assert.assertEquals(5, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(0, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
  }

  @Test
  public void testHeapSmallReads() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(10);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(2, len);
    Assert.assertEquals(2, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(3, len);
    Assert.assertEquals(5, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(3, len);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(2, len);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapPosition() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(20);
    readBuffer.position(10);
    readBuffer.mark();

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(8, len);
    Assert.assertEquals(18, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(2, len);
    Assert.assertEquals(20, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(-1, len);

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapLimit() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(20);
    readBuffer.limit(8);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, len);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(1, len);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(0, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  @Test
  public void testHeapPositionAndLimit() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(20);
    readBuffer.position(5);
    readBuffer.limit(13);
    readBuffer.mark();

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));

    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, len);
    Assert.assertEquals(12, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(1, len);
    Assert.assertEquals(13, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(0, len);

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  // ---- readDirectBuffer ----

  @Test
  public void testDirectRead() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, len);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(-1, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectSmallBuffer() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(5, len);
    Assert.assertEquals(5, readBuffer.position());
    Assert.assertEquals(5, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(0, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
  }

  @Test
  public void testDirectSmallReads() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(2, len);
    Assert.assertEquals(2, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(3, len);
    Assert.assertEquals(5, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(3, len);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(2, len);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectPosition() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
    readBuffer.position(10);
    readBuffer.mark();

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(8, len);
    Assert.assertEquals(18, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(2, len);
    Assert.assertEquals(20, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(-1, len);

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectLimit() throws Exception {
    // must be a direct buffer, like the other testDirect* cases; this test
    // previously allocated a heap buffer and so never covered the direct path
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
    readBuffer.limit(8);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, len);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(1, len);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(0, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  @Test
  public void testDirectPositionAndLimit() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
    readBuffer.position(5);
    readBuffer.limit(13);
    readBuffer.mark();

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, len);
    Assert.assertEquals(12, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(1, len);
    Assert.assertEquals(13, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(0, len);

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  @Test
  public void testDirectSmallTempBufferSmallReads() throws Exception {
    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop

    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(2, len);
    Assert.assertEquals(2, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(3, len);
    Assert.assertEquals(5, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(3, len);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(2, len);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(-1, len);

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop

    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
    readBuffer.position(5);
    readBuffer.limit(13);
    readBuffer.mark();

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));

    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(7, len);
    Assert.assertEquals(12, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(1, len);
    Assert.assertEquals(13, readBuffer.position());
    Assert.assertEquals(13, readBuffer.limit());

    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(0, len);

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  // ---- readFullyHeapBuffer ----

  @Test
  public void testHeapReadFullySmallBuffer() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocate(8);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  @Test
  public void testHeapReadFullyLargeBuffer() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(20);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    TestUtils.assertThrows("Should throw EOFException",
        EOFException.class, new Callable() {
          @Override
          public Object call() throws Exception {
            H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
            return null;
          }
        });

    Assert.assertEquals(0, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());
  }

  @Test
  public void testHeapReadFullyJustRight() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(10);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    // reads all of the bytes available without EOFException
    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    // trying to read 0 more bytes doesn't result in EOFException
    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapReadFullySmallReads() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(10);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapReadFullyPosition() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
    readBuffer.position(3);
    readBuffer.mark();

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  @Test
  public void testHeapReadFullyLimit() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
    readBuffer.limit(7);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);

    // widen the window to the last three bytes and read the rest
    readBuffer.position(7);
    readBuffer.limit(10);
    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testHeapReadFullyPositionAndLimit() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
    readBuffer.position(3);
    readBuffer.limit(7);
    readBuffer.mark();

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);

    // widen the window to the last three bytes and read the rest
    readBuffer.position(7);
    readBuffer.limit(10);
    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  // ---- readFullyDirectBuffer ----

  @Test
  public void testDirectReadFullySmallBuffer() throws Exception {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);

    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(8, readBuffer.position());
    Assert.assertEquals(8, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  @Test
  public void testDirectReadFullyLargeBuffer() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    TestUtils.assertThrows("Should throw EOFException",
        EOFException.class, new Callable() {
          @Override
          public Object call() throws Exception {
            H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
            return null;
          }
        });

    // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
    // several read operations that will read up to the end of the input. This
    // is a correct value because the bytes in the buffer are valid. This
    // behavior can't be implemented for the heap buffer without using the read
    // method instead of the readFully method on the underlying
    // FSDataInputStream.
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(20, readBuffer.limit());
  }

  @Test
  public void testDirectReadFullyJustRight() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());

    // reads all of the bytes available without EOFException
    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    // trying to read 0 more bytes doesn't result in EOFException
    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectReadFullySmallReads() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectReadFullyPosition() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
    readBuffer.position(3);
    readBuffer.mark();

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  @Test
  public void testDirectReadFullyLimit() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
    readBuffer.limit(7);

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);

    // widen the window to the last three bytes and read the rest
    readBuffer.position(7);
    readBuffer.limit(10);
    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.flip();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  @Test
  public void testDirectReadFullyPositionAndLimit() throws Exception {
    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
    readBuffer.position(3);
    readBuffer.limit(7);
    readBuffer.mark();

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);

    // widen the window to the last three bytes and read the rest
    readBuffer.position(7);
    readBuffer.limit(10);
    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  @Test
  public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
    byte[] temp = new byte[2]; // this will cause readFully to loop

    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
    readBuffer.position(3);
    readBuffer.limit(7);
    readBuffer.mark();

    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(7, readBuffer.position());
    Assert.assertEquals(7, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);

    // widen the window to the last three bytes and read the rest
    readBuffer.position(7);
    readBuffer.limit(10);
    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
    Assert.assertEquals(10, readBuffer.position());
    Assert.assertEquals(10, readBuffer.limit());

    readBuffer.reset();
    Assert.assertEquals("Buffer contents should match",
        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java new file mode 100644 index 0000000..86b903c --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop.util; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; + +import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY; + +public class TestHadoop2ByteBufferReads { + + /** + * This mimics ByteBuffer reads from streams in Hadoop 2 + */ + private static class MockBufferReader implements H2SeekableInputStream.Reader { + private final FSDataInputStream stream; + + public MockBufferReader(FSDataInputStream stream) { + this.stream = stream; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + // this is inefficient, but simple for correctness tests of + // readFully(ByteBuffer) + byte[] temp = new byte[buf.remaining()]; + int bytesRead = stream.read(temp, 0, temp.length); + if (bytesRead > 0) { + buf.put(temp, 0, bytesRead); + } + return bytesRead; + } + } + + @Test + public void testHeapReadFullySmallBuffer() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(8); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(8, readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(8, 
readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); + } + + @Test + public void testHeapReadFullyLargeBuffer() throws Exception { + final ByteBuffer readBuffer = ByteBuffer.allocate(20); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + final MockBufferReader reader = new MockBufferReader(hadoopStream); + + TestUtils.assertThrows("Should throw EOFException", + EOFException.class, new Callable() { + @Override + public Object call() throws Exception { + H2SeekableInputStream.readFully(reader, readBuffer); + return null; + } + }); + + // NOTE: This behavior differs from readFullyHeapBuffer because readFully here + // uses several read operations that will read up to the end of the input, + // even for a heap buffer. This is a correct value because the bytes in the + // buffer are valid. This behavior can't be implemented in readFullyHeapBuffer + // without using the read method instead of the readFully method on the + // underlying FSDataInputStream. 
+ Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(20, readBuffer.limit()); + } + + @Test + public void testHeapReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + // reads all of the bytes available without EOFException + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + // trying to read 0 more bytes doesn't result in EOFException + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullySmallReads() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullyPosition() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.position(3); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + 
H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testHeapReadFullyLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.limit(7); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullyPositionAndLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.position(3); + readBuffer.limit(7); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + 
+ H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testDirectReadFullySmallBuffer() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(8); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(8, readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(8, readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); + } + + @Test + public void testDirectReadFullyLargeBuffer() throws Exception { + final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + final MockBufferReader reader = new MockBufferReader(hadoopStream); + + TestUtils.assertThrows("Should throw EOFException", + EOFException.class, new Callable() { + @Override + public Object call() throws Exception { + H2SeekableInputStream.readFully(reader, readBuffer); + return null; + } + }); + + // NOTE: This behavior differs from readFullyHeapBuffer because direct uses + // several read operations that will read up to the end of the 
input. This + // is a correct value because the bytes in the buffer are valid. This + // behavior can't be implemented for the heap buffer without using the read + // method instead of the readFully method on the underlying + // FSDataInputStream. + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(20, readBuffer.limit()); + } + + @Test + public void testDirectReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream()); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + // reads all of the bytes available without EOFException + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + // trying to read 0 more bytes doesn't result in EOFException + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullySmallReads() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullyPosition() throws Exception { + ByteBuffer readBuffer 
= ByteBuffer.allocateDirect(10); + readBuffer.position(3); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testDirectReadFullyLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + readBuffer.limit(7); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3)); + H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullyPositionAndLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + readBuffer.position(3); + readBuffer.limit(7); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new 
FSDataInputStream(new MockInputStream(2, 3, 3)); + MockBufferReader reader = new MockBufferReader(hadoopStream); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + H2SeekableInputStream.readFully(reader, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", + ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2d6d7d2..ca34309 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,8 @@ <jackson.version>1.9.11</jackson.version> <jackson.package>org.codehaus.jackson</jackson.package> <shade.prefix>shaded.parquet</shade.prefix> - <hadoop.version>1.1.0</hadoop.version> + <hadoop.version>2.3.0</hadoop.version> + <hadoop1.version>1.1.0</hadoop1.version> <cascading.version>2.5.3</cascading.version> <cascading3.version>3.0.3</cascading3.version> <parquet.format.version>2.3.1</parquet.format.version> @@ -80,7 +81,7 @@ <scala.binary.version>2.10</scala.binary.version> <scala.maven.test.skip>false</scala.maven.test.skip> <pig.version>0.14.0</pig.version> - <pig.classifier/> + <pig.classifier>h2</pig.classifier> <thrift.version>0.7.0</thrift.version> <fastutil.version>6.5.7</fastutil.version> <semver.api.version>0.9.33</semver.api.version> @@ -509,19 +510,18 @@ </profile> <profile> - <id>hadoop-2</id> + <id>hadoop-1</id> <activation> <property> 
<name>hadoop.profile</name> - <value>hadoop2</value> + <value>hadoop1</value> </property> </activation> <properties> <!-- test hadoop-1 with the same jars that were produced for default profile --> <maven.main.skip>true</maven.main.skip> - <hadoop.version>2.3.0</hadoop.version> - <pig.version>0.14.0</pig.version> - <pig.classifier>h2</pig.classifier> + <hadoop.version>${hadoop1.version}</hadoop.version> + <pig.classifier/> </properties> </profile> <profile>
