This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 41e294c7f GH-3122: Correct V2 page header compression fields for
zero-size data pages (#3148)
41e294c7f is described below
commit 41e294c7f7c49c6c509d9918799bf679481848ab
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Mar 4 22:41:05 2025 +0800
GH-3122: Correct V2 page header compression fields for zero-size data pages
(#3148)
---
.../org/apache/parquet/crypto/AesCtrDecryptor.java | 4 +-
.../org/apache/parquet/crypto/AesGcmDecryptor.java | 4 +-
.../format/converter/ParquetMetadataConverter.java | 131 ++++++++++++++---
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 11 +-
.../apache/parquet/hadoop/ParquetFileWriter.java | 158 ++++++++++++++++++++-
.../parquet/hadoop/rewrite/ParquetRewriter.java | 1 +
.../parquet/hadoop/TestParquetFileWriter.java | 8 +-
.../apache/parquet/hadoop/TestParquetWriter.java | 115 ++++++++++++++-
8 files changed, 394 insertions(+), 38 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index 2735f63bf..afc5054b1 100755
---
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -55,7 +55,7 @@ public class AesCtrDecryptor extends AesCipher implements
BlockCipher.Decryptor
public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int
cipherTextLength, byte[] AAD) {
int plainTextLength = cipherTextLength - NONCE_LENGTH;
- if (plainTextLength < 1) {
+ if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " +
plainTextLength);
}
@@ -91,7 +91,7 @@ public class AesCtrDecryptor extends AesCipher implements
BlockCipher.Decryptor
int cipherTextLength = ciphertext.limit() - ciphertext.position() -
SIZE_LENGTH;
int plainTextLength = cipherTextLength - NONCE_LENGTH;
- if (plainTextLength < 1) {
+ if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " +
plainTextLength);
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index dc378effc..dc75d7e2a 100755
---
a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -51,7 +51,7 @@ public class AesGcmDecryptor extends AesCipher implements
BlockCipher.Decryptor
public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int
cipherTextLength, byte[] AAD) {
int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
- if (plainTextLength < 1) {
+ if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " +
plainTextLength);
}
@@ -81,7 +81,7 @@ public class AesGcmDecryptor extends AesCipher implements
BlockCipher.Decryptor
int cipherTextOffset = SIZE_LENGTH;
int cipherTextLength = ciphertext.limit() - ciphertext.position() -
SIZE_LENGTH;
int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
- if (plainTextLength < 1) {
+ if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " +
plainTextLength);
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3c38f04af..87797d1fa 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1993,7 +1993,8 @@ public class ParquetMetadataConverter {
rowCount,
dataEncoding,
rlByteLength,
- dlByteLength),
+ dlByteLength,
+ true /* compressed by default */),
to);
}
@@ -2071,6 +2072,10 @@ public class ParquetMetadataConverter {
pageHeaderAAD);
}
+ /**
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int,
org.apache.parquet.column.Encoding, int, int, boolean, OutputStream)} instead
+ */
+ @Deprecated
public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
@@ -2091,11 +2096,16 @@ public class ParquetMetadataConverter {
dataEncoding,
rlByteLength,
dlByteLength,
+ true, /* compressed by default */
to,
null,
null);
}
+ /**
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int,
org.apache.parquet.column.Encoding, int, int, boolean, OutputStream,
BlockCipher.Encryptor, byte[])} instead
+ */
+ @Deprecated
public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
@@ -2109,22 +2119,26 @@ public class ParquetMetadataConverter {
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
throws IOException {
- writePageHeader(
- newDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength),
+ writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ true, /* compressed by default */
to,
blockEncryptor,
pageHeaderAAD);
}
- private PageHeader newDataPageV2Header(
+ /**
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int,
org.apache.parquet.column.Encoding, int, int, boolean, int, OutputStream,
BlockCipher.Encryptor, byte[])} instead
+ */
+ @Deprecated
+ public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
int valueCount,
@@ -2132,12 +2146,26 @@ public class ParquetMetadataConverter {
int rowCount,
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
- int dlByteLength) {
- DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
- valueCount, nullCount, rowCount, getEncoding(dataEncoding),
dlByteLength, rlByteLength);
- PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2,
uncompressedSize, compressedSize);
- pageHeader.setData_page_header_v2(dataPageHeaderV2);
- return pageHeader;
+ int dlByteLength,
+ int crc,
+ OutputStream to,
+ BlockCipher.Encryptor blockEncryptor,
+ byte[] pageHeaderAAD)
+ throws IOException {
+ writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ true, /* compressed by default */
+ crc,
+ to,
+ blockEncryptor,
+ pageHeaderAAD);
}
public void writeDataPageV2Header(
@@ -2149,7 +2177,34 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
- int crc,
+ boolean compressed,
+ OutputStream to)
+ throws IOException {
+ writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed,
+ to,
+ null,
+ null);
+ }
+
+ public void writeDataPageV2Header(
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ int nullCount,
+ int rowCount,
+ org.apache.parquet.column.Encoding dataEncoding,
+ int rlByteLength,
+ int dlByteLength,
+ boolean compressed,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
@@ -2164,12 +2219,43 @@ public class ParquetMetadataConverter {
dataEncoding,
rlByteLength,
dlByteLength,
- crc),
+ compressed),
to,
blockEncryptor,
pageHeaderAAD);
}
+ public void writeDataPageV2Header(
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ int nullCount,
+ int rowCount,
+ org.apache.parquet.column.Encoding dataEncoding,
+ int rlByteLength,
+ int dlByteLength,
+ boolean compressed,
+ int crc,
+ OutputStream to,
+ BlockCipher.Encryptor blockEncryptor,
+ byte[] pageHeaderAAD)
+ throws IOException {
+ PageHeader pageHeader = newDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed);
+
+ pageHeader.setCrc(crc);
+
+ writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
+ }
+
private PageHeader newDataPageV2Header(
int uncompressedSize,
int compressedSize,
@@ -2179,12 +2265,13 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
- int crc) {
+ boolean compressed) {
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount, getEncoding(dataEncoding),
dlByteLength, rlByteLength);
+ dataPageHeaderV2.setIs_compressed(compressed);
+
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2,
uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
- pageHeader.setCrc(crc);
return pageHeader;
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 795063e5c..566ab76cc 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -295,8 +295,13 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
int uncompressedSize = toIntWithCheck(data.size() +
repetitionLevels.size() + definitionLevels.size());
- // TODO: decide if we compress
- BytesInput compressedData = compressor.compress(data);
+ boolean compressed = false;
+ BytesInput compressedData = BytesInput.empty();
+ if (data.size() > 0) {
+ // TODO: decide if we compress
+ compressedData = compressor.compress(data);
+ compressed = true;
+ }
if (null != pageBlockEncryptor) {
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
compressedData =
BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(),
dataPageAAD));
@@ -327,6 +332,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
dataEncoding,
rlByteLength,
dlByteLength,
+ compressed,
(int) crc.getValue(),
tempOutputStream,
headerBlockEncryptor,
@@ -341,6 +347,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
dataEncoding,
rlByteLength,
dlByteLength,
+ compressed,
tempOutputStream,
headerBlockEncryptor,
dataPageHeaderAAD);
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f0a912f59..9b2f7f445 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1044,7 +1044,9 @@ public class ParquetFileWriter implements AutoCloseable {
* @param uncompressedDataSize the size of uncompressed data
* @param statistics the statistics of the page
* @throws IOException if any I/O error occurs during writing the file
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput,
Encoding, BytesInput, boolean, int, Statistics)} instead
*/
+ @Deprecated
public void writeDataPageV2(
int rowCount,
int nullCount,
@@ -1064,6 +1066,50 @@ public class ParquetFileWriter implements AutoCloseable {
definitionLevels,
dataEncoding,
compressedData,
+ true, /* compressed by default */
+ uncompressedDataSize,
+ statistics,
+ null,
+ null,
+ null);
+ }
+
+ /**
+ * Writes a single v2 data page
+ *
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param bytes data bytes
+ * @param compressed whether the data bytes are compressed
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(
+ int rowCount,
+ int nullCount,
+ int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput bytes,
+ boolean compressed,
+ int uncompressedDataSize,
+ Statistics<?> statistics)
+ throws IOException {
+ writeDataPageV2(
+ rowCount,
+ nullCount,
+ valueCount,
+ repetitionLevels,
+ definitionLevels,
+ dataEncoding,
+ bytes,
+ compressed,
uncompressedDataSize,
statistics,
null,
@@ -1086,7 +1132,9 @@ public class ParquetFileWriter implements AutoCloseable {
* @param metadataBlockEncryptor encryptor for block data
* @param pageHeaderAAD pageHeader AAD
* @throws IOException if any I/O error occurs during writing the file
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput,
Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[])}
instead
*/
+ @Deprecated
public void writeDataPageV2(
int rowCount,
int nullCount,
@@ -1108,6 +1156,54 @@ public class ParquetFileWriter implements AutoCloseable {
definitionLevels,
dataEncoding,
compressedData,
+ true, /* compressed by default */
+ uncompressedDataSize,
+ statistics,
+ metadataBlockEncryptor,
+ pageHeaderAAD,
+ null);
+ }
+
+ /**
+ * Writes a single v2 data page
+ *
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param bytes data bytes
+ * @param compressed whether the data bytes are compressed
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @param metadataBlockEncryptor encryptor for block data
+ * @param pageHeaderAAD pageHeader AAD
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(
+ int rowCount,
+ int nullCount,
+ int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput bytes,
+ boolean compressed,
+ int uncompressedDataSize,
+ Statistics<?> statistics,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD)
+ throws IOException {
+ writeDataPageV2(
+ rowCount,
+ nullCount,
+ valueCount,
+ repetitionLevels,
+ definitionLevels,
+ dataEncoding,
+ bytes,
+ compressed,
uncompressedDataSize,
statistics,
metadataBlockEncryptor,
@@ -1131,7 +1227,9 @@ public class ParquetFileWriter implements AutoCloseable {
* @param pageHeaderAAD pageHeader AAD
* @param sizeStatistics size statistics for the page
* @throws IOException if any I/O error occurs during writing the file
+ * @deprecated will be removed in 2.0.0. Use {@link
ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput,
Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[],
SizeStatistics)} instead
*/
+ @Deprecated
public void writeDataPageV2(
int rowCount,
int nullCount,
@@ -1146,12 +1244,60 @@ public class ParquetFileWriter implements AutoCloseable
{
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
+ writeDataPageV2(
+ rowCount,
+ nullCount,
+ valueCount,
+ repetitionLevels,
+ definitionLevels,
+ dataEncoding,
+ compressedData,
+ true, /* compressed by default */
+ uncompressedDataSize,
+ statistics,
+ metadataBlockEncryptor,
+ pageHeaderAAD,
+ sizeStatistics);
+ }
+
+ /**
+ * Writes a single v2 data page
+ *
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param bytes data bytes
+ * @param compressed whether the data bytes are compressed
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @param metadataBlockEncryptor encryptor for block data
+ * @param pageHeaderAAD pageHeader AAD
+ * @param sizeStatistics size statistics for the page
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(
+ int rowCount,
+ int nullCount,
+ int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput bytes,
+ boolean compressed,
+ int uncompressedDataSize,
+ Statistics<?> statistics,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD,
+ SizeStatistics sizeStatistics)
+ throws IOException {
state = state.write();
int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page
repetition levels");
int dlByteLength = toIntWithCheck(definitionLevels.size(), "page
definition levels");
- int compressedSize =
- toIntWithCheck(compressedData.size() + repetitionLevels.size() +
definitionLevels.size(), "page");
+ int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size()
+ definitionLevels.size(), "page");
int uncompressedSize =
toIntWithCheck(uncompressedDataSize + repetitionLevels.size() +
definitionLevels.size(), "page");
@@ -1169,8 +1315,8 @@ public class ParquetFileWriter implements AutoCloseable {
if (definitionLevels.size() > 0) {
crcUpdate(definitionLevels);
}
- if (compressedData.size() > 0) {
- crcUpdate(compressedData);
+ if (bytes.size() > 0) {
+ crcUpdate(bytes);
}
metadataConverter.writeDataPageV2Header(
uncompressedSize,
@@ -1181,6 +1327,7 @@ public class ParquetFileWriter implements AutoCloseable {
dataEncoding,
rlByteLength,
dlByteLength,
+ compressed,
(int) crc.getValue(),
out,
metadataBlockEncryptor,
@@ -1195,6 +1342,7 @@ public class ParquetFileWriter implements AutoCloseable {
dataEncoding,
rlByteLength,
dlByteLength,
+ compressed,
out,
metadataBlockEncryptor,
pageHeaderAAD);
@@ -1209,7 +1357,7 @@ public class ParquetFileWriter implements AutoCloseable {
currentEncodings.add(dataEncoding);
encodingStatsBuilder.addDataEncoding(dataEncoding);
- BytesInput.concat(repetitionLevels, definitionLevels,
compressedData).writeAllTo(out);
+ BytesInput.concat(repetitionLevels, definitionLevels,
bytes).writeAllTo(out);
offsetIndexBuilder.add(
toIntWithCheck(out.getPos() - beforeHeader, "page"),
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 9535b4335..10c84731f 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -762,6 +762,7 @@ public class ParquetRewriter implements Closeable {
dlLevels,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
+ headerV2.is_compressed,
rawDataLength,
statistics,
metaEncryptor,
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index c6be72ff7..3126e1746 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -513,15 +513,15 @@ public class TestParquetFileWriter {
w.startColumn(C1, 6, CODEC);
long c1Starts = w.getPos();
- w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4,
statsC1P1);
- w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4,
statsC1P2);
+ w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, false, 4,
statsC1P1);
+ w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, false, 4,
statsC1P2);
w.endColumn();
long c1Ends = w.getPos();
w.startColumn(C2, 5, CODEC);
long c2Starts = w.getPos();
- w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4,
EMPTY_STATS);
- w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4,
EMPTY_STATS);
+ w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, false, 4,
EMPTY_STATS);
+ w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, false, 4,
EMPTY_STATS);
w.endColumn();
long c2Ends = w.getPos();
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 64001bcaf..c8e8f71a9 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -33,9 +33,11 @@ import static
org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -51,20 +53,32 @@ import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
@@ -232,7 +246,7 @@ public class TestParquetWriter {
return null;
});
- Assert.assertFalse("Should not create a file when schema is rejected",
file.exists());
+ assertFalse("Should not create a file when schema is rejected",
file.exists());
}
// Testing the issue of PARQUET-1531 where writing null nested rows leads to
empty pages if the page row count limit
@@ -592,4 +606,103 @@ public class TestParquetWriter {
}
}
}
+
+ @Test
+ public void testV2WriteAllNullValues() throws Exception {
+ testV2WriteAllNullValues(null, null);
+ }
+
+ @Test
+ public void testV2WriteAllNullValuesWithEncrypted() throws Exception {
+ byte[] footerEncryptionKey = "0123456789012345".getBytes();
+ byte[] columnEncryptionKey = "1234567890123450".getBytes();
+
+ String footerEncryptionKeyID = "kf";
+ String columnEncryptionKeyID = "kc";
+
+ ColumnEncryptionProperties columnProperties =
ColumnEncryptionProperties.builder("float")
+ .withKey(columnEncryptionKey)
+ .withKeyID(columnEncryptionKeyID)
+ .build();
+
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new
HashMap<>();
+ columnPropertiesMap.put(columnProperties.getPath(), columnProperties);
+
+ FileEncryptionProperties encryptionProperties =
FileEncryptionProperties.builder(footerEncryptionKey)
+ .withFooterKeyID(footerEncryptionKeyID)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+
+ DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new
DecryptionKeyRetrieverMock()
+ .putKey(footerEncryptionKeyID, footerEncryptionKey)
+ .putKey(columnEncryptionKeyID, columnEncryptionKey);
+ FileDecryptionProperties decryptionProperties =
FileDecryptionProperties.builder()
+ .withKeyRetriever(decryptionKeyRetrieverMock)
+ .build();
+
+ testV2WriteAllNullValues(encryptionProperties, decryptionProperties);
+ }
+
+ private void testV2WriteAllNullValues(
+ FileEncryptionProperties encryptionProperties, FileDecryptionProperties
decryptionProperties)
+ throws Exception {
+ MessageType schema =
Types.buildMessage().optional(FLOAT).named("float").named("msg");
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ File file = temp.newFile();
+ temp.delete();
+ Path path = new Path(file.getAbsolutePath());
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ Group nullValue = factory.newGroup();
+ int recordCount = 10;
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withAllocator(allocator)
+ .withConf(conf)
+ .withWriterVersion(WriterVersion.PARQUET_2_0)
+ .withDictionaryEncoding(false)
+ .withEncryption(encryptionProperties)
+ .build()) {
+ for (int i = 0; i < recordCount; i++) {
+ writer.write(nullValue);
+ }
+ }
+
+ try (ParquetReader<Group> reader = ParquetReader.builder(new
GroupReadSupport(), path)
+ .withDecryption(decryptionProperties)
+ .build()) {
+ int readRecordCount = 0;
+ for (Group group = reader.read(); group != null; group = reader.read()) {
+ assertEquals(nullValue.toString(), group.toString());
+ ++readRecordCount;
+ }
+ assertEquals("Number of written records should be equal to the read
one", recordCount, readRecordCount);
+ }
+
+ ParquetReadOptions options = ParquetReadOptions.builder()
+ .withDecryption(decryptionProperties)
+ .build();
+ try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
+ BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+ reader.f.seek(blockMetaData.getStartingPos());
+
+ if (decryptionProperties != null) {
+ InternalFileDecryptor fileDecryptor =
+ reader.getFooter().getFileMetaData().getFileDecryptor();
+ InternalColumnDecryptionSetup columnDecryptionSetup =
+ fileDecryptor.getColumnSetup(ColumnPath.fromDotString("float"));
+ byte[] dataPageHeaderAAD = AesCipher.createModuleAAD(
+ fileDecryptor.getFileAAD(),
ModuleCipherFactory.ModuleType.DataPageHeader, 0, 0, 0);
+ PageHeader pageHeader =
+ Util.readPageHeader(reader.f,
columnDecryptionSetup.getMetaDataDecryptor(), dataPageHeaderAAD);
+ assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+ } else {
+ PageHeader pageHeader = Util.readPageHeader(reader.f);
+ assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+ }
+ }
+ }
}