This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 41e294c7f GH-3122: Correct V2 page header compression fields for 
zero-size data pages (#3148)
41e294c7f is described below

commit 41e294c7f7c49c6c509d9918799bf679481848ab
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Mar 4 22:41:05 2025 +0800

    GH-3122: Correct V2 page header compression fields for zero-size data pages 
(#3148)
---
 .../org/apache/parquet/crypto/AesCtrDecryptor.java |   4 +-
 .../org/apache/parquet/crypto/AesGcmDecryptor.java |   4 +-
 .../format/converter/ParquetMetadataConverter.java | 131 ++++++++++++++---
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |  11 +-
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 158 ++++++++++++++++++++-
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |   1 +
 .../parquet/hadoop/TestParquetFileWriter.java      |   8 +-
 .../apache/parquet/hadoop/TestParquetWriter.java   | 115 ++++++++++++++-
 8 files changed, 394 insertions(+), 38 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index 2735f63bf..afc5054b1 100755
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -55,7 +55,7 @@ public class AesCtrDecryptor extends AesCipher implements 
BlockCipher.Decryptor
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int 
cipherTextLength, byte[] AAD) {
 
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + 
plainTextLength);
     }
 
@@ -91,7 +91,7 @@ public class AesCtrDecryptor extends AesCipher implements 
BlockCipher.Decryptor
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - 
SIZE_LENGTH;
 
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + 
plainTextLength);
     }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index dc378effc..dc75d7e2a 100755
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -51,7 +51,7 @@ public class AesGcmDecryptor extends AesCipher implements 
BlockCipher.Decryptor
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int 
cipherTextLength, byte[] AAD) {
 
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + 
plainTextLength);
     }
 
@@ -81,7 +81,7 @@ public class AesGcmDecryptor extends AesCipher implements 
BlockCipher.Decryptor
     int cipherTextOffset = SIZE_LENGTH;
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - 
SIZE_LENGTH;
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + 
plainTextLength);
     }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3c38f04af..87797d1fa 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1993,7 +1993,8 @@ public class ParquetMetadataConverter {
             rowCount,
             dataEncoding,
             rlByteLength,
-            dlByteLength),
+            dlByteLength,
+            true /* compressed by default */),
         to);
   }
 
@@ -2071,6 +2072,10 @@ public class ParquetMetadataConverter {
         pageHeaderAAD);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, 
org.apache.parquet.column.Encoding, int, int, boolean, OutputStream)} instead
+   */
+  @Deprecated
   public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2091,11 +2096,16 @@ public class ParquetMetadataConverter {
         dataEncoding,
         rlByteLength,
         dlByteLength,
+        true, /* compressed by default */
         to,
         null,
         null);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, 
org.apache.parquet.column.Encoding, int, int, boolean, OutputStream, 
BlockCipher.Encryptor, byte[])} instead
+   */
+  @Deprecated
   public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2109,22 +2119,26 @@ public class ParquetMetadataConverter {
       BlockCipher.Encryptor blockEncryptor,
       byte[] pageHeaderAAD)
       throws IOException {
-    writePageHeader(
-        newDataPageV2Header(
-            uncompressedSize,
-            compressedSize,
-            valueCount,
-            nullCount,
-            rowCount,
-            dataEncoding,
-            rlByteLength,
-            dlByteLength),
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        true, /* compressed by default */
         to,
         blockEncryptor,
         pageHeaderAAD);
   }
 
-  private PageHeader newDataPageV2Header(
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, 
org.apache.parquet.column.Encoding, int, int, boolean, int, OutputStream, 
BlockCipher.Encryptor, byte[])} instead
+   */
+  @Deprecated
+  public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
       int valueCount,
@@ -2132,12 +2146,26 @@ public class ParquetMetadataConverter {
       int rowCount,
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
-      int dlByteLength) {
-    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
-        valueCount, nullCount, rowCount, getEncoding(dataEncoding), 
dlByteLength, rlByteLength);
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, 
uncompressedSize, compressedSize);
-    pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    return pageHeader;
+      int dlByteLength,
+      int crc,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        true, /* compressed by default */
+        crc,
+        to,
+        blockEncryptor,
+        pageHeaderAAD);
   }
 
   public void writeDataPageV2Header(
@@ -2149,7 +2177,34 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
       int dlByteLength,
-      int crc,
+      boolean compressed,
+      OutputStream to)
+      throws IOException {
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        compressed,
+        to,
+        null,
+        null);
+  }
+
+  public void writeDataPageV2Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      int nullCount,
+      int rowCount,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength,
+      int dlByteLength,
+      boolean compressed,
       OutputStream to,
       BlockCipher.Encryptor blockEncryptor,
       byte[] pageHeaderAAD)
