Repository: parquet-mr
Updated Branches:
  refs/heads/master 81f480149 -> 8bfd9b4d8


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 9512b93..bdde70e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -28,6 +28,8 @@ import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -219,7 +221,8 @@ public class ParquetWriter<T> implements Closeable {
       boolean validating,
       WriterVersion writerVersion,
       Configuration conf) throws IOException {
-    this(file, mode, writeSupport, compressionCodecName, blockSize,
+    this(HadoopOutputFile.fromPath(file, conf),
+        mode, writeSupport, compressionCodecName, blockSize,
         validating, conf, MAX_PADDING_SIZE_DEFAULT,
         ParquetProperties.builder()
             .withPageSize(pageSize)
@@ -257,11 +260,11 @@ public class ParquetWriter<T> implements Closeable {
   }
 
   ParquetWriter(
-      Path file,
+      OutputFile file,
       ParquetFileWriter.Mode mode,
       WriteSupport<T> writeSupport,
       CompressionCodecName compressionCodecName,
-      int blockSize,
+      int rowGroupSize,
       boolean validating,
       Configuration conf,
       int maxPaddingSize,
@@ -271,7 +274,7 @@ public class ParquetWriter<T> implements Closeable {
     MessageType schema = writeContext.getSchema();
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
-        conf, schema, file, mode, blockSize, maxPaddingSize);
+        file, schema, mode, rowGroupSize, maxPaddingSize);
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -281,7 +284,7 @@ public class ParquetWriter<T> implements Closeable {
         writeSupport,
         schema,
         writeContext.getExtraMetaData(),
-        blockSize,
+        rowGroupSize,
         compressor,
         validating,
         encodingProps);
@@ -324,7 +327,8 @@ public class ParquetWriter<T> implements Closeable {
    * @param <SELF> The type of this builder that is returned by builder methods
    */
   public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
-    private final Path file;
+    private OutputFile file = null;
+    private Path path = null;
     private Configuration conf = new Configuration();
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
@@ -334,8 +338,12 @@ public class ParquetWriter<T> implements Closeable {
     private ParquetProperties.Builder encodingPropsBuilder =
         ParquetProperties.builder();
 
-    protected Builder(Path file) {
-      this.file = file;
+    protected Builder(Path path) {
+      this.path = path;
+    }
+
+    protected Builder(OutputFile path) {
+      this.file = path;
     }
 
     /**
@@ -485,15 +493,35 @@ public class ParquetWriter<T> implements Closeable {
     }
 
     /**
+     * Set a property that will be available to the write path. For writers that use a Hadoop
+     * configuration, this is the recommended way to add configuration values.
+     *
+     * @param property a String property name
+     * @param value a String property value
+     * @return this builder for method chaining.
+     */
+    public SELF config(String property, String value) {
+      conf.set(property, value);
+      return self();
+    }
+
+    /**
      * Build a {@link ParquetWriter} with the accumulated configuration.
      *
      * @return a configured {@code ParquetWriter} instance.
      * @throws IOException
      */
     public ParquetWriter<T> build() throws IOException {
-      return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName,
-          rowGroupSize, enableValidation, conf, maxPaddingSize,
-          encodingPropsBuilder.build());
+      if (file != null) {
+        return new ParquetWriter<>(file,
+            mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf,
+            maxPaddingSize, encodingPropsBuilder.build());
+      } else {
+        return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
+            mode, getWriteSupport(conf), codecName,
+            rowGroupSize, enableValidation, conf, maxPaddingSize,
+            encodingPropsBuilder.build());
+      }
     }
   }
 }

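For downstream code, the net effect of the Builder changes above is that a Path-based builder keeps working while the new config(String, String) method gives a chained way to set Hadoop properties before build(). A minimal sketch, assuming the ExampleParquetWriter builder that ships with parquet-hadoop; the property key and the local path are placeholders, not real Parquet options:

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.schema.MessageTypeParser;

    public class WriterConfigSketch {
      public static void main(String[] args) throws Exception {
        // Path-based builders still work: build() wraps the Path in a
        // HadoopOutputFile before calling the OutputFile-based constructor.
        ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(new Path("file:/tmp/writer-config-sketch.parquet"))
            .withType(MessageTypeParser.parseMessageType(
                "message example { required int32 id; }"))
            // config() stores the value in the builder's Hadoop Configuration,
            // where the WriteSupport can see it; the key here is made up.
            .config("my.custom.write.key", "true")
            .build();
        writer.close();
      }
    }
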
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
index 4696319..a70a0d0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
@@ -20,10 +20,12 @@ package org.apache.parquet.hadoop;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.Map;
 
 // Essentially taken from:
 // https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124
@@ -60,6 +62,10 @@ public class UnmaterializableRecordCounter {
      );
   }
 
+  public UnmaterializableRecordCounter(ParquetReadOptions options, long totalNumRecords) {
+    this(getFloat(options, BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD), totalNumRecords);
+  }
+
   public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
     this.errorThreshold = errorThreshold;
     this.totalNumRecords = totalNumRecords;
@@ -85,4 +91,13 @@ public class UnmaterializableRecordCounter {
       throw new ParquetDecodingException(message, cause);
     }
   }
+
+  private static float getFloat(ParquetReadOptions options, String key, float defaultValue) {
+    String value = options.getProperty(key);
+    if (value != null) {
+      return Float.valueOf(value);
+    } else {
+      return defaultValue;
+    }
+  }
 }

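The new constructor lets the read path build the bad-record counter from ParquetReadOptions instead of a Hadoop Configuration. A hedged sketch; it assumes HadoopReadOptions (from the same refactoring) surfaces the "parquet.read.bad.record.threshold" key through ParquetReadOptions#getProperty, which is the lookup the getFloat helper performs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.UnmaterializableRecordCounter;

    public class BadRecordCounterSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Tolerate up to 1% of records failing to materialize (assumed key name).
        conf.set("parquet.read.bad.record.threshold", "0.01");
        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();

        // New constructor from this change: reads the threshold from the options,
        // falling back to the default when the property is absent.
        UnmaterializableRecordCounter counter =
            new UnmaterializableRecordCounter(options, 1000L);
      }
    }
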
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
deleted file mode 100644
index 9657cc1..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.codec;
-
-/**
- * This exception will be thrown when the codec is not supported by parquet, meaning there is no
- * matching codec defined in {@link org.apache.parquet.hadoop.metadata.CompressionCodecName}
- */
-public class CompressionCodecNotSupportedException extends RuntimeException {
-  private final Class codecClass;
-
-  public CompressionCodecNotSupportedException(Class codecClass) {
-    super("codec not supported: " + codecClass.getName());
-    this.codecClass = codecClass;
-  }
-
-  public Class getCodecClass() {
-    return codecClass;
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
deleted file mode 100644
index 153133e..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.metadata;
-
-import org.apache.parquet.format.CompressionCodec;
-import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
-
-import java.util.Locale;
-
-public enum CompressionCodecName {
-  UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
-  SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
-  GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
-  LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
-  BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
-  LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
-  ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");
-
-  public static CompressionCodecName fromConf(String name) {
-     if (name == null) {
-       return UNCOMPRESSED;
-     }
-     return valueOf(name.toUpperCase(Locale.ENGLISH));
-  }
-
-  public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
-    if (clazz == null) {
-      return UNCOMPRESSED;
-    }
-    String name = clazz.getName();
-    for (CompressionCodecName codec : CompressionCodecName.values()) {
-      if (name.equals(codec.getHadoopCompressionCodecClassName())) {
-        return codec;
-      }
-    }
-    throw new CompressionCodecNotSupportedException(clazz);
-  }
-
-  public static CompressionCodecName fromParquet(CompressionCodec codec) {
-    for (CompressionCodecName codecName : CompressionCodecName.values()) {
-      if (codec.equals(codecName.parquetCompressionCodec)) {
-        return codecName;
-      }
-    }
-    throw new IllegalArgumentException("Unknown compression codec " + codec);
-  }
-
-  private final String hadoopCompressionCodecClass;
-  private final CompressionCodec parquetCompressionCodec;
-  private final String extension;
-
-  private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
-    this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
-    this.parquetCompressionCodec = parquetCompressionCodec;
-    this.extension = extension;
-  }
-
-  public String getHadoopCompressionCodecClassName() {
-    return hadoopCompressionCodecClass;
-  }
-
-  public Class getHadoopCompressionCodecClass() {
-    String codecClassName = getHadoopCompressionCodecClassName();
-    if (codecClassName==null) {
-      return null;
-    }
-    try {
-      return Class.forName(codecClassName);
-    } catch (ClassNotFoundException e) {
-      return null;
-    }
-  }
-
-  public CompressionCodec getParquetCompressionCodec() {
-    return parquetCompressionCodec;
-  }
-
-  public String getExtension() {
-    return extension;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index 4a03b1a..876a1f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -20,32 +20,23 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
-import java.io.EOFException;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /**
  * SeekableInputStream implementation that implements read(ByteBuffer) for
  * Hadoop 1 FSDataInputStream.
  */
-class H1SeekableInputStream extends SeekableInputStream {
-
-  private final int COPY_BUFFER_SIZE = 8192;
-  private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+class H1SeekableInputStream extends DelegatingSeekableInputStream {
 
   private final FSDataInputStream stream;
 
   public H1SeekableInputStream(FSDataInputStream stream) {
+    super(stream);
     this.stream = stream;
   }
 
   @Override
-  public void close() throws IOException {
-    stream.close();
-  }
-
-  @Override
   public long getPos() throws IOException {
     return stream.getPos();
   }
@@ -56,16 +47,6 @@ class H1SeekableInputStream extends SeekableInputStream {
   }
 
   @Override
-  public int read() throws IOException {
-    return stream.read();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return stream.read(b, off, len);
-  }
-
-  @Override
   public void readFully(byte[] bytes) throws IOException {
     stream.readFully(bytes, 0, bytes.length);
   }
@@ -75,80 +56,4 @@ class H1SeekableInputStream extends SeekableInputStream {
     stream.readFully(bytes);
   }
 
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    if (buf.hasArray()) {
-      return readHeapBuffer(stream, buf);
-    } else {
-      return readDirectBuffer(stream, buf, temp);
-    }
-  }
-
-  @Override
-  public void readFully(ByteBuffer buf) throws IOException {
-    if (buf.hasArray()) {
-      readFullyHeapBuffer(stream, buf);
-    } else {
-      readFullyDirectBuffer(stream, buf, temp);
-    }
-  }
-
-  // Visible for testing
-  static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
-    int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
-    if (bytesRead < 0) {
-      // if this resulted in EOF, don't update position
-      return bytesRead;
-    } else {
-      buf.position(buf.position() + bytesRead);
-      return bytesRead;
-    }
-  }
-
-  // Visible for testing
-  static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
-    f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
-    buf.position(buf.limit());
-  }
-
-  // Visible for testing
-  static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
-    // copy all the bytes that return immediately, stopping at the first
-    // read that doesn't return a full buffer.
-    int nextReadLength = Math.min(buf.remaining(), temp.length);
-    int totalBytesRead = 0;
-    int bytesRead;
-
-    while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
-      buf.put(temp);
-      totalBytesRead += bytesRead;
-      nextReadLength = Math.min(buf.remaining(), temp.length);
-    }
-
-    if (bytesRead < 0) {
-      // return -1 if nothing was read
-      return totalBytesRead == 0 ? -1 : totalBytesRead;
-    } else {
-      // copy the last partial buffer
-      buf.put(temp, 0, bytesRead);
-      totalBytesRead += bytesRead;
-      return totalBytesRead;
-    }
-  }
-
-  // Visible for testing
-  static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
-    int nextReadLength = Math.min(buf.remaining(), temp.length);
-    int bytesRead = 0;
-
-    while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
-      buf.put(temp, 0, bytesRead);
-      nextReadLength = Math.min(buf.remaining(), temp.length);
-    }
-
-    if (bytesRead < 0 && buf.remaining() > 0) {
-      throw new EOFException(
-          "Reached the end of stream. Still have: " + buf.remaining() + " 
bytes left");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index ec4567e..c68f6b6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -20,7 +20,7 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
  * SeekableInputStream implementation for FSDataInputStream that implements
  * ByteBufferReadable in Hadoop 2.
  */
-class H2SeekableInputStream extends SeekableInputStream {
+class H2SeekableInputStream extends DelegatingSeekableInputStream {
 
   // Visible for testing
   interface Reader {
@@ -40,6 +40,7 @@ class H2SeekableInputStream extends SeekableInputStream {
   private final Reader reader;
 
   public H2SeekableInputStream(FSDataInputStream stream) {
+    super(stream);
     this.stream = stream;
     this.reader = new H2Reader();
   }
@@ -60,21 +61,6 @@ class H2SeekableInputStream extends SeekableInputStream {
   }
 
   @Override
-  public int read() throws IOException {
-    return stream.read();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return stream.read(b, off, len);
-  }
-
-  @Override
-  public void readFully(byte[] bytes) throws IOException {
-    stream.readFully(bytes, 0, bytes.length);
-  }
-
-  @Override
   public void readFully(byte[] bytes, int start, int len) throws IOException {
     stream.readFully(bytes);
   }

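Both Hadoop stream wrappers now extend DelegatingSeekableInputStream from parquet-common, which hosts the byte[] and ByteBuffer copy loops that previously lived in H1SeekableInputStream; only positioning is left to the subclass. A hedged sketch of what a non-Hadoop adapter looks like under this scheme, assuming (as the wrappers above suggest) that only getPos() and seek() remain abstract; the class is purely illustrative, not part of the change:

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.parquet.io.DelegatingSeekableInputStream;

    // Illustrative local-file adapter: delegate all reads, implement only positioning.
    class LocalFileSeekableInputStream extends DelegatingSeekableInputStream {
      private final FileInputStream stream;

      LocalFileSeekableInputStream(FileInputStream stream) {
        super(stream);
        this.stream = stream;
      }

      @Override
      public long getPos() throws IOException {
        return stream.getChannel().position();
      }

      @Override
      public void seek(long newPos) throws IOException {
        stream.getChannel().position(newPos);
      }
    }
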
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
new file mode 100644
index 0000000..a46c8db
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+
+public class HadoopCodecs {
+  public static CompressionCodecFactory newFactory(int sizeHint) {
+    return new CodecFactory(new Configuration(), sizeHint);
+  }
+
+  public static CompressionCodecFactory newFactory(Configuration conf, int sizeHint) {
+    return new CodecFactory(conf, sizeHint);
+  }
+
+  public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) {
+    return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint);
+  }
+}

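HadoopCodecs gives callers a way to obtain the Hadoop-backed codec factory behind the new CompressionCodecFactory interface in parquet-common. A hedged usage sketch; the getCompressor/release calls reflect the assumed shape of that interface, and the 1 MB size hint is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopCodecs;

    public class CodecFactorySketch {
      public static void main(String[] args) {
        CompressionCodecFactory codecs =
            HadoopCodecs.newFactory(new Configuration(), 1024 * 1024);
        // Assumed interface shape: fetch a compressor by codec name, then
        // release pooled codec resources when finished.
        CompressionCodecFactory.BytesInputCompressor compressor =
            codecs.getCompressor(CompressionCodecName.SNAPPY);
        codecs.release();
      }
    }
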
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
new file mode 100644
index 0000000..4740fd4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class HadoopOutputFile implements OutputFile {
+  // need to supply a buffer size when setting block size. this is the default
+  // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
+  private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
+
+  private static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
+  static {
+    BLOCK_FS_SCHEMES.add("hdfs");
+    BLOCK_FS_SCHEMES.add("webhdfs");
+    BLOCK_FS_SCHEMES.add("viewfs");
+  }
+
+  // visible for testing
+  public static Set<String> getBlockFileSystems() {
+    return BLOCK_FS_SCHEMES;
+  }
+
+  private static boolean supportsBlockSize(FileSystem fs) {
+    return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
+  }
+
+  private final FileSystem fs;
+  private final Path path;
+  private final Configuration conf;
+
+  public static HadoopOutputFile fromPath(Path path, Configuration conf)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    return new HadoopOutputFile(fs, fs.makeQualified(path), conf);
+  }
+
+  private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
+    this.fs = fs;
+    this.path = path;
+    this.conf = conf;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public PositionOutputStream create(long blockSizeHint) throws IOException {
+    return HadoopStreams.wrap(fs.create(path, false /* do not overwrite */,
+        DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+        Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+    return HadoopStreams.wrap(fs.create(path, true /* overwrite if exists */,
+        DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+        Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+  }
+
+  @Override
+  public boolean supportsBlockSize() {
+    return supportsBlockSize(fs);
+  }
+
+  @Override
+  public long defaultBlockSize() {
+    return fs.getDefaultBlockSize(path);
+  }
+
+  @Override
+  public String toString() {
+    return path.toString();
+  }
+}

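A short sketch of the new output abstraction in isolation: resolve a Path into a HadoopOutputFile and open a PositionOutputStream through it. The path and block-size hint are placeholders, and the bytes written are only a demonstration, not a valid Parquet file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.OutputFile;
    import org.apache.parquet.io.PositionOutputStream;

    public class OutputFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        OutputFile file =
            HadoopOutputFile.fromPath(new Path("file:/tmp/output-file-sketch.bin"), conf);

        // "file" is not in the block-based scheme set, so supportsBlockSize()
        // is false and the hint only feeds the underlying fs.create() call.
        long blockSizeHint = 128L * 1024 * 1024;
        try (PositionOutputStream out = file.createOrOverwrite(blockSizeHint)) {
          out.write(new byte[] { 'P', 'A', 'R', '1' });
        }
      }
    }
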
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
new file mode 100644
index 0000000..4b194aa
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+
+public class HadoopPositionOutputStream extends PositionOutputStream {
+  private final FSDataOutputStream wrapped;
+
+  HadoopPositionOutputStream(FSDataOutputStream wrapped) {
+    this.wrapped = wrapped;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return wrapped.getPos();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    wrapped.write(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    wrapped.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    wrapped.write(b, off, len);
+  }
+
+  public void sync() throws IOException {
+    wrapped.hsync();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    wrapped.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    wrapped.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 8731bd6..c35e98f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -20,8 +20,11 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.PositionOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +49,7 @@ public class HadoopStreams {
    * @return a SeekableInputStream
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
+    Preconditions.checkNotNull(stream, "Cannot wrap a null input stream");
     if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
         byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
       try {
@@ -99,4 +103,15 @@ public class HadoopStreams {
     return null;
   }
 
+  /**
+   * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
+   * implementation for Parquet writers.
+   *
+   * @param stream a Hadoop FSDataOutputStream
+   * @return a PositionOutputStream
+   */
+  public static PositionOutputStream wrap(FSDataOutputStream stream) {
+    Preconditions.checkNotNull(stream, "Cannot wrap a null output stream");
+    return new HadoopPositionOutputStream(stream);
+  }
 }

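The new wrap(FSDataOutputStream) overload mirrors the input-side wrap(): it adapts a Hadoop output stream to the PositionOutputStream that the writer path now targets. A minimal hedged sketch against the local filesystem:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.PositionOutputStream;

    public class WrapOutputStreamSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        FSDataOutputStream hadoopOut = fs.create(new Path("file:/tmp/wrapped.bin"), true);

        // HadoopPositionOutputStream delegates write/flush/close and exposes getPos().
        try (PositionOutputStream out = HadoopStreams.wrap(hadoopOut)) {
          out.write(42);
          System.out.println("position after one byte: " + out.getPos()); // expect 1
        }
      }
    }
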
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
index 0ac9c0f..8e3e6c7 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
@@ -35,6 +35,8 @@ import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
@@ -105,7 +107,7 @@ public class TestInputOutputFormatWithPadding {
 
   @Test
   public void testBasicBehaviorWithPadding() throws Exception {
-    ParquetFileWriter.BLOCK_FS_SCHEMES.add("file");
+    HadoopOutputFile.getBlockFileSystems().add("file");
 
     File inputFile = temp.newFile();
     FileOutputStream out = new FileOutputStream(inputFile);
@@ -186,7 +188,7 @@ public class TestInputOutputFormatWithPadding {
     Assert.assertEquals("Should match written file content",
         FILE_CONTENT, reconstructed);
 
-    ParquetFileWriter.BLOCK_FS_SCHEMES.remove("file");
+    HadoopOutputFile.getBlockFileSystems().remove("file");
   }
 
   private void waitForJob(Job job) throws Exception {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 1442e04..6915c86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
new file mode 100644
index 0000000..b41b3c8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class MockHadoopInputStream extends ByteArrayInputStream
+    implements Seekable, PositionedReadable {
+  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+  private int[] lengths;
+  private int current = 0;
+  MockHadoopInputStream(int... actualReadLengths) {
+    super(TEST_ARRAY);
+    this.lengths = actualReadLengths;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) {
+    if (current < lengths.length) {
+      if (len <= lengths[current]) {
+        // when len == lengths[current], the next read will be 0 bytes
+        int bytesRead = super.read(b, off, len);
+        lengths[current] -= bytesRead;
+        return bytesRead;
+      } else {
+        int bytesRead = super.read(b, off, lengths[current]);
+        current += 1;
+        return bytesRead;
+      }
+    } else {
+      return super.read(b, off, len);
+    }
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    seek(position);
+    return read(buffer, offset, length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    this.pos = (int) pos;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return this.pos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    seek(targetPos);
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
deleted file mode 100644
index a112288..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-class MockInputStream extends ByteArrayInputStream
-    implements Seekable, PositionedReadable {
-  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-
-  private int[] lengths;
-  private int current = 0;
-  MockInputStream(int... actualReadLengths) {
-    super(TEST_ARRAY);
-    this.lengths = actualReadLengths;
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) {
-    if (current < lengths.length) {
-      if (len <= lengths[current]) {
-        // when len == lengths[current], the next read will by 0 bytes
-        int bytesRead = super.read(b, off, len);
-        lengths[current] -= bytesRead;
-        return bytesRead;
-      } else {
-        int bytesRead = super.read(b, off, lengths[current]);
-        current += 1;
-        return bytesRead;
-      }
-    } else {
-      return super.read(b, off, len);
-    }
-  }
-
-  @Override
-  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
-    seek(position);
-    return read(buffer, offset, length);
-  }
-
-  @Override
-  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
-    throw new UnsupportedOperationException("Not actually supported.");
-  }
-
-  @Override
-  public void readFully(long position, byte[] buffer) throws IOException {
-    throw new UnsupportedOperationException("Not actually supported.");
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    this.pos = (int) pos;
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return this.pos;
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    seek(targetPos);
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
deleted file mode 100644
index 9e4e2a9..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
+++ /dev/null
@@ -1,761 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
-
-public class TestHadoop1ByteBufferReads {
-
-  private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
-    @Override
-    protected byte[] initialValue() {
-      return new byte[8192];
-    }
-  };
-
-  @Test
-  public void testHeapRead() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(-1, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapSmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(5);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(5, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(5, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(0, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
-  }
-
-  @Test
-  public void testHeapSmallReads() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(2, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapPosition() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(20);
-    readBuffer.position(10);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(8, len);
-    Assert.assertEquals(18, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(20, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(-1, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapLimit() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(20);
-    readBuffer.limit(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(0, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testHeapPositionAndLimit() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(20);
-    readBuffer.position(5);
-    readBuffer.limit(13);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(12, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(13, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(0, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectRead() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(10, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(-1, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(5, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(5, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(0, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallReads() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(2, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectPosition() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-    readBuffer.position(10);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(8, len);
-    Assert.assertEquals(18, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(20, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(-1, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectLimit() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(20);
-    readBuffer.limit(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(0, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectPositionAndLimit() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-    readBuffer.position(5);
-    readBuffer.limit(13);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(12, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(13, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
-    Assert.assertEquals(0, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallTempBufferSmallReads() throws Exception {
-    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(2, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(5, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(3, len);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(2, len);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(-1, len);
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
-    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-    readBuffer.position(5);
-    readBuffer.limit(13);
-    readBuffer.mark();
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
-    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(7, len);
-    Assert.assertEquals(12, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(1, len);
-    Assert.assertEquals(13, readBuffer.position());
-    Assert.assertEquals(13, readBuffer.limit());
-
-    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
-    Assert.assertEquals(0, len);
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullySmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocate(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyLargeBuffer() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
-    TestUtils.assertThrows("Should throw EOFException",
-        EOFException.class, new Callable() {
-          @Override
-          public Object call() throws Exception {
-            H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, 
readBuffer);
-            return null;
-          }
-        });
-
-    Assert.assertEquals(0, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-  }
-
-  @Test
-  public void testHeapReadFullyJustRight() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
-
-    // reads all of the bytes available without EOFException
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    // trying to read 0 more bytes doesn't result in EOFException
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullySmallReads() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyPosition() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.position(3);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.limit(7);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testHeapReadFullyPositionAndLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallBuffer() throws Exception {
-    ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
-
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(8, readBuffer.position());
-    Assert.assertEquals(8, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyLargeBuffer() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
-
-    TestUtils.assertThrows("Should throw EOFException",
-        EOFException.class, new Callable() {
-          @Override
-          public Object call() throws Exception {
-            H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, 
readBuffer, TEMP.get());
-            return null;
-          }
-        });
-
-    // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
-    // several read operations that will read up to the end of the input. This
-    // is a correct value because the bytes in the buffer are valid. This
-    // behavior can't be implemented for the heap buffer without using the read
-    // method instead of the readFully method on the underlying
-    // FSDataInputStream.
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(20, readBuffer.limit());
-  }
-
-  @Test
-  public void testDirectReadFullyJustRight() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
-
-    // reads all of the bytes available without EOFException
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    // trying to read 0 more bytes doesn't result in EOFException
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallReads() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyPosition() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.limit(7);
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.flip();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullyPositionAndLimit() throws Exception {
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
TEMP.get());
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-
-  @Test
-  public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws 
Exception {
-    byte[] temp = new byte[2]; // this will cause readFully to loop
-
-    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-    readBuffer.position(3);
-    readBuffer.limit(7);
-    readBuffer.mark();
-
-    final FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
temp);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
temp);
-    Assert.assertEquals(7, readBuffer.position());
-    Assert.assertEquals(7, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
-    readBuffer.position(7);
-    readBuffer.limit(10);
-    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, 
temp);
-    Assert.assertEquals(10, readBuffer.position());
-    Assert.assertEquals(10, readBuffer.limit());
-
-    readBuffer.reset();
-    Assert.assertEquals("Buffer contents should match",
-        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-  }
-}

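For reference, the removed tests above exercised the Hadoop-1 path's direct-buffer read helpers. Below is a minimal, illustrative sketch of the fill loop those tests cover; the helper name readFullyDirectBuffer appears in the deleted tests, but this body is reconstructed only from the asserted behavior (stage bytes through a small heap temp array, advance the buffer position by whatever was read, throw EOFException only if the buffer cannot be filled) and uses a plain InputStream to stay self-contained. It is not the project's actual implementation.

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    final class DirectBufferReadSketch {
      private DirectBufferReadSketch() {}

      // Fills the remaining space of a (possibly direct) ByteBuffer from the
      // stream by copying through a heap temp array. Mirrors what the deleted
      // tests assert: bytes read before EOF stay in the buffer (position
      // advances), and EOFException is thrown only when the buffer could not
      // be completely filled.
      static void readFullyDirectBuffer(InputStream in, ByteBuffer buf, byte[] temp)
          throws IOException {
        while (buf.hasRemaining()) {
          int toRead = Math.min(buf.remaining(), temp.length);
          int read = in.read(temp, 0, toRead);
          if (read < 0) {
            throw new EOFException("Reached the end of stream with "
                + buf.remaining() + " bytes left to read");
          }
          buf.put(temp, 0, read);
        }
      }
    }

Using a two-byte temp array, as several of the deleted tests do, simply forces the loop to take multiple passes; the end state of the buffer is the same.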
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 86b903c..68c9b3b 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
 
 public class TestHadoop2ByteBufferReads {
 
@@ -59,7 +59,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullySmallBuffer() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(8);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -79,7 +79,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullyLargeBuffer() throws Exception {
     final ByteBuffer readBuffer = ByteBuffer.allocate(20);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     final MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     TestUtils.assertThrows("Should throw EOFException",
@@ -105,7 +105,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullyJustRight() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     // reads all of the bytes available without EOFException
@@ -127,7 +127,7 @@ public class TestHadoop2ByteBufferReads {
   public void testHeapReadFullySmallReads() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -149,7 +149,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.position(3);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -170,7 +170,7 @@ public class TestHadoop2ByteBufferReads {
     ByteBuffer readBuffer = ByteBuffer.allocate(10);
     readBuffer.limit(7);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -203,7 +203,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.limit(7);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -233,7 +233,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullySmallBuffer() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -253,7 +253,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullyLargeBuffer() throws Exception {
     final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     final MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     TestUtils.assertThrows("Should throw EOFException",
@@ -279,7 +279,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullyJustRight() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream());
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream());
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     // reads all of the bytes available without EOFException
@@ -301,7 +301,7 @@ public class TestHadoop2ByteBufferReads {
   public void testDirectReadFullySmallReads() throws Exception {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -323,7 +323,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.position(3);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -344,7 +344,7 @@ public class TestHadoop2ByteBufferReads {
     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
     readBuffer.limit(7);
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);
@@ -377,7 +377,7 @@ public class TestHadoop2ByteBufferReads {
     readBuffer.limit(7);
     readBuffer.mark();
 
-    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockInputStream(2, 3, 3));
+    FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockHadoopInputStream(2, 3, 3));
     MockBufferReader reader = new MockBufferReader(hadoopStream);
 
     H2SeekableInputStream.readFully(reader, readBuffer);

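The hunks above only rename the test's mock stream (MockInputStream to MockHadoopInputStream); the readFully contract they exercise is unchanged. For orientation, here is a minimal sketch of that contract using a local functional interface in place of H2SeekableInputStream.Reader (whose exact shape is not shown in this diff): keep reading until the buffer is full, and fail with EOFException if the source ends first.

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    final class ReadFullySketch {
      // Stand-in for the reader abstraction used by the tests above; the real
      // H2SeekableInputStream.Reader is defined in parquet-hadoop, not here.
      interface BufferReader {
        int read(ByteBuffer dst) throws IOException; // bytes read, or -1 at EOF
      }

      // Illustrative loop matching what the renamed tests assert: the buffer
      // ends up full on success, and EOFException is thrown when the source is
      // exhausted before the buffer is filled.
      static void readFully(BufferReader reader, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          if (reader.read(buf) < 0) {
            throw new EOFException("Reached the end of stream with "
                + buf.remaining() + " bytes left to read");
          }
        }
      }
    }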
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git 
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 5d79a49..fe64587 100644
--- 
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -91,7 +92,7 @@ public class MergeCommand extends ArgsOnlyCommand {
         tooSmallFilesMerged = true;
       }
 
-      writer.appendFile(conf, input);
+      writer.appendFile(HadoopInputFile.fromPath(input, conf));
     }
 
     if (tooSmallFilesMerged) {

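The MergeCommand hunk above switches from the Configuration/Path overload to the InputFile-based one. A short usage sketch of the new call shape follows; the wrapper class and method names here are placeholders, and only ParquetFileWriter.appendFile(...) and HadoopInputFile.fromPath(...) are taken from the diff itself.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    // Illustrative only: appends one existing Parquet file's row groups to an
    // already-open ParquetFileWriter using the InputFile overload shown in the
    // hunk above, rather than the old appendFile(Configuration, Path) form.
    class AppendFileExample {
      static void appendOne(ParquetFileWriter writer, Path input, Configuration conf)
          throws IOException {
        // was: writer.appendFile(conf, input);
        writer.appendFile(HadoopInputFile.fromPath(input, conf));
      }
    }
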
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44b0b62..05e3e47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,14 @@
                      
<exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made 
public -->
                      
<exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> 
<!-- removed non-API class -->
                      
<exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed 
non-API class and methods-->
-                    
<exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added 
synchronized modifier -->
+                     
<exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added 
synchronized modifier -->
+                     <exclude>org/apache/parquet/bytes/BytesInput</exclude> 
<!-- moved to parquet-common -->
+                     
<exclude>org/apache/parquet/bytes/CapacityByteArrayOutputStream</exclude> <!-- 
moved to parquet-common -->
+                     
<exclude>org/apache/parquet/bytes/ConcatenatingByteArrayCollector</exclude> 
<!-- moved to parquet-common -->
+                     
<exclude>org/apache/parquet/bytes/LittleEndianDataInputStream</exclude> <!-- 
moved to parquet-common -->
+                     
<exclude>org/apache/parquet/bytes/LittleEndianDataOutputStream</exclude> <!-- 
moved to parquet-common -->
+                     
<exclude>org/apache/parquet/hadoop/metadata/CompressionCodecName</exclude> <!-- 
moved to parquet-common -->
+                     
<exclude>org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException</exclude>
 <!-- moved to parquet-common -->
                    </excludes>
                  </requireBackwardCompatibility>
                </rules>
