This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 09445b513 PARQUET-2451: Add BYTE_STREAM_SPLIT support for FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#1291)
09445b513 is described below

commit 09445b513c6ed46f51d7e23a41dec67689b513d9
Author: Antoine Pitrou <[email protected]>
AuthorDate: Fri Apr 26 04:06:31 2024 +0200

    PARQUET-2451: Add BYTE_STREAM_SPLIT support for FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#1291)
---
 .../java/org/apache/parquet/column/Encoding.java   |   9 +
 .../apache/parquet/column/ParquetProperties.java   | 100 +++--
 .../ByteStreamSplitValuesReader.java               |  47 +--
 .../ByteStreamSplitValuesReaderForDouble.java      |   9 +-
 ...ava => ByteStreamSplitValuesReaderForFLBA.java} |  18 +-
 .../ByteStreamSplitValuesReaderForFloat.java       |   9 +-
 ... => ByteStreamSplitValuesReaderForInteger.java} |  17 +-
 ...ava => ByteStreamSplitValuesReaderForLong.java} |  17 +-
 .../ByteStreamSplitValuesWriter.java               |  58 ++-
 .../factory/DefaultV1ValuesWriterFactory.java      |  54 ++-
 .../factory/DefaultV2ValuesWriterFactory.java      |  56 ++-
 .../parquet/example/data/simple/SimpleGroup.java   |  28 +-
 .../ByteStreamSplitValuesEndToEndTest.java         | 116 +++++-
 .../ByteStreamSplitValuesReaderTest.java           | 209 ++++++++---
 .../ByteStreamSplitValuesWriterTest.java           |  99 ++++-
 .../factory/DefaultValuesWriterFactoryTest.java    | 404 ++++++++++++++++++---
 .../hadoop/TestInterOpReadByteStreamSplit.java     |  39 +-
 pom.xml                                            |   2 +
 18 files changed, 1052 insertions(+), 239 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
index e2c231f0f..cadf8f2e0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
@@ -31,7 +31,10 @@ import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
 import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
+import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
 import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat;
+import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
+import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
@@ -129,6 +132,12 @@ public enum Encoding {
           return new ByteStreamSplitValuesReaderForFloat();
         case DOUBLE:
           return new ByteStreamSplitValuesReaderForDouble();
+        case INT32:
+          return new ByteStreamSplitValuesReaderForInteger();
+        case INT64:
+          return new ByteStreamSplitValuesReaderForLong();
+        case FIXED_LEN_BYTE_ARRAY:
+          return new ByteStreamSplitValuesReaderForFLBA(descriptor.getTypeLength());
         default:
           throw new ParquetDecodingException("no byte stream split reader for type " + descriptor.getType());
       }
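
With the hunk above, BYTE_STREAM_SPLIT decoding is dispatched on the column's
primitive type. A minimal sketch of obtaining one of the new readers through the
public Encoding API (the INT64 column descriptor `descriptor` is assumed):

    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.ValuesType;
    import org.apache.parquet.column.values.ValuesReader;

    // With this patch this returns a ByteStreamSplitValuesReaderForLong;
    // before it, BYTE_STREAM_SPLIT threw ParquetDecodingException for INT64.
    ValuesReader reader = Encoding.BYTE_STREAM_SPLIT.getValuesReader(descriptor, ValuesType.VALUES);
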
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 5152d5b07..0645b2b53 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -70,6 +70,12 @@ public class ParquetProperties {
 
   private static final int MIN_SLAB_SIZE = 64;
 
+  private enum ByteStreamSplitMode {
+    NONE,
+    FLOATING_POINT,
+    EXTENDED
+  }
+
   public enum WriterVersion {
     PARQUET_1_0("v1"),
     PARQUET_2_0("v2");
@@ -114,7 +120,7 @@ public class ParquetProperties {
   private final ColumnProperty<Integer> numBloomFilterCandidates;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
-  private final boolean enableByteStreamSplit;
+  private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
   private final Map<String, String> extraMetaData;
 
   private ParquetProperties(Builder builder) {
@@ -141,10 +147,18 @@ public class ParquetProperties {
     this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
-    this.enableByteStreamSplit = builder.enableByteStreamSplit;
+    this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
     this.extraMetaData = builder.extraMetaData;
   }
 
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static Builder copy(ParquetProperties toCopy) {
+    return new Builder(toCopy);
+  }
+
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
     return newColumnDescriptorValuesWriter(path.getMaxRepetitionLevel());
   }
@@ -208,8 +222,23 @@ public class ParquetProperties {
     return dictionaryEnabled.getValue(column);
   }
 
+  @Deprecated()
   public boolean isByteStreamSplitEnabled() {
-    return enableByteStreamSplit;
+    return byteStreamSplitEnabled.getDefaultValue() != ByteStreamSplitMode.NONE;
+  }
+
+  public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
+    switch (column.getPrimitiveType().getPrimitiveTypeName()) {
+      case FLOAT:
+      case DOUBLE:
+        return byteStreamSplitEnabled.getValue(column) != ByteStreamSplitMode.NONE;
+      case INT32:
+      case INT64:
+      case FIXED_LEN_BYTE_ARRAY:
+        return byteStreamSplitEnabled.getValue(column) == ByteStreamSplitMode.EXTENDED;
+      default:
+        return false;
+    }
   }
 
   public ByteBufferAllocator getAllocator() {
@@ -301,14 +330,6 @@ public class ParquetProperties {
     return extraMetaData;
   }
 
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static Builder copy(ParquetProperties toCopy) {
-    return new Builder(toCopy);
-  }
-
   @Override
   public String toString() {
     return "Parquet page size to " + getPageSizeThreshold() + '\n'
@@ -349,11 +370,16 @@ public class ParquetProperties {
     private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
    private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
-    private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+    private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
     private Map<String, String> extraMetaData = new HashMap<>();
 
     private Builder() {
      enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
+      byteStreamSplitEnabled = ColumnProperty.<ByteStreamSplitMode>builder()
+          .withDefaultValue(
+              DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
+                  ? ByteStreamSplitMode.FLOATING_POINT
+                  : ByteStreamSplitMode.NONE);
      bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
      bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
      bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
@@ -365,7 +391,7 @@ public class ParquetProperties {
 
     private Builder(ParquetProperties toCopy) {
       this.pageSize = toCopy.pageSizeThreshold;
-      this.enableDict = ColumnProperty.<Boolean>builder(toCopy.dictionaryEnabled);
+      this.enableDict = ColumnProperty.builder(toCopy.dictionaryEnabled);
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
@@ -375,13 +401,13 @@ public class ParquetProperties {
       this.allocator = toCopy.allocator;
       this.pageRowCountLimit = toCopy.pageRowCountLimit;
       this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
-      this.bloomFilterNDVs = ColumnProperty.<Long>builder(toCopy.bloomFilterNDVs);
-      this.bloomFilterFPPs = ColumnProperty.<Double>builder(toCopy.bloomFilterFPPs);
-      this.bloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.bloomFilterEnabled);
-      this.adaptiveBloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.adaptiveBloomFilterEnabled);
-      this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
+      this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
+      this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
+      this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
+      this.adaptiveBloomFilterEnabled = ColumnProperty.builder(toCopy.adaptiveBloomFilterEnabled);
+      this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
       this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
-      this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+      this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
       this.extraMetaData = toCopy.extraMetaData;
     }
 
@@ -420,8 +446,40 @@ public class ParquetProperties {
       return this;
     }
 
-    public Builder withByteStreamSplitEncoding(boolean enableByteStreamSplit) {
-      this.enableByteStreamSplit = enableByteStreamSplit;
+    /**
+     * Enable or disable BYTE_STREAM_SPLIT encoding for FLOAT and DOUBLE columns.
+     *
+     * @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public Builder withByteStreamSplitEncoding(boolean enable) {
+      this.byteStreamSplitEnabled.withDefaultValue(
+          enable ? ByteStreamSplitMode.FLOATING_POINT : ByteStreamSplitMode.NONE);
+      return this;
+    }
+
+    /**
+     * Enable or disable BYTE_STREAM_SPLIT encoding for specified columns.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param enable     whether BYTE_STREAM_SPLIT encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public Builder withByteStreamSplitEncoding(String columnPath, boolean enable) {
+      this.byteStreamSplitEnabled.withValue(
+          columnPath, enable ? ByteStreamSplitMode.EXTENDED : ByteStreamSplitMode.NONE);
+      return this;
+    }
+
+    /**
+     * Enable or disable BYTE_STREAM_SPLIT encoding for FLOAT, DOUBLE, INT32, INT64 and FIXED_LEN_BYTE_ARRAY columns.
+     *
+     * @param enable whether BYTE_STREAM_SPLIT encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
+      this.byteStreamSplitEnabled.withDefaultValue(
+          enable ? ByteStreamSplitMode.EXTENDED : ByteStreamSplitMode.NONE);
       return this;
     }
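
For reference, a minimal usage sketch of the resulting builder API (the column
path "x" is hypothetical):

    ParquetProperties props = ParquetProperties.builder()
        // default for all columns: BYTE_STREAM_SPLIT for FLOAT and DOUBLE only
        .withByteStreamSplitEncoding(true)
        // per-column opt-in maps to EXTENDED mode, so column "x" may also use
        // BYTE_STREAM_SPLIT for INT32, INT64 or FIXED_LEN_BYTE_ARRAY
        .withByteStreamSplitEncoding("x", true)
        .build();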
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
index 702a6f700..c8ab3043b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.column.values.bytestreamsplit;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -26,10 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class ByteStreamSplitValuesReader extends ValuesReader {
-
  private static final Logger LOG = LoggerFactory.getLogger(ByteStreamSplitValuesReader.class);
-  private final int elementSizeInBytes;
-  private byte[] byteStreamData;
+  protected final int elementSizeInBytes;
+  protected ByteBuffer decodedDataBuffer;
   private int indexInStream;
   private int valuesCount;
 
@@ -39,17 +40,27 @@ public abstract class ByteStreamSplitValuesReader extends ValuesReader {
     this.valuesCount = 0;
   }
 
-  protected void gatherElementDataFromStreams(byte[] gatheredData) throws ParquetDecodingException {
-    if (gatheredData.length != elementSizeInBytes) {
-      throw new ParquetDecodingException("gatherData buffer is not of the expected size.");
-    }
+  protected int nextElementByteOffset() {
     if (indexInStream >= valuesCount) {
       throw new ParquetDecodingException("Byte-stream data was already exhausted.");
     }
-    for (int i = 0; i < elementSizeInBytes; ++i) {
-      gatheredData[i] = byteStreamData[i * valuesCount + indexInStream];
-    }
+    int offset = indexInStream * elementSizeInBytes;
     ++indexInStream;
+    return offset;
+  }
+
+  // Decode an entire data page
+  private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
+    assert encoded.limit() == valuesCount * elementSizeInBytes;
+    byte[] decoded = new byte[encoded.limit()];
+    int destByteIndex = 0;
+    for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
+      for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
+        decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+      }
+    }
+    assert destByteIndex == decoded.length;
+    return decoded;
   }
 
   @Override
@@ -76,18 +87,12 @@ public abstract class ByteStreamSplitValuesReader extends ValuesReader {
       throw new ParquetDecodingException(errorMessage);
     }
 
-    // Allocate buffer for all of the byte stream data.
+    // Eagerly read and decode the data. This allows returning stable
+    // Binary views into the internal decode buffer for FIXED_LEN_BYTE_ARRAY.
     final int totalSizeInBytes = stream.available();
-    byteStreamData = new byte[totalSizeInBytes];
-
-    // Eagerly read the data for each stream.
-    final int numRead = stream.read(byteStreamData, 0, totalSizeInBytes);
-    if (numRead != totalSizeInBytes) {
-      String errorMessage = String.format(
-          "Failed to read requested number of bytes. Expected: %d. Read %d.", totalSizeInBytes, numRead);
-      throw new ParquetDecodingException(errorMessage);
-    }
-
+    final ByteBuffer encodedData = stream.slice(totalSizeInBytes).slice(); // possibly zero-copy
+    final byte[] decodedData = decodeData(encodedData, this.valuesCount);
+    decodedDataBuffer = ByteBuffer.wrap(decodedData).order(ByteOrder.LITTLE_ENDIAN);
     indexInStream = 0;
   }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
index ab09f04eb..e725dc9fc 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
@@ -18,20 +18,13 @@
  */
 package org.apache.parquet.column.values.bytestreamsplit;
 
-import org.apache.parquet.bytes.BytesUtils;
-
 public class ByteStreamSplitValuesReaderForDouble extends ByteStreamSplitValuesReader {
-
-  private final byte[] valueByteBuffer;
-
   public ByteStreamSplitValuesReaderForDouble() {
     super(Double.BYTES);
-    valueByteBuffer = new byte[Double.BYTES];
   }
 
   @Override
   public double readDouble() {
-    gatherElementDataFromStreams(valueByteBuffer);
-    return Double.longBitsToDouble(BytesUtils.bytesToLong(valueByteBuffer));
+    return decodedDataBuffer.getDouble(nextElementByteOffset());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
similarity index 66%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
index 4f94c216a..d8613dd8b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
@@ -18,20 +18,16 @@
  */
 package org.apache.parquet.column.values.bytestreamsplit;
 
-import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
 
-public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {
-
-  private final byte[] valueByteBuffer;
-
-  public ByteStreamSplitValuesReaderForFloat() {
-    super(Float.BYTES);
-    valueByteBuffer = new byte[Float.BYTES];
+public class ByteStreamSplitValuesReaderForFLBA extends ByteStreamSplitValuesReader {
+  // Trivial, but overridden for clarity
+  public ByteStreamSplitValuesReaderForFLBA(int length) {
+    super(length);
   }
 
   @Override
-  public float readFloat() {
-    gatherElementDataFromStreams(valueByteBuffer);
-    return Float.intBitsToFloat(BytesUtils.bytesToInt(valueByteBuffer));
+  public Binary readBytes() {
+    return Binary.fromConstantByteBuffer(decodedDataBuffer, nextElementByteOffset(), elementSizeInBytes);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
index 4f94c216a..cecb7925d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
@@ -18,20 +18,13 @@
  */
 package org.apache.parquet.column.values.bytestreamsplit;
 
-import org.apache.parquet.bytes.BytesUtils;
-
 public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {
-
-  private final byte[] valueByteBuffer;
-
   public ByteStreamSplitValuesReaderForFloat() {
     super(Float.BYTES);
-    valueByteBuffer = new byte[Float.BYTES];
   }
 
   @Override
   public float readFloat() {
-    gatherElementDataFromStreams(valueByteBuffer);
-    return Float.intBitsToFloat(BytesUtils.bytesToInt(valueByteBuffer));
+    return decodedDataBuffer.getFloat(nextElementByteOffset());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
similarity index 66%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
index 4f94c216a..57f9bfdf0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
@@ -18,20 +18,13 @@
  */
 package org.apache.parquet.column.values.bytestreamsplit;
 
-import org.apache.parquet.bytes.BytesUtils;
-
-public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {
-
-  private final byte[] valueByteBuffer;
-
-  public ByteStreamSplitValuesReaderForFloat() {
-    super(Float.BYTES);
-    valueByteBuffer = new byte[Float.BYTES];
+public class ByteStreamSplitValuesReaderForInteger extends ByteStreamSplitValuesReader {
+  public ByteStreamSplitValuesReaderForInteger() {
+    super(4);
   }
 
   @Override
-  public float readFloat() {
-    gatherElementDataFromStreams(valueByteBuffer);
-    return Float.intBitsToFloat(BytesUtils.bytesToInt(valueByteBuffer));
+  public int readInteger() {
+    return decodedDataBuffer.getInt(nextElementByteOffset());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
similarity index 66%
copy from parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
copy to parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
index 4f94c216a..c7711d891 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
@@ -18,20 +18,13 @@
  */
 package org.apache.parquet.column.values.bytestreamsplit;
 
-import org.apache.parquet.bytes.BytesUtils;
-
-public class ByteStreamSplitValuesReaderForFloat extends ByteStreamSplitValuesReader {
-
-  private final byte[] valueByteBuffer;
-
-  public ByteStreamSplitValuesReaderForFloat() {
-    super(Float.BYTES);
-    valueByteBuffer = new byte[Float.BYTES];
+public class ByteStreamSplitValuesReaderForLong extends ByteStreamSplitValuesReader {
+  public ByteStreamSplitValuesReaderForLong() {
+    super(8);
   }
 
   @Override
-  public float readFloat() {
-    gatherElementDataFromStreams(valueByteBuffer);
-    return Float.intBitsToFloat(BytesUtils.bytesToInt(valueByteBuffer));
+  public long readLong() {
+    return decodedDataBuffer.getLong(nextElementByteOffset());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
index 4e2b56b0a..c197a4fd6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
@@ -25,12 +25,13 @@ import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
 
 public abstract class ByteStreamSplitValuesWriter extends ValuesWriter {
 
   protected final int numStreams;
   protected final int elementSizeInBytes;
-  private CapacityByteArrayOutputStream[] byteStreams;
+  private final CapacityByteArrayOutputStream[] byteStreams;
 
   public ByteStreamSplitValuesWriter(
       int elementSizeInBytes, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
@@ -140,4 +141,59 @@ public abstract class ByteStreamSplitValuesWriter extends ValuesWriter {
      return String.format("%s DoubleByteStreamSplitWriter %d bytes", prefix, getAllocatedSize());
     }
   }
+
+  public static class IntegerByteStreamSplitValuesWriter extends ByteStreamSplitValuesWriter {
+    public IntegerByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+      super(4, initialCapacity, pageSize, allocator);
+    }
+
+    @Override
+    public void writeInteger(int v) {
+      super.scatterBytes(BytesUtils.intToBytes(v));
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return String.format("%s IntegerByteStreamSplitWriter %d bytes", prefix, getAllocatedSize());
+    }
+  }
+
+  public static class LongByteStreamSplitValuesWriter extends ByteStreamSplitValuesWriter {
+    public LongByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+      super(8, initialCapacity, pageSize, allocator);
+    }
+
+    @Override
+    public void writeLong(long v) {
+      super.scatterBytes(BytesUtils.longToBytes(v));
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return String.format("%s LongByteStreamSplitWriter %d bytes", prefix, getAllocatedSize());
+    }
+  }
+
+  public static class FixedLenByteArrayByteStreamSplitValuesWriter extends ByteStreamSplitValuesWriter {
+    private final int length;
+
+    public FixedLenByteArrayByteStreamSplitValuesWriter(
+        int length, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+      super(length, initialCapacity, pageSize, allocator);
+      this.length = length;
+    }
+
+    @Override
+    public final void writeBytes(Binary v) {
+      assert (v.length() == length)
+          : ("Fixed Binary size " + v.length() + " does not match field type length " + length);
+      super.scatterBytes(v.getBytesUnsafe());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return String.format(
+          "%s FixedLenByteArrayByteStreamSplitValuesWriter %d bytes", prefix, getAllocatedSize());
+    }
+  }
 }
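
The writer side performs the inverse scatter: byte i of each value goes to
stream i, and getBytes() concatenates the streams. A short sketch consistent
with the IntegerTest expectations further below (the input values are made up):

    ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter writer =
        new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
            64, 64, new DirectByteBufferAllocator());
    writer.writeInteger(0x11223344);
    writer.writeInteger(0x55667788);
    // writer.getBytes() yields the little-endian byte 0 stream first:
    // 0x44 0x88 | 0x33 0x77 | 0x22 0x66 | 0x11 0x55
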
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
index 9c536eebf..e0d12c287 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
@@ -77,11 +77,19 @@ public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
 
  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
     // dictionary encoding was not enabled in PARQUET 1.0
-    return new FixedLenByteArrayPlainValuesWriter(
-        path.getTypeLength(),
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      return new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
+          path.getPrimitiveType().getTypeLength(),
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      return new FixedLenByteArrayPlainValuesWriter(
+          path.getPrimitiveType().getTypeLength(),
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
   }
 
   private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {
@@ -94,19 +102,35 @@ public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
   }
 
   private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
-    ValuesWriter fallbackWriter = new PlainValuesWriter(
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    final ValuesWriter fallbackWriter;
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      fallbackWriter = new PlainValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
     return DefaultValuesWriterFactory.dictWriterWithFallBack(
         path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
   }
 
   private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
-    ValuesWriter fallbackWriter = new PlainValuesWriter(
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    final ValuesWriter fallbackWriter;
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      fallbackWriter = new PlainValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
     return DefaultValuesWriterFactory.dictWriterWithFallBack(
         path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
   }
@@ -123,7 +147,7 @@ public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
 
   private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
     final ValuesWriter fallbackWriter;
-    if (this.parquetProperties.isByteStreamSplitEnabled()) {
+    if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter(
           parquetProperties.getInitialSlabSize(),
           parquetProperties.getPageSizeThreshold(),
@@ -140,7 +164,7 @@ public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
 
   private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
     final ValuesWriter fallbackWriter;
-    if (this.parquetProperties.isByteStreamSplitEnabled()) {
+    if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
           parquetProperties.getInitialSlabSize(),
           parquetProperties.getPageSizeThreshold(),
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
index 71a971316..c50b4e49c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
@@ -84,10 +84,22 @@ public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
   }
 
   private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
-    ValuesWriter fallbackWriter = new DeltaByteArrayWriter(
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    final ValuesWriter fallbackWriter;
+    // TODO:
+    //  Ideally we should only enable BYTE_STREAM_SPLIT if compression is enabled for a column.
+    //  However, this information is not available here.
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
+          path.getTypeLength(),
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      fallbackWriter = new DeltaByteArrayWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
     return DefaultValuesWriterFactory.dictWriterWithFallBack(
         path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
   }
@@ -102,19 +114,35 @@ public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
   }
 
   private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
-    ValuesWriter fallbackWriter = new DeltaBinaryPackingValuesWriterForInteger(
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    final ValuesWriter fallbackWriter;
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      fallbackWriter = new DeltaBinaryPackingValuesWriterForInteger(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
     return DefaultValuesWriterFactory.dictWriterWithFallBack(
         path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
   }
 
   private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
-    ValuesWriter fallbackWriter = new DeltaBinaryPackingValuesWriterForLong(
-        parquetProperties.getInitialSlabSize(),
-        parquetProperties.getPageSizeThreshold(),
-        parquetProperties.getAllocator());
+    final ValuesWriter fallbackWriter;
+    if (parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    } else {
+      fallbackWriter = new DeltaBinaryPackingValuesWriterForLong(
+          parquetProperties.getInitialSlabSize(),
+          parquetProperties.getPageSizeThreshold(),
+          parquetProperties.getAllocator());
+    }
     return DefaultValuesWriterFactory.dictWriterWithFallBack(
         path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
   }
@@ -131,7 +159,7 @@ public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
 
   private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
     final ValuesWriter fallbackWriter;
-    if (this.parquetProperties.isByteStreamSplitEnabled()) {
+    if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter(
           parquetProperties.getInitialSlabSize(),
           parquetProperties.getPageSizeThreshold(),
@@ -148,7 +176,7 @@ public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
 
   private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
     final ValuesWriter fallbackWriter;
-    if (this.parquetProperties.isByteStreamSplitEnabled()) {
+    if (this.parquetProperties.isByteStreamSplitEnabled(path)) {
+      fallbackWriter = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
           parquetProperties.getInitialSlabSize(),
           parquetProperties.getPageSizeThreshold(),
diff --git a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/SimpleGroup.java b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/SimpleGroup.java
index a792aa252..763208139 100644
--- a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/SimpleGroup.java
+++ b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/SimpleGroup.java
@@ -60,7 +60,7 @@ public class SimpleGroup extends Group {
             builder.append('\n');
             ((SimpleGroup) value).appendToString(builder, indent + "  ");
           } else {
-            builder.append(": ").append(value.toString()).append('\n');
+            builder.append(": ").append(value).append('\n');
           }
         }
       }
@@ -81,6 +81,32 @@ public class SimpleGroup extends Group {
     return g;
   }
 
+  public Object getObject(String field, int index) {
+    return getObject(getType().getFieldIndex(field), index);
+  }
+
+  public Object getObject(int fieldIndex, int index) {
+    Object wrapped = getValue(fieldIndex, index);
+    // Unwrap to Java standard object, if possible
+    if (wrapped instanceof BooleanValue) {
+      return ((BooleanValue) wrapped).getBoolean();
+    } else if (wrapped instanceof IntegerValue) {
+      return ((IntegerValue) wrapped).getInteger();
+    } else if (wrapped instanceof LongValue) {
+      return ((LongValue) wrapped).getLong();
+    } else if (wrapped instanceof Int96Value) {
+      return ((Int96Value) wrapped).getInt96();
+    } else if (wrapped instanceof FloatValue) {
+      return ((FloatValue) wrapped).getFloat();
+    } else if (wrapped instanceof DoubleValue) {
+      return ((DoubleValue) wrapped).getDouble();
+    } else if (wrapped instanceof BinaryValue) {
+      return ((BinaryValue) wrapped).getBinary();
+    } else {
+      return wrapped;
+    }
+  }
+
   @Override
   public Group getGroup(int fieldIndex, int index) {
     return (Group) getValue(fieldIndex, index);
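
A short usage sketch for the new getObject() accessor (the row instance and
field name are assumed):

    // Given a SimpleGroup `row` with an INT32 field "x", getObject unwraps
    // the example-model IntegerValue into a plain java.lang.Integer.
    Object v = row.getObject("x", 0);
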
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesEndToEndTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesEndToEndTest.java
index 2234a270a..259b062b9 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesEndToEndTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesEndToEndTest.java
@@ -24,6 +24,7 @@ import java.util.Random;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.io.api.Binary;
 import org.junit.Test;
 
 public class ByteStreamSplitValuesEndToEndTest {
@@ -73,11 +74,7 @@ public class ByteStreamSplitValuesEndToEndTest {
     // Generate random data.
     Random rand = new Random(18990);
     final int numElements = 1024;
-    double[] values = new double[numElements];
-    for (int i = 0; i < numElements; ++i) {
-      double f = rand.nextDouble() * 16384.0;
-      values[i] = f;
-    }
+    double[] values = rand.doubles(numElements).toArray();
 
     // Encode data.
     ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter writer =
@@ -103,4 +100,113 @@ public class ByteStreamSplitValuesEndToEndTest {
     writer.reset();
     writer.close();
   }
+
+  @Test
+  public void testIntegerPipeline() throws Exception {
+    // Generate random data.
+    Random rand = new Random(18990);
+    final int numElements = 1024;
+    int[] values = rand.ints(numElements).toArray();
+
+    // Encode data.
+    ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter writer =
+        new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
+            numElements * 4, numElements * 4, new DirectByteBufferAllocator());
+    for (int v : values) {
+      writer.writeInteger(v);
+    }
+
+    assertEquals(numElements * 4, writer.getBufferedSize());
+    BytesInput input = writer.getBytes();
+    assertEquals(numElements * 4, input.size());
+
+    ByteStreamSplitValuesReaderForInteger reader = new ByteStreamSplitValuesReaderForInteger();
+    reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer()));
+
+    for (int expectedValue : values) {
+      int newValue = reader.readInteger();
+      assertEquals(expectedValue, newValue);
+    }
+
+    writer.reset();
+    writer.close();
+  }
+
+  @Test
+  public void testLongPipeline() throws Exception {
+    // Generate random data.
+    Random rand = new Random(18990);
+    final int numElements = 1024;
+    long[] values = rand.longs(numElements).toArray();
+
+    // Encode data.
+    ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter writer =
+        new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
+            numElements * 8, numElements * 8, new DirectByteBufferAllocator());
+    for (long v : values) {
+      writer.writeLong(v);
+    }
+
+    assertEquals(numElements * 8, writer.getBufferedSize());
+    BytesInput input = writer.getBytes();
+    assertEquals(numElements * 8, input.size());
+
+    ByteStreamSplitValuesReaderForLong reader = new ByteStreamSplitValuesReaderForLong();
+    reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer()));
+
+    for (long expectedValue : values) {
+      long newValue = reader.readLong();
+      assertEquals(expectedValue, newValue);
+    }
+
+    writer.reset();
+    writer.close();
+  }
+
+  @Test
+  public void testFixedLenByteArrayPipeline() throws Exception {
+    // Generate random data.
+    Random rand = new Random(18990);
+    final int numElements = 1024;
+    final int typeLength = 3;
+    byte[][] values = new byte[numElements][];
+    for (int i = 0; i < numElements; ++i) {
+      values[i] = new byte[typeLength];
+      rand.nextBytes(values[i]);
+    }
+
+    // Encode data.
+    ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter writer =
+        new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
+            typeLength,
+            numElements * typeLength,
+            numElements * typeLength,
+            new DirectByteBufferAllocator());
+    for (byte[] v : values) {
+      writer.writeBytes(Binary.fromConstantByteArray(v));
+    }
+
+    assertEquals(numElements * typeLength, writer.getBufferedSize());
+    BytesInput input = writer.getBytes();
+    assertEquals(numElements * typeLength, input.size());
+
+    ByteStreamSplitValuesReaderForFLBA reader = new ByteStreamSplitValuesReaderForFLBA(typeLength);
+    reader.initFromPage(numElements, ByteBufferInputStream.wrap(input.toByteBuffer()));
+
+    Binary previousExpected = null, previousActual = null;
+    for (byte[] expectedValue : values) {
+      Binary expected = Binary.fromConstantByteArray(expectedValue);
+      Binary actual = reader.readBytes();
+      assertEquals(expected, actual);
+      if (previousExpected != null) {
+        // The latest readBytes() call shouldn't have clobbered the result of the previous call.
+        assertEquals(previousExpected, previousActual);
+      }
+      previousExpected = expected;
+      previousActual = actual;
+    }
+
+    writer.reset();
+    writer.close();
+  }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderTest.java
index efd8d6db6..348d24559 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderTest.java
@@ -19,28 +19,40 @@
 package org.apache.parquet.column.values.bytestreamsplit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
-import org.junit.Assert;
+import org.apache.parquet.io.api.Binary;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
+@RunWith(Enclosed.class)
 public class ByteStreamSplitValuesReaderTest {
+  private static <Reader extends ValuesReader> Reader makeReader(byte[] input, int length, Class<Reader> cls)
+      throws Exception {
+    ByteBuffer buffer = ByteBuffer.wrap(input);
+    ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
+    Reader reader = cls.newInstance();
+    reader.initFromPage(length, stream);
+    return reader;
+  }
 
   public static class FloatTest {
 
-    private void testReader(byte[] input, float[] values) throws IOException {
-      ByteBuffer buffer = ByteBuffer.wrap(input);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
-      ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
-      reader.initFromPage(values.length, stream);
+    private void testReader(byte[] input, float[] values) throws Exception {
+      ByteStreamSplitValuesReaderForFloat reader =
+          makeReader(input, values.length, ByteStreamSplitValuesReaderForFloat.class);
       for (float expectedValue : values) {
         float f = reader.readFloat();
         assertEquals(expectedValue, f, 0.0f);
       }
+      // All data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readFloat());
     }
 
     @Test
@@ -65,7 +77,7 @@ public class ByteStreamSplitValuesReaderTest {
         (byte) 0x41,
         (byte) 0x42
       };
-      float expectedValues[] = {-98.62548828125f, 23.62744140625f, 44.62939453125f};
+      float[] expectedValues = {-98.62548828125f, 23.62744140625f, 44.62939453125f};
       testReader(byteData, expectedValues);
     }
 
@@ -90,18 +102,11 @@ public class ByteStreamSplitValuesReaderTest {
     @Test
     public void testExtraReads() throws Exception {
       byte[] byteData = {(byte) 0x00, (byte) 0x00, (byte) 0x10, (byte) 0x40};
-      ByteBuffer buffer = ByteBuffer.wrap(byteData);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
-
-      ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
-      reader.initFromPage(1, stream);
+      ByteStreamSplitValuesReaderForFloat reader =
+          makeReader(byteData, 1, ByteStreamSplitValuesReaderForFloat.class);
       float f = reader.readFloat();
       assertEquals(2.25f, f, 0.0f);
-      try {
-        reader.readFloat();
-        Assert.fail("Expected an exception.");
-      } catch (ParquetDecodingException ex) {
-      }
+      assertThrows(ParquetDecodingException.class, () -> reader.readFloat());
     }
 
     @Test
@@ -114,60 +119,44 @@ public class ByteStreamSplitValuesReaderTest {
       byteData[7] = (byte) 0x00;
       byteData[11] = (byte) 0x10;
       byteData[15] = (byte) 0x40;
-      ByteBuffer buffer = ByteBuffer.wrap(byteData);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
 
-      ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
-      reader.initFromPage(4, stream);
+      ByteStreamSplitValuesReaderForFloat reader =
+          makeReader(byteData, 4, ByteStreamSplitValuesReaderForFloat.class);
       reader.skip(3);
       float f = reader.readFloat();
       assertEquals(2.25f, f, 0.0f);
+      // Data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readFloat());
     }
 
     @Test
     public void testSkipOverflow() throws Exception {
       byte[] byteData = new byte[128];
-      ByteBuffer buffer = ByteBuffer.wrap(byteData);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
-
-      ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
-      reader.initFromPage(32, stream);
-
-      try {
-        reader.skip(33);
-        Assert.fail("Expected an exception.");
-      } catch (ParquetDecodingException ex) {
-      }
+      ByteStreamSplitValuesReaderForFloat reader =
+          makeReader(byteData, 32, ByteStreamSplitValuesReaderForFloat.class);
+      assertThrows(ParquetDecodingException.class, () -> reader.skip(33));
     }
 
     @Test
     public void testSkipUnderflow() throws Exception {
       byte[] byteData = new byte[128];
-      ByteBuffer buffer = ByteBuffer.wrap(byteData);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
-
-      ByteStreamSplitValuesReaderForFloat reader = new ByteStreamSplitValuesReaderForFloat();
-      reader.initFromPage(32, stream);
-
-      try {
-        reader.skip(-1);
-        Assert.fail("Expected an exception.");
-      } catch (ParquetDecodingException ex) {
-      }
+      ByteStreamSplitValuesReaderForFloat reader =
+          makeReader(byteData, 32, ByteStreamSplitValuesReaderForFloat.class);
+      assertThrows(ParquetDecodingException.class, () -> reader.skip(-1));
     }
   }
 
   public static class DoubleTest {
 
-    private void testReader(byte[] input, double[] values) throws IOException {
-      ByteBuffer buffer = ByteBuffer.wrap(input);
-      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
-      ByteStreamSplitValuesReaderForDouble reader = new ByteStreamSplitValuesReaderForDouble();
-      reader.initFromPage(values.length, stream);
+    private void testReader(byte[] input, double[] values) throws Exception {
+      ByteStreamSplitValuesReaderForDouble reader =
+          makeReader(input, values.length, ByteStreamSplitValuesReaderForDouble.class);
       for (double expectedValue : values) {
         double d = reader.readDouble();
         assertEquals(expectedValue, d, 0.0);
       }
+      // All data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readDouble());
     }
 
     @Test
@@ -206,7 +195,7 @@ public class ByteStreamSplitValuesReaderTest {
         (byte) 0xC0,
         (byte) 0x3F
       };
-      double expectedValues[] = {256.625449218, -78956.4455667788, 0.62565};
+      double[] expectedValues = {256.625449218, -78956.4455667788, 0.62565};
       testReader(byteData, expectedValues);
     }
 
@@ -227,4 +216,124 @@ public class ByteStreamSplitValuesReaderTest {
       testReader(byteData, values);
     }
   }
+
+  public static class IntegerTest {
+    private void testReader(byte[] input, int[] values) throws Exception {
+      ByteStreamSplitValuesReaderForInteger reader =
+          makeReader(input, values.length, ByteStreamSplitValuesReaderForInteger.class);
+      for (int expectedValue : values) {
+        int actual = reader.readInteger();
+        assertEquals(expectedValue, actual);
+      }
+      // All data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readInteger());
+    }
+
+    @Test
+    public void testSingleElement() throws Exception {
+      byte[] byteData = {(byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78};
+      testReader(byteData, new int[] {0x78563412});
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      byte[] byteData = {
+        (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78,
+        (byte) 0x9A, (byte) 0xBC, (byte) 0xDE, (byte) 0xF0,
+      };
+      testReader(byteData, new int[] {0xDE9A5612, 0xF0BC7834});
+    }
+  }
+
+  public static class LongTest {
+    private void testReader(byte[] input, long[] values) throws Exception {
+      ByteStreamSplitValuesReaderForLong reader =
+          makeReader(input, values.length, ByteStreamSplitValuesReaderForLong.class);
+      for (long expectedValue : values) {
+        long actual = reader.readLong();
+        assertEquals(expectedValue, actual);
+      }
+      // All data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readLong());
+    }
+
+    @Test
+    public void testSingleElement() throws Exception {
+      byte[] byteData = {
+        (byte) 0x12, (byte) 0x34, (byte) 0x56, (byte) 0x78,
+        (byte) 0x9A, (byte) 0xBC, (byte) 0xDE, (byte) 0xF0,
+      };
+      testReader(byteData, new long[] {0xF0DEBC9A78563412L});
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      byte[] byteData = {
+        (byte) 0x00, (byte) 0x11, (byte) 0x22, (byte) 0x33,
+        (byte) 0x44, (byte) 0x55, (byte) 0x66, (byte) 0x77,
+        (byte) 0x88, (byte) 0x99, (byte) 0xAA, (byte) 0xBB,
+        (byte) 0xCC, (byte) 0xDD, (byte) 0xEE, (byte) 0xFF,
+      };
+      testReader(byteData, new long[] {0xEECCAA8866442200L, 0xFFDDBB9977553311L});
+    }
+  }
+
+  public static class FixedLenByteArrayTest {
+    private static ByteStreamSplitValuesReaderForFLBA makeReader(byte[] input, int valuesCount) throws Exception {
+      ByteBuffer buffer = ByteBuffer.wrap(input);
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
+      ByteStreamSplitValuesReaderForFLBA reader =
+          new ByteStreamSplitValuesReaderForFLBA(input.length / valuesCount);
+      reader.initFromPage(valuesCount, stream);
+      return reader;
+    }
+
+    private void testReader(byte[] input, byte[][] values) throws Exception {
+      ByteStreamSplitValuesReaderForFLBA reader = makeReader(input, values.length);
+      Binary previousExpected = null, previousActual = null;
+      for (byte[] expectedValue : values) {
+        Binary expected = Binary.fromReusedByteArray(expectedValue);
+        Binary actual = reader.readBytes();
+        assertEquals(expected, actual);
+        if (previousExpected != null) {
+          // The latest readBytes() call shouldn't have clobbered the result of the previous call.
+          assertEquals(previousExpected, previousActual);
+        }
+        previousExpected = expected;
+        previousActual = actual;
+      }
+      // All data exhausted
+      assertThrows(ParquetDecodingException.class, () -> reader.readBytes());
+    }
+
+    @Test
+    public void testSingleElement() throws Exception {
+      byte[] byteData = {
+        (byte) 0x12, (byte) 0x34, (byte) 0x56,
+      };
+      byte[][] values = {
+        {
+          (byte) 0x12, (byte) 0x34, (byte) 0x56,
+        }
+      };
+      testReader(byteData, values);
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      byte[] byteData = {
+        (byte) 0x12, (byte) 0x34, (byte) 0x56,
+        (byte) 0x78, (byte) 0x9A, (byte) 0xBC,
+      };
+      byte[][] values = {
+        {
+          (byte) 0x12, (byte) 0x56, (byte) 0x9A,
+        },
+        {
+          (byte) 0x34, (byte) 0x78, (byte) 0xBC,
+        },
+      };
+      testReader(byteData, values);
+    }
+  }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriterTest.java
index 76cc8e679..5293adf9d 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriterTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriterTest.java
@@ -22,8 +22,12 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.io.api.Binary;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
+@RunWith(Enclosed.class)
 public class ByteStreamSplitValuesWriterTest {
 
   public static class FloatTest {
@@ -31,7 +35,7 @@ public class ByteStreamSplitValuesWriterTest {
     static float convertType(byte[] bytes) {
       int v = 0;
       for (int i = 0; i < bytes.length; ++i) {
-        v |= (((int) (bytes[i] & 0xFF)) << (i * 8));
+        v |= ((bytes[i] & 0xFF) << (i * 8));
       }
       return Float.intBitsToFloat(v);
     }
@@ -186,4 +190,97 @@ public class ByteStreamSplitValuesWriterTest {
       }
     }
   }
+
+  public static class IntegerTest {
+    private ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter getWriter(int capacity) {
+      return new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
+          capacity, capacity, new DirectByteBufferAllocator());
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter writer = getWriter(3);
+      writer.writeInteger(0x11223344);
+      writer.writeInteger(0xFFEEDDCC);
+      writer.writeInteger(0x66778899);
+
+      assertEquals(12, writer.getBufferedSize());
+      byte[] rawBytes = writer.getBytes().toByteArray();
+      assertEquals(12, rawBytes.length);
+
+      final byte[] expectedBytes = {
+        (byte) 0x44, (byte) 0xCC, (byte) 0x99,
+        (byte) 0x33, (byte) 0xDD, (byte) 0x88,
+        (byte) 0x22, (byte) 0xEE, (byte) 0x77,
+        (byte) 0x11, (byte) 0xFF, (byte) 0x66
+      };
+      for (int i = 0; i < expectedBytes.length; ++i) {
+        assertEquals(expectedBytes[i], rawBytes[i]);
+      }
+      writer.reset();
+      writer.close();
+    }
+  }
+
+  public static class LongTest {
+    private ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter getWriter(int capacity) {
+      return new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
+          capacity, capacity, new DirectByteBufferAllocator());
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter writer = getWriter(2);
+      writer.writeLong(0x1122334455667700L);
+      writer.writeLong(0xFFEEDDCCBBAA9988L);
+
+      assertEquals(16, writer.getBufferedSize());
+      byte[] rawBytes = writer.getBytes().toByteArray();
+      assertEquals(16, rawBytes.length);
+
+      final byte[] expectedBytes = {
+        (byte) 0x00, (byte) 0x88,
+        (byte) 0x77, (byte) 0x99,
+        (byte) 0x66, (byte) 0xAA,
+        (byte) 0x55, (byte) 0xBB,
+        (byte) 0x44, (byte) 0xCC,
+        (byte) 0x33, (byte) 0xDD,
+        (byte) 0x22, (byte) 0xEE,
+        (byte) 0x11, (byte) 0xFF,
+      };
+      for (int i = 0; i < expectedBytes.length; ++i) {
+        assertEquals(expectedBytes[i], rawBytes[i]);
+      }
+      writer.reset();
+      writer.close();
+    }
+  }
+
+  public static class FixedLenByteArrayTest {
+    private ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter getWriter(
+        int length, int capacity) {
+      return new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
+          length, capacity, capacity, new DirectByteBufferAllocator());
+    }
+
+    @Test
+    public void testSmallBuffer() throws Exception {
+      ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter writer = getWriter(3, 2);
+      writer.writeBytes(Binary.fromString("abc"));
+      writer.writeBytes(Binary.fromString("ghi"));
+
+      assertEquals(6, writer.getBufferedSize());
+      byte[] rawBytes = writer.getBytes().toByteArray();
+      assertEquals(6, rawBytes.length);
+
+      final byte[] expectedBytes = {
+        'a', 'g', 'b', 'h', 'c', 'i',
+      };
+      for (int i = 0; i < expectedBytes.length; ++i) {
+        assertEquals(expectedBytes[i], rawBytes[i]);
+      }
+      writer.reset();
+      writer.close();
+    }
+  }
 }
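
Aside (illustrative only, not part of the patch): the writer-side expectations above are the mirror image of the reader. Each value is taken in its little-endian byte order and byte k of every value is gathered into stream k. A short Java sketch, with a hypothetical helper name:

    // Sketch only: split INT32 values into byte streams as in IntegerTest above.
    static byte[] splitInts(int[] values) {
      int n = values.length;
      byte[] out = new byte[4 * n];
      for (int i = 0; i < n; ++i) {
        for (int k = 0; k < 4; ++k) {
          // little-endian byte k of value i goes to stream k, position i
          out[k * n + i] = (byte) (values[i] >>> (8 * k));
        }
      }
      return out;
    }

For the inputs 0x11223344, 0xFFEEDDCC and 0x66778899 this produces 44 CC 99 | 33 DD 88 | 22 EE 77 | 11 FF 66, i.e. the expectedBytes in testSmallBuffer.
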
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
index 45afd41ad..37fca55ef 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
@@ -45,6 +45,8 @@ import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
 import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
 import org.apache.parquet.column.values.plain.PlainValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.junit.Test;
 
@@ -53,7 +55,12 @@ public class DefaultValuesWriterFactoryTest {
   @Test
   public void testBoolean() {
     doTestValueWriter(
-        PrimitiveTypeName.BOOLEAN, WriterVersion.PARQUET_1_0, true, false, BooleanPlainValuesWriter.class);
+        PrimitiveTypeName.BOOLEAN,
+        WriterVersion.PARQUET_1_0,
+        true,
+        false,
+        false,
+        BooleanPlainValuesWriter.class);
   }
 
   @Test
@@ -63,6 +70,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         RunLengthBitPackingHybridValuesWriter.class);
   }
 
@@ -73,6 +81,26 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
+        FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_WithByteStreamSplit() {
+    // No dictionary encoding for FLBA in Parquet 1.0
+    doTestValueWriter(
+        PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+        WriterVersion.PARQUET_1_0,
+        true,
+        false,
+        true,
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+        WriterVersion.PARQUET_1_0,
+        true,
+        true,
+        false,
         FixedLenByteArrayPlainValuesWriter.class);
   }
 
@@ -83,10 +111,23 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         DictionaryValuesWriter.class,
         DeltaByteArrayWriter.class);
   }
 
+  @Test
+  public void testFixedLenByteArray_V2_WithByteStreamSplit() {
+    testExtendedByteStreamSplit(
+        PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, WriterVersion.PARQUET_2_0, DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_V2_WithByteStreamSplit_NoDict() {
+    testExtendedByteStreamSplit_NoDict(
+        PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, WriterVersion.PARQUET_2_0, DeltaByteArrayWriter.class);
+  }
+
   @Test
   public void testFixedLenByteArray_V2_NoDict() {
     doTestValueWriter(
@@ -94,6 +135,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         false,
         false,
+        false,
         DeltaByteArrayWriter.class);
   }
 
@@ -104,13 +146,15 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainBinaryDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testBinary_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.BINARY, WriterVersion.PARQUET_1_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.BINARY, WriterVersion.PARQUET_1_0, false, false, false, PlainValuesWriter.class);
   }
 
   @Test
@@ -120,6 +164,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainBinaryDictionaryValuesWriter.class,
         DeltaByteArrayWriter.class);
   }
@@ -127,7 +172,7 @@ public class DefaultValuesWriterFactoryTest {
   @Test
   public void testBinary_V2_NoDict() {
     doTestValueWriter(
-        PrimitiveTypeName.BINARY, WriterVersion.PARQUET_2_0, false, false, DeltaByteArrayWriter.class);
+        PrimitiveTypeName.BINARY, WriterVersion.PARQUET_2_0, false, false, false, DeltaByteArrayWriter.class);
   }
 
   @Test
@@ -137,13 +182,25 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainIntegerDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testInt32_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.INT32, WriterVersion.PARQUET_1_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.INT32, WriterVersion.PARQUET_1_0, false, false, false, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_ByteStreamSplit() {
+    testExtendedByteStreamSplit(PrimitiveTypeName.INT32, WriterVersion.PARQUET_1_0, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_ByteStreamSplit_NoDict() {
+    testExtendedByteStreamSplit_NoDict(PrimitiveTypeName.INT32, WriterVersion.PARQUET_1_0, PlainValuesWriter.class);
   }
 
   @Test
@@ -153,6 +210,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainIntegerDictionaryValuesWriter.class,
         DeltaBinaryPackingValuesWriter.class);
   }
@@ -160,7 +218,22 @@ public class DefaultValuesWriterFactoryTest {
   @Test
   public void testInt32_V2_NoDict() {
     doTestValueWriter(
-        PrimitiveTypeName.INT32, WriterVersion.PARQUET_2_0, false, false, DeltaBinaryPackingValuesWriter.class);
+        PrimitiveTypeName.INT32,
+        WriterVersion.PARQUET_2_0,
+        false,
+        false,
+        false,
+        DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2_ByteStreamSplit() {
+    testExtendedByteStreamSplit(INT32, WriterVersion.PARQUET_2_0, DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2_ByteStreamSplit_NoDict() {
+    testExtendedByteStreamSplit_NoDict(INT32, WriterVersion.PARQUET_2_0, DeltaBinaryPackingValuesWriter.class);
   }
 
   @Test
@@ -170,13 +243,25 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainLongDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testInt64_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.INT64, WriterVersion.PARQUET_1_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.INT64, WriterVersion.PARQUET_1_0, false, false, false, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_ByteStreamSplit() {
+    testExtendedByteStreamSplit(PrimitiveTypeName.INT64, WriterVersion.PARQUET_1_0, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_ByteStreamSplit_NoDict() {
+    testExtendedByteStreamSplit_NoDict(PrimitiveTypeName.INT64, WriterVersion.PARQUET_1_0, PlainValuesWriter.class);
   }
 
   @Test
@@ -186,6 +271,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainLongDictionaryValuesWriter.class,
         DeltaBinaryPackingValuesWriterForLong.class);
   }
@@ -197,9 +283,22 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         false,
         false,
+        false,
         DeltaBinaryPackingValuesWriterForLong.class);
   }
 
+  @Test
+  public void testInt64_V2_ByteStreamSplit() {
+    testExtendedByteStreamSplit(
+        PrimitiveTypeName.INT64, WriterVersion.PARQUET_2_0, DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_V2_ByteStreamSplit_NoDict() {
+    testExtendedByteStreamSplit_NoDict(
+        PrimitiveTypeName.INT64, WriterVersion.PARQUET_2_0, DeltaBinaryPackingValuesWriter.class);
+  }
+
   @Test
   public void testInt96() {
     doTestValueWriter(
@@ -207,6 +306,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainFixedLenArrayDictionaryValuesWriter.class,
         FixedLenByteArrayPlainValuesWriter.class);
   }
@@ -218,6 +318,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         false,
         false,
+        false,
         FixedLenByteArrayPlainValuesWriter.class);
   }
 
@@ -228,6 +329,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainFixedLenArrayDictionaryValuesWriter.class,
         FixedLenByteArrayPlainValuesWriter.class);
   }
@@ -239,6 +341,7 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         false,
         false,
+        false,
         FixedLenByteArrayPlainValuesWriter.class);
   }
 
@@ -249,13 +352,15 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainDoubleDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testDouble_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_1_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_1_0, false, false, false, PlainValuesWriter.class);
   }
 
   @Test
@@ -265,13 +370,15 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainDoubleDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testDouble_V2_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_2_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_2_0, false, false, false, PlainValuesWriter.class);
   }
 
   @Test
@@ -281,13 +388,15 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_1_0,
         true,
         false,
+        false,
         PlainFloatDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testFloat_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_1_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_1_0, false, false, false, PlainValuesWriter.class);
   }
 
   @Test
@@ -297,81 +406,55 @@ public class DefaultValuesWriterFactoryTest {
         WriterVersion.PARQUET_2_0,
         true,
         false,
+        false,
         PlainFloatDictionaryValuesWriter.class,
         PlainValuesWriter.class);
   }
 
   @Test
   public void testFloat_V2_NoDict() {
-    doTestValueWriter(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_2_0, false, false, PlainValuesWriter.class);
+    doTestValueWriter(
+        PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_2_0, false, false, false, PlainValuesWriter.class);
   }
 
   @Test
   public void testFloat_V1_WithByteStreamSplit() {
-    doTestValueWriter(
-        PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_1_0, false, true, ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplit(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_1_0);
   }
 
   @Test
   public void testDouble_V1_WithByteStreamSplit() {
-    doTestValueWriter(
-        PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_1_0, false, true, ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplit(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_1_0);
   }
 
   @Test
   public void testFloat_V2_WithByteStreamSplit() {
-    doTestValueWriter(
-        PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_2_0, false, true, ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplit(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_2_0);
   }
 
   @Test
   public void testDouble_V2_WithByteStreamSplit() {
-    doTestValueWriter(
-        PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_2_0, false, true, ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplit(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_2_0);
   }
 
   @Test
   public void testFloat_V1_WithByteStreamSplitAndDictionary() {
-    doTestValueWriter(
-        PrimitiveTypeName.FLOAT,
-        WriterVersion.PARQUET_1_0,
-        true,
-        true,
-        PlainFloatDictionaryValuesWriter.class,
-        ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplitAndDictionary(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_1_0);
   }
 
   @Test
   public void testDouble_V1_WithByteStreamSplitAndDictionary() {
-    doTestValueWriter(
-        PrimitiveTypeName.DOUBLE,
-        WriterVersion.PARQUET_1_0,
-        true,
-        true,
-        PlainDoubleDictionaryValuesWriter.class,
-        ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplitAndDictionary(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_1_0);
   }
 
   @Test
   public void testFloat_V2_WithByteStreamSplitAndDictionary() {
-    doTestValueWriter(
-        PrimitiveTypeName.FLOAT,
-        WriterVersion.PARQUET_2_0,
-        true,
-        true,
-        PlainFloatDictionaryValuesWriter.class,
-        ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplitAndDictionary(PrimitiveTypeName.FLOAT, WriterVersion.PARQUET_2_0);
   }
 
   @Test
   public void testDouble_V2_WithByteStreamSplitAndDictionary() {
-    doTestValueWriter(
-        PrimitiveTypeName.DOUBLE,
-        WriterVersion.PARQUET_2_0,
-        true,
-        true,
-        PlainDoubleDictionaryValuesWriter.class,
-        ByteStreamSplitValuesWriter.class);
+    testFloatingPoint_WithByteStreamSplitAndDictionary(PrimitiveTypeName.DOUBLE, WriterVersion.PARQUET_2_0);
   }
 
   @Test
@@ -414,6 +497,137 @@ public class DefaultValuesWriterFactoryTest {
     validateFactory(factory, INT32, "int32_no_dict", DeltaBinaryPackingValuesWriter.class);
   }
 
+  private void testExtendedByteStreamSplit(
+      PrimitiveTypeName typeName,
+      WriterVersion writerVersion,
+      Class<? extends ValuesWriter> defaultFallbackWriterClass) {
+    // cross-column settings
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withExtendedByteStreamSplitEncoding(true)
+            .build(),
+        DictionaryValuesWriter.class,
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withByteStreamSplitEncoding(true)
+            .build(),
+        DictionaryValuesWriter.class,
+        defaultFallbackWriterClass);
+    // per-column settings
+    ParquetProperties properties = ParquetProperties.builder()
+        .withWriterVersion(writerVersion)
+        .withByteStreamSplitEncoding("colA", true)
+        .build();
+    doTestValueWriter(
+        createColumnDescriptor(typeName, "colA"),
+        properties,
+        DictionaryValuesWriter.class,
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName, "colB"),
+        properties,
+        DictionaryValuesWriter.class,
+        defaultFallbackWriterClass);
+  }
+
+  private void testExtendedByteStreamSplit_NoDict(
+      PrimitiveTypeName typeName, WriterVersion writerVersion, Class<? extends ValuesWriter> defaultWriterClass) {
+    // cross-column settings
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withDictionaryEncoding(false)
+            .withExtendedByteStreamSplitEncoding(true)
+            .build(),
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withDictionaryEncoding(false)
+            .withByteStreamSplitEncoding(true)
+            .build(),
+        defaultWriterClass);
+    // per-column settings
+    ParquetProperties properties = ParquetProperties.builder()
+        .withWriterVersion(writerVersion)
+        .withDictionaryEncoding(false)
+        .withByteStreamSplitEncoding("colA", true)
+        .build();
+    doTestValueWriter(createColumnDescriptor(typeName, "colA"), properties, 
ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(createColumnDescriptor(typeName, "colB"), properties, 
defaultWriterClass);
+  }
+
+  private void testFloatingPoint_WithByteStreamSplit(PrimitiveTypeName typeName, WriterVersion writerVersion) {
+    // With cross-column settings
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withDictionaryEncoding(false)
+            .withByteStreamSplitEncoding(true)
+            .build(),
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withDictionaryEncoding(false)
+            .withExtendedByteStreamSplitEncoding(true)
+            .build(),
+        ByteStreamSplitValuesWriter.class);
+    // With per-column settings
+    ParquetProperties properties = ParquetProperties.builder()
+        .withWriterVersion(writerVersion)
+        .withDictionaryEncoding(false)
+        .withByteStreamSplitEncoding("colA", true)
+        .build();
+    doTestValueWriter(createColumnDescriptor(typeName, "colA"), properties, 
ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(createColumnDescriptor(typeName, "colB"), properties, 
PlainValuesWriter.class);
+  }
+
+  private void testFloatingPoint_WithByteStreamSplitAndDictionary(
+      PrimitiveTypeName typeName, WriterVersion writerVersion) {
+    // With cross-column settings
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withByteStreamSplitEncoding(true)
+            .build(),
+        DictionaryValuesWriter.class,
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName),
+        ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .withExtendedByteStreamSplitEncoding(true)
+            .build(),
+        DictionaryValuesWriter.class,
+        ByteStreamSplitValuesWriter.class);
+    // With per-column settings
+    ParquetProperties properties = ParquetProperties.builder()
+        .withWriterVersion(writerVersion)
+        .withByteStreamSplitEncoding("colA", true)
+        .build();
+    doTestValueWriter(
+        createColumnDescriptor(typeName, "colA"),
+        properties,
+        DictionaryValuesWriter.class,
+        ByteStreamSplitValuesWriter.class);
+    doTestValueWriter(
+        createColumnDescriptor(typeName, "colB"),
+        properties,
+        DictionaryValuesWriter.class,
+        PlainValuesWriter.class);
+  }
+
   private void validateFactory(
       ValuesWriterFactory factory,
       PrimitiveTypeName typeName,
@@ -440,10 +654,28 @@ public class DefaultValuesWriterFactoryTest {
       WriterVersion version,
       boolean enableDictionary,
       boolean enableByteStreamSplit,
+      boolean enableExtendedByteStreamSplit,
       Class<? extends ValuesWriter> expectedValueWriterClass) {
     ColumnDescriptor mockPath = createColumnDescriptor(typeName);
-    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
-    ValuesWriter writer = factory.newValuesWriter(mockPath);
+    doTestValueWriter(
+        mockPath,
+        version,
+        enableDictionary,
+        enableByteStreamSplit,
+        enableExtendedByteStreamSplit,
+        expectedValueWriterClass);
+  }
+
+  private void doTestValueWriter(
+      ColumnDescriptor path,
+      WriterVersion version,
+      boolean enableDictionary,
+      boolean enableByteStreamSplit,
+      boolean enableExtendedByteStreamSplit,
+      Class<? extends ValuesWriter> expectedValueWriterClass) {
+    ValuesWriterFactory factory =
+        getDefaultFactory(version, enableDictionary, enableByteStreamSplit, enableExtendedByteStreamSplit);
+    ValuesWriter writer = factory.newValuesWriter(path);
 
     validateWriterType(writer, expectedValueWriterClass);
   }
@@ -453,30 +685,83 @@ public class DefaultValuesWriterFactoryTest {
       WriterVersion version,
       boolean enableDictionary,
       boolean enableByteStreamSplit,
+      boolean enableExtendedByteStreamSplit,
       Class<? extends ValuesWriter> initialValueWriterClass,
       Class<? extends ValuesWriter> fallbackValueWriterClass) {
     ColumnDescriptor mockPath = createColumnDescriptor(typeName);
-    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary, enableByteStreamSplit);
-    ValuesWriter writer = factory.newValuesWriter(mockPath);
+    doTestValueWriter(
+        mockPath,
+        version,
+        enableDictionary,
+        enableByteStreamSplit,
+        enableExtendedByteStreamSplit,
+        initialValueWriterClass,
+        fallbackValueWriterClass);
+  }
+
+  private void doTestValueWriter(
+      ColumnDescriptor path,
+      WriterVersion version,
+      boolean enableDictionary,
+      boolean enableByteStreamSplit,
+      boolean enableExtendedByteStreamSplit,
+      Class<? extends ValuesWriter> initialValueWriterClass,
+      Class<? extends ValuesWriter> fallbackValueWriterClass) {
+    ValuesWriterFactory factory =
+        getDefaultFactory(version, enableDictionary, enableByteStreamSplit, enableExtendedByteStreamSplit);
+    ValuesWriter writer = factory.newValuesWriter(path);
 
     validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
   }
 
+  private void doTestValueWriter(
+      ColumnDescriptor path,
+      ParquetProperties properties,
+      Class<? extends ValuesWriter> initialValueWriterClass,
+      Class<? extends ValuesWriter> fallbackValueWriterClass) {
+    ValuesWriterFactory factory = getDefaultFactory(properties);
+    ValuesWriter writer = factory.newValuesWriter(path);
+    validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
+  }
+
+  private void doTestValueWriter(
+      ColumnDescriptor path,
+      ParquetProperties properties,
+      Class<? extends ValuesWriter> expectedValueWriterClass) {
+    ValuesWriterFactory factory = getDefaultFactory(properties);
+    ValuesWriter writer = factory.newValuesWriter(path);
+    validateWriterType(writer, expectedValueWriterClass);
+  }
+
   private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName) {
-    return createColumnDescriptor(typeName, "fake_" + 
typeName.name().toLowerCase() + "_col");
+    return createColumnDescriptor(typeName, (LogicalTypeAnnotation) null);
+  }
+
+  private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName, LogicalTypeAnnotation logicalType) {
+    return createColumnDescriptor(typeName, "fake_" + typeName.name().toLowerCase() + "_col", logicalType);
   }
 
   private ColumnDescriptor createColumnDescriptor(PrimitiveTypeName typeName, String name) {
-    return new ColumnDescriptor(
-        new String[] {name}, required(typeName).length(1).named(name), 0, 0);
+    return createColumnDescriptor(typeName, name, null);
+  }
+
+  private ColumnDescriptor createColumnDescriptor(
+      PrimitiveTypeName typeName, String name, LogicalTypeAnnotation logicalType) {
+    PrimitiveType type = required(typeName).length(1).named(name).withLogicalTypeAnnotation(logicalType);
+    return new ColumnDescriptor(new String[] {name}, type, 0, 0);
   }
 
   private ValuesWriterFactory getDefaultFactory(
-      WriterVersion writerVersion, boolean enableDictionary, boolean enableByteStreamSplit) {
+      WriterVersion writerVersion,
+      boolean enableDictionary,
+      boolean enableByteStreamSplit,
+      boolean enableExtendedByteStreamSplit) {
     ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+    // Initialize factory with the given properties
     ParquetProperties.builder()
         .withDictionaryEncoding(enableDictionary)
         .withByteStreamSplitEncoding(enableByteStreamSplit)
+        .withExtendedByteStreamSplitEncoding(enableExtendedByteStreamSplit)
         .withWriterVersion(writerVersion)
         .withValuesWriterFactory(factory)
         .build();
@@ -484,6 +769,12 @@ public class DefaultValuesWriterFactoryTest {
     return factory;
   }
 
+  private ValuesWriterFactory getDefaultFactory(ParquetProperties properties) {
+    ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+    factory.initialize(properties);
+    return factory;
+  }
+
   private ValuesWriterFactory getDefaultFactory(
       WriterVersion writerVersion, boolean dictEnabledDefault, String... dictInverseColumns) {
     ValuesWriterFactory factory = new DefaultValuesWriterFactory();
@@ -500,7 +791,10 @@ public class DefaultValuesWriterFactoryTest {
   }
 
   private void validateWriterType(ValuesWriter writer, Class<? extends ValuesWriter> valuesWriterClass) {
-    assertTrue("Not instance of: " + valuesWriterClass.getName(), 
valuesWriterClass.isInstance(writer));
+    assertTrue(
+        "Not instance of " + valuesWriterClass.getName() + ": actual class is "
+            + writer.getClass().getName(),
+        valuesWriterClass.isInstance(writer));
   }
 
   private void validateFallbackWriter(
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java
index 6e7b1581c..73ae79922 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java
@@ -20,18 +20,22 @@
 package org.apache.parquet.hadoop;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.junit.Test;
 
 public class TestInterOpReadByteStreamSplit {
-  private InterOpTester interop = new InterOpTester();
   private static final String FLOATS_FILE = "byte_stream_split.zstd.parquet";
-  private static final String CHANGESET = "4cb3cff";
+  private static final String EXTENDED_FILE = "byte_stream_split_extended.gzip.parquet";
+  private static final String CHANGESET = "74278bc";
+  private final InterOpTester interop = new InterOpTester();
 
   @Test
   public void testReadFloats() throws IOException {
@@ -42,7 +46,7 @@ public class TestInterOpReadByteStreamSplit {
         ParquetReader.builder(new GroupReadSupport(), floatsFile).build()) {
       for (int i = 0; i < expectRows; ++i) {
         Group group = reader.read();
-        assertTrue(group != null);
+        assertNotNull(group);
         float fval = group.getFloat("f32", 0);
         double dval = group.getDouble("f64", 0);
         // Values are from the normal distribution
@@ -67,7 +71,34 @@ public class TestInterOpReadByteStreamSplit {
             break;
         }
       }
-      assertTrue(reader.read() == null);
+      assertNull(reader.read());
     }
   }
+
+  private void compareColumnValues(Path path, int expectRows, String leftCol, String rightCol) throws IOException {
+    try (ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), path).build()) {
+      for (int i = 0; i < expectRows; ++i) {
+        SimpleGroup group = (SimpleGroup) reader.read();
+        assertNotNull(group);
+        Object left = group.getObject(leftCol, 0);
+        Object right = group.getObject(rightCol, 0);
+        assertEquals(left, right);
+      }
+      assertNull(reader.read());
+    }
+  }
+
+  @Test
+  public void testReadAllSupportedTypes() throws IOException {
+    Path extendedFile = interop.GetInterOpFile(EXTENDED_FILE, CHANGESET);
+    final int expectRows = 200;
+    compareColumnValues(extendedFile, expectRows, "float_plain", 
"float_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "double_plain", 
"double_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "int32_plain", 
"int32_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "int64_plain", 
"int64_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "float16_plain", 
"float16_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "flba5_plain", 
"flba5_byte_stream_split");
+    compareColumnValues(extendedFile, expectRows, "decimal_plain", 
"decimal_byte_stream_split");
+  }
 }
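
Aside (illustrative only, not part of the patch): the new encoding is opt-in through ParquetProperties, using the builder options exercised in the tests above. A hypothetical writer-side configuration might look like:

    // Sketch only: enable BYTE_STREAM_SPLIT for all supported primitive types
    // (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY), or per column.
    ParquetProperties props = ParquetProperties.builder()
        .withWriterVersion(WriterVersion.PARQUET_2_0)
        .withExtendedByteStreamSplitEncoding(true)
        // .withByteStreamSplitEncoding("my_col", true)   // per-column variant
        .build();

Wiring the properties into a ParquetWriter is not shown here.
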
diff --git a/pom.xml b/pom.xml
index 4554c48f9..a5625da6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -595,6 +595,8 @@
               <exclude>org.apache.parquet.hadoop.thrift.TBaseWriteSupport#setThriftClass(org.apache.parquet.conf.ParquetConfiguration,java.lang.Class)</exclude>
               <exclude>org.apache.parquet.proto.ProtoParquetReader#builder(org.apache.hadoop.fs.Path,boolean)</exclude>
               <exclude>org.apache.parquet.proto.ProtoParquetReader#builder(org.apache.parquet.io.InputFile,boolean)</exclude>
+              <!-- removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
+              <exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
 
               <!-- Due to the removal of deprecated methods -->
               <exclude>org.apache.parquet.arrow.schema.SchemaMapping</exclude>