@@ -2164,12 +2219,43 @@ public class ParquetMetadataConverter {
             dataEncoding,
             rlByteLength,
             dlByteLength,
-            crc),
+            compressed),
         to,
         blockEncryptor,
         pageHeaderAAD);
   }
 
+  public void writeDataPageV2Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      int nullCount,
+      int rowCount,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength,
+      int dlByteLength,
+      boolean compressed,
+      int crc,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    PageHeader pageHeader = newDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        compressed);
+
+    pageHeader.setCrc(crc);
+
+    writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
+  }
+
   private PageHeader newDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2179,12 +2265,13 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
       int dlByteLength,
-      int crc) {
+      boolean compressed) {
     DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
         valueCount, nullCount, rowCount, getEncoding(dataEncoding), 
dlByteLength, rlByteLength);
+    dataPageHeaderV2.setIs_compressed(compressed);
+
     PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, 
uncompressedSize, compressedSize);
     pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    pageHeader.setCrc(crc);
     return pageHeader;
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 795063e5c..566ab76cc 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -295,8 +295,13 @@ public class ColumnChunkPageWriteStore implements 
PageWriteStore, BloomFilterWri
       int rlByteLength = toIntWithCheck(repetitionLevels.size());
       int dlByteLength = toIntWithCheck(definitionLevels.size());
       int uncompressedSize = toIntWithCheck(data.size() + 
repetitionLevels.size() + definitionLevels.size());
-      // TODO: decide if we compress
-      BytesInput compressedData = compressor.compress(data);
+      boolean compressed = false;
+      BytesInput compressedData = BytesInput.empty();
+      if (data.size() > 0) {
+        // TODO: decide if we compress
+        compressedData = compressor.compress(data);
+        compressed = true;
+      }
       if (null != pageBlockEncryptor) {
         AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
         compressedData = 
BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), 
dataPageAAD));
@@ -327,6 +332,7 @@ public class ColumnChunkPageWriteStore implements 
PageWriteStore, BloomFilterWri
             dataEncoding,
             rlByteLength,
             dlByteLength,
+            compressed,
             (int) crc.getValue(),
             tempOutputStream,
             headerBlockEncryptor,
@@ -341,6 +347,7 @@ public class ColumnChunkPageWriteStore implements 
PageWriteStore, BloomFilterWri
             dataEncoding,
             rlByteLength,
             dlByteLength,
+            compressed,
             tempOutputStream,
             headerBlockEncryptor,
             dataPageHeaderAAD);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f0a912f59..9b2f7f445 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1044,7 +1044,9 @@ public class ParquetFileWriter implements AutoCloseable {
    * @param uncompressedDataSize the size of uncompressed data
    * @param statistics           the statistics of the page
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, 
Encoding, BytesInput, boolean, int, Statistics)} instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1064,6 +1066,50 @@ public class ParquetFileWriter implements AutoCloseable {
         definitionLevels,
         dataEncoding,
         compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        null,
+        null,
+        null);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount             count of rows
+   * @param nullCount            count of nulls
+   * @param valueCount           count of values
+   * @param repetitionLevels     repetition level bytes
+   * @param definitionLevels     definition level bytes
+   * @param dataEncoding         encoding for data
+   * @param bytes                data bytes
+   * @param compressed           whether the data bytes are compressed
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics           the statistics of the page
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics)
+      throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        bytes,
+        compressed,
         uncompressedDataSize,
         statistics,
         null,
@@ -1086,7 +1132,9 @@ public class ParquetFileWriter implements AutoCloseable {
    * @param metadataBlockEncryptor encryptor for block data
    * @param pageHeaderAAD          pageHeader AAD
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, 
Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[])} 
instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1108,6 +1156,54 @@ public class ParquetFileWriter implements AutoCloseable {
         definitionLevels,
         dataEncoding,
         compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        metadataBlockEncryptor,
+        pageHeaderAAD,
+        null);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount               count of rows
+   * @param nullCount              count of nulls
+   * @param valueCount             count of values
+   * @param repetitionLevels       repetition level bytes
+   * @param definitionLevels       definition level bytes
+   * @param dataEncoding           encoding for data
+   * @param bytes                  data bytes
+   * @param compressed             whether the data bytes are compressed
+   * @param uncompressedDataSize   the size of uncompressed data
+   * @param statistics             the statistics of the page
+   * @param metadataBlockEncryptor encryptor for block data
+   * @param pageHeaderAAD          pageHeader AAD
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        bytes,
+        compressed,
         uncompressedDataSize,
         statistics,
         metadataBlockEncryptor,
