This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 38dc64bb fix(parquet): decryption of V2 data pages (#596)
38dc64bb is described below
commit 38dc64bb93890aa0c4b292f58ac3344ced9d12da
Author: daniel-adam-tfs <[email protected]>
AuthorDate: Wed Jan 21 20:19:23 2026 +0100
fix(parquet): decryption of V2 data pages (#596)
### Rationale for this change
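V2 data pages of encrypted Parquet files were not decrypted correctly: the page reader consumed the level bytes (and, for uncompressed pages, the entire page) straight from the input stream instead of decrypting the page first.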
### What changes are included in this PR?
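This PR fixes the decryption of V2 data pages. When a data decryptor is configured, the page reader now reads and decrypts the whole page before copying the uncompressed level bytes and decompressing the remaining data; read errors on the V2 path are now propagated instead of being ignored. The encryption tests also generate uncompressed and V2 variants of every encryption configuration, and the decryption test table lists them.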
### Are these changes tested?
Yes, tests cover various scenarios for both V1 and V2 page encryption
and decryption.
### Are there any user-facing changes?
No.
---
parquet/encryption_read_config_test.go | 55 +++++++++++++++++---------
parquet/encryption_write_config_test.go | 69 ++++++++++++++++++++++-----------
parquet/file/column_reader_test.go | 7 ++--
parquet/file/file_reader_test.go | 5 ++-
parquet/file/page_reader.go | 58 +++++++++++++++++++++++----
5 files changed, 141 insertions(+), 53 deletions(-)
diff --git a/parquet/encryption_read_config_test.go
b/parquet/encryption_read_config_test.go
index d0ddb3d2..5bf15f19 100644
--- a/parquet/encryption_read_config_test.go
+++ b/parquet/encryption_read_config_test.go
@@ -295,9 +295,7 @@ func (d *TestDecryptionSuite) decryptFile(filename string,
decryptConfigNum int)
// Read all rows in column
i = 0
for int96reader.HasNext() {
- var (
- val [1]parquet.Int96
- )
+ var val [1]parquet.Int96
// read one value at a time. the number of rows read is
returned. values
// read contains the number of non-null rows
@@ -553,15 +551,34 @@ func (d *TestDecryptionSuite) checkResults(fileName
string, decryptionConfig, en
// once the file is read and the second exists in parquet-testing/data folder
func (d *TestDecryptionSuite) TestDecryption() {
tests := []struct {
- file string
- config uint
+ file string
+ config uint
+ isInDataStorage bool
}{
- {"uniform_encryption.parquet.encrypted", 1},
- {"encrypt_columns_and_footer.parquet.encrypted", 2},
- {"encrypt_columns_plaintext_footer.parquet.encrypted", 3},
- {"encrypt_columns_and_footer_aad.parquet.encrypted", 4},
-
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5},
- {"encrypt_columns_and_footer_ctr.parquet.encrypted", 6},
+ {"uniform_encryption.parquet.encrypted", 1, true},
+ {"uniform_encryption.parquet.uncompressed.encrypted", 1, false},
+ {"uniform_encryption.parquet.v2.encrypted", 1, false},
+ {"uniform_encryption.parquet.v2.uncompressed.encrypted", 1,
false},
+ {"encrypt_columns_and_footer.parquet.encrypted", 2, true},
+ {"encrypt_columns_and_footer.parquet.uncompressed.encrypted",
2, false},
+ {"encrypt_columns_and_footer.parquet.v2.encrypted", 2, false},
+
{"encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted", 2, false},
+ {"encrypt_columns_plaintext_footer.parquet.encrypted", 3, true},
+
{"encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted", 3, false},
+ {"encrypt_columns_plaintext_footer.parquet.v2.encrypted", 3,
false},
+
{"encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted", 3,
false},
+ {"encrypt_columns_and_footer_aad.parquet.encrypted", 4, true},
+
{"encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted", 4, false},
+ {"encrypt_columns_and_footer_aad.parquet.v2.encrypted", 4,
false},
+
{"encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted", 4, false},
+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted", 5, true},
+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted",
5, false},
+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted", 5,
false},
+
{"encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted",
5, false},
+ {"encrypt_columns_and_footer_ctr.parquet.encrypted", 6, true},
+
{"encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted", 6, false},
+ {"encrypt_columns_and_footer_ctr.parquet.v2.encrypted", 6,
false},
+
{"encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted", 6, false},
}
for _, tt := range tests {
d.Run(tt.file, func() {
@@ -576,14 +593,16 @@ func (d *TestDecryptionSuite) TestDecryption() {
}
os.Remove(tmpFile)
- file := path.Join(getDataDir(), tt.file)
- d.Require().FileExists(file)
+ if tt.isInDataStorage {
+ file := path.Join(getDataDir(), tt.file)
+ d.Require().FileExists(file)
- for idx := range d.decryptionConfigs {
- decConfig := idx + 1
- d.Run(fmt.Sprintf("config %d", decConfig),
func() {
- d.checkResults(file, uint(decConfig),
tt.config)
- })
+ for idx := range d.decryptionConfigs {
+ decConfig := idx + 1
+ d.Run(fmt.Sprintf("config %d",
decConfig), func() {
+ d.checkResults(file,
uint(decConfig), tt.config)
+ })
+ }
}
})
}
diff --git a/parquet/encryption_write_config_test.go
b/parquet/encryption_write_config_test.go
index b2c10e5b..8bb7b547 100644
--- a/parquet/encryption_write_config_test.go
+++ b/parquet/encryption_write_config_test.go
@@ -61,9 +61,7 @@ import (
* keys. Use the alternative (AES_GCM_CTR_V1)
algorithm.
*/
-var (
- tempdir string
-)
+var tempdir string
type EncryptionConfigTestSuite struct {
suite.Suite
@@ -79,13 +77,16 @@ type EncryptionConfigTestSuite struct {
columnEncryptionKey2 string
}
-func (en *EncryptionConfigTestSuite) encryptFile(configs
*parquet.FileEncryptionProperties, filename string) {
+func (en *EncryptionConfigTestSuite) encryptFile(configs
*parquet.FileEncryptionProperties, filename string, writerOpts
...parquet.WriterProperty) {
filename = filepath.Join(tempdir, filename)
- props := parquet.NewWriterProperties(
+ opts := []parquet.WriterProperty{
parquet.WithPageIndexEnabled(true),
parquet.WithCompression(compress.Codecs.Snappy),
- parquet.WithEncryptionProperties(configs))
+ parquet.WithEncryptionProperties(configs),
+ }
+ opts = append(opts, writerOpts...)
+ props := parquet.NewWriterProperties(opts...)
outFile, err := os.Create(filename)
en.Require().NoError(err)
en.Require().NotNil(outFile)
@@ -135,20 +136,18 @@ func (en *EncryptionConfigTestSuite) encryptFile(configs
*parquet.FileEncryption
// write the int64 column, each row repeats twice
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
- for i := 0; i < 2*en.rowsPerRG; i++ {
+ for i := 0; i < en.rowsPerRG; i++ {
var (
- defLevel = [1]int16{1}
- repLevel = [1]int16{0}
- value = int64(i) * 1000 * 1000 * 1000 * 1000
+ defLevels = []int16{1, 1}
+ repLevels = []int16{0, 1}
+ values = []int64{
+ int64(i*2) * 1000 * 1000 * 1000 * 1000,
+ int64(i*2+1) * 1000 * 1000 * 1000 *
1000,
+ }
)
- if i%2 == 0 {
- repLevel[0] = 0
- } else {
- repLevel[0] = 1
- }
- n, err := int64Writer.WriteBatch([]int64{value},
defLevel[:], repLevel[:])
- en.EqualValues(1, n)
+ n, err := int64Writer.WriteBatch(values, defLevels,
repLevels)
+ en.EqualValues(2, n)
en.Require().NoError(err)
}
@@ -263,7 +262,11 @@ func (en *EncryptionConfigTestSuite) SetupSuite() {
// (uniform encryption)
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"))
- en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_uniform_encryption.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_uniform_encryption.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_uniform_encryption.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_uniform_encryption.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
@@ -273,7 +276,11 @@ func (en *EncryptionConfigTestSuite)
TestEncryptTwoColumnsAndFooter() {
encryptCols[en.pathToFloatField] =
parquet.NewColumnEncryptionProperties(en.pathToFloatField,
parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
- en.encryptFile(props,
"tmp_encrypt_columns_and_footer.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
// Encryption Config 3: encrypt two columns, with different keys.
@@ -285,7 +292,11 @@ func (en *EncryptionConfigTestSuite)
TestEncryptTwoColumnsPlaintextFooter() {
encryptCols[en.pathToFloatField] =
parquet.NewColumnEncryptionProperties(en.pathToFloatField,
parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols),
parquet.WithPlaintextFooter())
- en.encryptFile(props,
"tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_plaintext_footer.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_plaintext_footer.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_plaintext_footer.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
// Encryption Config 4: Encrypt two columns and the footer, with different keys
@@ -296,7 +307,11 @@ func (en *EncryptionConfigTestSuite)
TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] =
parquet.NewColumnEncryptionProperties(en.pathToFloatField,
parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols),
parquet.WithAadPrefix(en.fileName))
- en.encryptFile(props,
"tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_aad.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_aad.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_aad.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
// Encryption Config 5: Encrypt Two columns and the footer, with different keys
@@ -307,7 +322,11 @@ func (en *EncryptionConfigTestSuite)
TestEncryptTwoColumnsAndFooterWithAadPrefix
encryptCols[en.pathToFloatField] =
parquet.NewColumnEncryptionProperties(en.pathToFloatField,
parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName),
parquet.DisableAadPrefixStorage())
- en.encryptFile(props,
"tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
// Encryption Config 6: Encrypt two columns and the footer, with different
keys.
@@ -318,7 +337,11 @@ func (en *EncryptionConfigTestSuite)
TestEncryptTwoColumnsAndFooterAesGcmCtr() {
encryptCols[en.pathToFloatField] =
parquet.NewColumnEncryptionProperties(en.pathToFloatField,
parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey,
parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols),
parquet.WithAlg(parquet.AesCtr))
- en.encryptFile(props,
"tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_ctr.parquet.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_ctr.parquet.v2.encrypted",
parquet.WithDataPageVersion(parquet.DataPageV2))
+ en.encryptFile(props.Clone(""),
"tmp_encrypt_columns_and_footer_ctr.parquet.v2.uncompressed.encrypted",
parquet.WithCompression(compress.Codecs.Uncompressed),
+ parquet.WithDataPageVersion(parquet.DataPageV2))
}
func TestFileEncryption(t *testing.T) {
diff --git a/parquet/file/column_reader_test.go
b/parquet/file/column_reader_test.go
index 25c26bc8..581f8957 100644
--- a/parquet/file/column_reader_test.go
+++ b/parquet/file/column_reader_test.go
@@ -551,9 +551,11 @@ func (p *PrimitiveReaderSuite)
TestRepetitionLvlBytesWithMaxRepZero() {
// bytes: the page header reports 1 byte for repetition levels even
// though the max rep level is 0. If that byte isn't skipped then
// we get def levels of [1, 1, 0, 0] instead of the correct [1, 1, 1,
0].
- pageData := [...]byte{0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
+ pageData := [...]byte{
+ 0x3, 0x3, 0x7, 0x80, 0x1, 0x4, 0x3,
0x18, 0x1, 0x2, 0x0, 0x0, 0x0, 0xc,
- 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ }
p.pages = append(p.pages,
file.NewDataPageV2(memory.NewBufferBytes(pageData[:]), batchSize, 1, batchSize,
parquet.Encodings.DeltaBinaryPacked, 2, 1,
int32(len(pageData)), false))
@@ -733,7 +735,6 @@ func TestFullSeekRow(t *testing.T) {
for _, dataPageVersion := range
[]parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1),
func(t *testing.T) {
-
props :=
parquet.NewWriterProperties(parquet.WithAllocator(mem),
parquet.WithDataPageVersion(dataPageVersion),
parquet.WithDataPageSize(1),
parquet.WithPageIndexEnabled(true))
diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go
index 1927ca87..c764ff07 100644
--- a/parquet/file/file_reader_test.go
+++ b/parquet/file/file_reader_test.go
@@ -207,7 +207,7 @@ func (p *PageSerdeSuite) TestDataPageV2() {
p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
p.dataPageHdrV2.NumValues = nrows
- p.WriteDataPageHeaderV2(1024, 20, 10)
+ p.WriteDataPageHeaderV2(1024, 0, 0)
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
p.True(p.pageReader.Next())
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
@@ -310,7 +310,8 @@ func (p *PageSerdeSuite) TestCompression() {
func TestWithEOFReader(t *testing.T) {
root, _ := schema.NewGroupNode("schema", parquet.Repetitions.Repeated,
schema.FieldList{
- schema.NewInt32Node("int_col", parquet.Repetitions.Required,
-1)}, -1)
+ schema.NewInt32Node("int_col", parquet.Repetitions.Required,
-1),
+ }, -1)
props :=
parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))
var buf bytes.Buffer
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 1ba7ecbe..307c6c6a 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -515,6 +515,50 @@ func (p *serializedPageReader) decompress(rd io.Reader,
lenCompressed int, buf [
return p.codec.Decode(buf, data), nil
}
+func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed
int, levelsBytelen int, compressed bool, buf []byte) error {
+ // if encrypted, we need to decrypt before decompressing
+ p.decompressBuffer.ResizeNoShrink(lenCompressed)
+ b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
+ if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
+ return err
+ }
+ data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes())
+ // encrypted + uncompressed -> just copy the decrypted data to output
buffer
+ if !compressed {
+ copy(buf, data)
+ return nil
+ }
+
+ // definition + repetition levels are always uncompressed
+ if levelsBytelen > 0 {
+ copy(buf, data[:levelsBytelen])
+ data = data[levelsBytelen:]
+ }
+ p.codec.Decode(buf[levelsBytelen:], data)
+ return nil
+}
+
+func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed
int, levelsBytelen int, compressed bool, buf []byte) error {
+ if !compressed {
+ // uncompressed, just read into the buffer
+ if _, err := io.ReadFull(rd, buf); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ // definition + repetition levels are always uncompressed
+ if levelsBytelen > 0 {
+ if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil {
+ return err
+ }
+ }
+ if _, err := p.decompress(p.r, lenCompressed-levelsBytelen,
buf[levelsBytelen:]); err != nil {
+ return err
+ }
+ return nil
+}
+
type dataheader interface {
IsSetStatistics() bool
GetStatistics() *format.Statistics
@@ -628,7 +672,6 @@ func (p *serializedPageReader) readPageHeader(rd
parquet.BufferedReader, hdr *fo
}
continue
}
-
rd.Discard(len(view) - int(remaining) + extra)
break
}
@@ -812,15 +855,16 @@ func (p *serializedPageReader) Next() bool {
return false
}
- if compressed {
- if levelsBytelen > 0 {
- io.ReadFull(p.r,
buf.Bytes()[:levelsBytelen])
- }
- if _, p.err = p.decompress(p.r,
lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
+ if p.cryptoCtx.DataDecryptor != nil {
+ if err := p.readV2Encrypted(p.r, lenCompressed,
levelsBytelen, compressed, buf.Bytes()); err != nil {
+ p.err = err
return false
}
} else {
- io.ReadFull(p.r, buf.Bytes())
+ if err := p.readV2Unencrypted(p.r,
lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil {
+ p.err = err
+ return false
+ }
}
if buf.Len() != lenUncompressed {