@@ -1131,7 +1227,9 @@ public class ParquetFileWriter implements AutoCloseable {
    * @param pageHeaderAAD pageHeader AAD
    * @param sizeStatistics size statistics for the page
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link 
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, 
Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[], 
SizeStatistics)} instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1146,12 +1244,60 @@ public class ParquetFileWriter implements AutoCloseable 
{
       byte[] pageHeaderAAD,
       SizeStatistics sizeStatistics)
       throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        metadataBlockEncryptor,
+        pageHeaderAAD,
+        sizeStatistics);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param bytes data bytes
+   * @param compressed whether the data bytes are compressed
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @param metadataBlockEncryptor encryptor for block data
+   * @param pageHeaderAAD pageHeader AAD
+   * @param sizeStatistics size statistics for the page
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD,
+      SizeStatistics sizeStatistics)
+      throws IOException {
     state = state.write();
     int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page 
repetition levels");
     int dlByteLength = toIntWithCheck(definitionLevels.size(), "page 
definition levels");
 
-    int compressedSize =
-        toIntWithCheck(compressedData.size() + repetitionLevels.size() + 
definitionLevels.size(), "page");
+    int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() 
+ definitionLevels.size(), "page");
 
     int uncompressedSize =
         toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + 
definitionLevels.size(), "page");
@@ -1169,8 +1315,8 @@ public class ParquetFileWriter implements AutoCloseable {
       if (definitionLevels.size() > 0) {
         crcUpdate(definitionLevels);
       }
-      if (compressedData.size() > 0) {
-        crcUpdate(compressedData);
+      if (bytes.size() > 0) {
+        crcUpdate(bytes);
       }
       metadataConverter.writeDataPageV2Header(
           uncompressedSize,
@@ -1181,6 +1327,7 @@ public class ParquetFileWriter implements AutoCloseable {
           dataEncoding,
           rlByteLength,
           dlByteLength,
+          compressed,
           (int) crc.getValue(),
           out,
           metadataBlockEncryptor,
@@ -1195,6 +1342,7 @@ public class ParquetFileWriter implements AutoCloseable {
           dataEncoding,
           rlByteLength,
           dlByteLength,
+          compressed,
           out,
           metadataBlockEncryptor,
           pageHeaderAAD);
@@ -1209,7 +1357,7 @@ public class ParquetFileWriter implements AutoCloseable {
     currentEncodings.add(dataEncoding);
     encodingStatsBuilder.addDataEncoding(dataEncoding);
 
-    BytesInput.concat(repetitionLevels, definitionLevels, 
compressedData).writeAllTo(out);
+    BytesInput.concat(repetitionLevels, definitionLevels, 
bytes).writeAllTo(out);
 
     offsetIndexBuilder.add(
         toIntWithCheck(out.getPos() - beforeHeader, "page"),
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 9535b4335..10c84731f 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -762,6 +762,7 @@ public class ParquetRewriter implements Closeable {
               dlLevels,
               converter.getEncoding(headerV2.getEncoding()),
               BytesInput.from(pageLoad),
+              headerV2.is_compressed,
               rawDataLength,
               statistics,
               metaEncryptor,
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index c6be72ff7..3126e1746 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -513,15 +513,15 @@ public class TestParquetFileWriter {
 
     w.startColumn(C1, 6, CODEC);
     long c1Starts = w.getPos();
-    w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4, 
statsC1P1);
-    w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4, 
statsC1P2);
+    w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, false, 4, 
statsC1P1);
+    w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, false, 4, 
statsC1P2);
     w.endColumn();
     long c1Ends = w.getPos();
 
     w.startColumn(C2, 5, CODEC);
     long c2Starts = w.getPos();
-    w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4, 
EMPTY_STATS);
-    w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4, 
EMPTY_STATS);
+    w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, false, 4, 
EMPTY_STATS);
+    w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, false, 4, 
EMPTY_STATS);
     w.endColumn();
     long c2Ends = w.getPos();
 
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 64001bcaf..c8e8f71a9 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -33,9 +33,11 @@ import static 
org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -51,20 +53,32 @@ import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
@@ -232,7 +246,7 @@ public class TestParquetWriter {
           return null;
         });
 
-    Assert.assertFalse("Should not create a file when schema is rejected", 
file.exists());
+    assertFalse("Should not create a file when schema is rejected", 
file.exists());
   }
 
   // Testing the issue of PARQUET-1531 where writing null nested rows leads to 
empty pages if the page row count limit
@@ -592,4 +606,103 @@ public class TestParquetWriter {
       }
     }
   }
+
+  @Test
+  public void testV2WriteAllNullValues() throws Exception {
+    testV2WriteAllNullValues(null, null);
+  }
+
+  @Test
+  public void testV2WriteAllNullValuesWithEncrypted() throws Exception {
+    byte[] footerEncryptionKey = "0123456789012345".getBytes();
+    byte[] columnEncryptionKey = "1234567890123450".getBytes();
+
+    String footerEncryptionKeyID = "kf";
+    String columnEncryptionKeyID = "kc";
+
+    ColumnEncryptionProperties columnProperties = 
ColumnEncryptionProperties.builder("float")
+        .withKey(columnEncryptionKey)
+        .withKeyID(columnEncryptionKeyID)
+        .build();
+
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new 
HashMap<>();
+    columnPropertiesMap.put(columnProperties.getPath(), columnProperties);
+
+    FileEncryptionProperties encryptionProperties = 
FileEncryptionProperties.builder(footerEncryptionKey)
+        .withFooterKeyID(footerEncryptionKeyID)
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+
+    DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new 
DecryptionKeyRetrieverMock()
+        .putKey(footerEncryptionKeyID, footerEncryptionKey)
+        .putKey(columnEncryptionKeyID, columnEncryptionKey);
+    FileDecryptionProperties decryptionProperties = 
FileDecryptionProperties.builder()
+        .withKeyRetriever(decryptionKeyRetrieverMock)
+        .build();
+
+    testV2WriteAllNullValues(encryptionProperties, decryptionProperties);
+  }
+
+  private void testV2WriteAllNullValues(
+      FileEncryptionProperties encryptionProperties, FileDecryptionProperties 
decryptionProperties)
+      throws Exception {
+    MessageType schema = 
Types.buildMessage().optional(FLOAT).named("float").named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    File file = temp.newFile();
+    temp.delete();
+    Path path = new Path(file.getAbsolutePath());
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    Group nullValue = factory.newGroup();
+    int recordCount = 10;
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withAllocator(allocator)
+        .withConf(conf)
+        .withWriterVersion(WriterVersion.PARQUET_2_0)
+        .withDictionaryEncoding(false)
+        .withEncryption(encryptionProperties)
+        .build()) {
+      for (int i = 0; i < recordCount; i++) {
+        writer.write(nullValue);
+      }
+    }
+
+    try (ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), path)
+        .withDecryption(decryptionProperties)
+        .build()) {
+      int readRecordCount = 0;
+      for (Group group = reader.read(); group != null; group = reader.read()) {
+        assertEquals(nullValue.toString(), group.toString());
+        ++readRecordCount;
+      }
+      assertEquals("Number of written records should be equal to the read 
one", recordCount, readRecordCount);
+    }
+
+    ParquetReadOptions options = ParquetReadOptions.builder()
+        .withDecryption(decryptionProperties)
+        .build();
+    try (ParquetFileReader reader = 
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      reader.f.seek(blockMetaData.getStartingPos());
+
+      if (decryptionProperties != null) {
+        InternalFileDecryptor fileDecryptor =
+            reader.getFooter().getFileMetaData().getFileDecryptor();
+        InternalColumnDecryptionSetup columnDecryptionSetup =
+            fileDecryptor.getColumnSetup(ColumnPath.fromDotString("float"));
+        byte[] dataPageHeaderAAD = AesCipher.createModuleAAD(
+            fileDecryptor.getFileAAD(), 
ModuleCipherFactory.ModuleType.DataPageHeader, 0, 0, 0);
+        PageHeader pageHeader =
+            Util.readPageHeader(reader.f, 
columnDecryptionSetup.getMetaDataDecryptor(), dataPageHeaderAAD);
+        assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+      } else {
+        PageHeader pageHeader = Util.readPageHeader(reader.f);
+        assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+      }
+    }
+  }
 }

Reply via email to