emkornfield commented on a change in pull request #11538:
URL: https://github.com/apache/arrow/pull/11538#discussion_r779310558

##########
File path: go/parquet/file/page_writer.go
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+    "bytes"
+    "sync"
+
+    "github.com/apache/arrow/go/v7/arrow/memory"
+    "github.com/apache/arrow/go/v7/parquet"
+    "github.com/apache/arrow/go/v7/parquet/compress"
+    "github.com/apache/arrow/go/v7/parquet/internal/encoding"
+    "github.com/apache/arrow/go/v7/parquet/internal/encryption"
+    format "github.com/apache/arrow/go/v7/parquet/internal/gen-go/parquet"
+    "github.com/apache/arrow/go/v7/parquet/internal/thrift"
+    "github.com/apache/arrow/go/v7/parquet/internal/utils"
+    "github.com/apache/arrow/go/v7/parquet/metadata"
+    libthrift "github.com/apache/thrift/lib/go/thrift"
+    "golang.org/x/xerrors"
+)
+
+// PageWriter is the interface for both serialized and buffered page writers.
+type PageWriter interface {
+    // Close closes the page writer, flushing any buffered data pages and
+    // dictionary pages based on the input parameters. Subsequent calls have no effect.
+    Close(hasDict, fallback bool) error
+    // WriteDataPage writes the provided data page out to the underlying writer.
+    WriteDataPage(page DataPage) (int64, error)
+    // WriteDictionaryPage writes the provided dictionary page out to the underlying writer.
+    WriteDictionaryPage(page *DictionaryPage) (int64, error)
+    // HasCompressor returns true if there is a configured compressor for the data.
+    HasCompressor() bool
+    // Compress uses the configured compressor and writer properties to compress
+    // the data in src using the buffer buf. It returns the slice of compressed
+    // bytes, which may be the bytes in the provided buffer.
+    Compress(buf *bytes.Buffer, src []byte) []byte
+    // Reset allows reuse of the page writer object by resetting it with these
+    // values instead of creating a new object.
+    Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error
+}
+
+type serializedPageWriter struct {
+    mem      memory.Allocator
+    metaData *metadata.ColumnChunkMetaDataBuilder
+    sink     utils.WriterTell
+
+    nvalues           int64
+    dictPageOffset    int64
+    dataPageOffset    int64
+    totalUncompressed int64
+    totalCompressed   int64
+    pageOrdinal       int16
+    rgOrdinal         int16
+    columnOrdinal     int16
+
+    compressLevel int
+    compressor    compress.Codec
+    metaEncryptor encryption.Encryptor
+    dataEncryptor encryption.Encryptor
+    encryptionBuf bytes.Buffer
+
+    dataPageAAD       []byte
+    dataPageHeaderAAD []byte
+
+    dictEncodingStats map[parquet.Encoding]int32
+    dataEncodingStats map[parquet.Encoding]int32
+
+    thriftSerializer *thrift.Serializer
+}
+
+func createSerializedPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
+    var (
+        compressor compress.Codec
+        err        error
+    )
+    if codec != compress.Codecs.Uncompressed {
+        compressor, err = compress.GetCodec(codec)
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    pgwriter := &serializedPageWriter{
+        sink:              sink,
+        compressor:        compressor,
+        compressLevel:     compressionLevel,
+        metaData:          metadata,
+        rgOrdinal:         rowGroupOrdinal,
+        columnOrdinal:     columnChunkOrdinal,
+        mem:               mem,
+        metaEncryptor:     metaEncryptor,
+        dataEncryptor:     dataEncryptor,
+        dictEncodingStats: make(map[parquet.Encoding]int32),
+        dataEncodingStats: make(map[parquet.Encoding]int32),
+        thriftSerializer:  thrift.NewThriftSerializer(),
+    }
+    if metaEncryptor != nil || dataEncryptor != nil {
+        pgwriter.initEncryption()
+    }
+    return pgwriter, nil
+}
+
+// NewPageWriter returns a page writer using either the buffered or serialized implementation.
+func NewPageWriter(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, buffered bool, metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
+    if buffered {
+        return newBufferedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor)
+    }
+    return createSerializedPageWriter(sink, codec, compressionLevel, metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, dataEncryptor)
+}
+
+// Reset allows reusing the page writer object instead of creating a new one.
+func (pw *serializedPageWriter) Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error {
+    var (
+        compressor compress.Codec
+        err        error
+    )
+    if codec != compress.Codecs.Uncompressed {
+        compressor, err = compress.GetCodec(codec)
+        if err != nil {
+            return err
+        }
+    }
+
+    pw.sink = sink
+    pw.compressor = compressor
+    pw.compressLevel = compressionLevel
+    pw.metaData = metadata
+    pw.rgOrdinal = rowGroupOrdinal
+    pw.columnOrdinal = columnChunkOrdinal
+    pw.metaEncryptor = metaEncryptor
+    pw.dataEncryptor = dataEncryptor
+    pw.dictEncodingStats = make(map[parquet.Encoding]int32)
+    pw.dataEncodingStats = make(map[parquet.Encoding]int32)
+
+    pw.nvalues = 0
+    pw.dictPageOffset = 0
+    pw.dataPageOffset = 0
+    pw.totalUncompressed = 0
+    pw.totalCompressed = 0
+    pw.pageOrdinal = 0
+
+    if metaEncryptor != nil || dataEncryptor != nil {
+        pw.initEncryption()
+    }
+    return nil
+}
+
+func (pw *serializedPageWriter) initEncryption() {
+    if pw.dataEncryptor != nil {
+        pw.dataPageAAD = []byte(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), encryption.DataPageModule, pw.rgOrdinal, pw.columnOrdinal, -1))
+    }
+    if pw.metaEncryptor != nil {
+        pw.dataPageHeaderAAD = []byte(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), encryption.DataPageHeaderModule, pw.rgOrdinal, pw.columnOrdinal, -1))
+    }
+}
+
+func (pw *serializedPageWriter) updateEncryption(moduleType int8) error {
+    switch moduleType {
+    case encryption.ColumnMetaModule:
+        pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+    case encryption.DataPageModule:
+        encryption.QuickUpdatePageAad(pw.dataPageAAD, pw.pageOrdinal)
+        pw.dataEncryptor.UpdateAad(string(pw.dataPageAAD))
+    case encryption.DataPageHeaderModule:
+        encryption.QuickUpdatePageAad(pw.dataPageHeaderAAD, pw.pageOrdinal)
+        pw.metaEncryptor.UpdateAad(string(pw.dataPageHeaderAAD))
+    case encryption.DictPageHeaderModule:
+        pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+    case encryption.DictPageModule:
+        pw.dataEncryptor.UpdateAad(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+    default:
+        return xerrors.New("unknown module type in updateEncryption")
+    }
+    return nil
+}
+
+func (pw *serializedPageWriter) Close(hasDict, fallback bool) error {
+    if pw.metaEncryptor != nil {
+        pw.updateEncryption(encryption.ColumnMetaModule)
+    }
+
+    chunkInfo := metadata.ChunkMetaInfo{
+        NumValues:        pw.nvalues,
+        DictPageOffset:   pw.dictPageOffset,
+        IndexPageOffset:  -1,
+        DataPageOffset:   pw.dataPageOffset,
+        CompressedSize:   pw.totalCompressed,
+        UncompressedSize: pw.totalUncompressed,
+    }
+    encodingStats := metadata.EncodingStats{
+        DictEncodingStats: pw.dictEncodingStats,
+        DataEncodingStats: pw.dataEncodingStats,
+    }
+    pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats, pw.metaEncryptor)
+    _, err := pw.metaData.WriteTo(pw.sink)
+    return err
+}
+
+func (pw *serializedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte {
+    maxCompressed := pw.compressor.CompressBound(int64(len(src)))
+    buf.Grow(int(maxCompressed))
+    return pw.compressor.EncodeLevel(buf.Bytes(), src, pw.compressLevel)
+}
+
+var dataPageV1HeaderPool = sync.Pool{
+    New: func() interface{} { return format.NewDataPageHeader() },
+}
+
+func (pw *serializedPageWriter) setDataPageHeader(pageHdr *format.PageHeader, page *DataPageV1) {
+    pageHdr.Type = format.PageType_DATA_PAGE
+    hdr := dataPageV1HeaderPool.Get().(*format.DataPageHeader)
+    hdr.NumValues = page.nvals
+    hdr.Encoding = page.encoding
+    hdr.DefinitionLevelEncoding = page.defLvlEncoding
+    hdr.RepetitionLevelEncoding = page.repLvlEncoding
+    hdr.Statistics = page.statistics.ToThrift()
+    pageHdr.DataPageHeader = hdr
+    pageHdr.DataPageHeaderV2 = nil
+    pageHdr.DictionaryPageHeader = nil
+}
+
+var dataPageV2HeaderPool = sync.Pool{
+    New: func() interface{} { return format.NewDataPageHeaderV2() },
+}
+
+func (pw *serializedPageWriter) setDataPageV2Header(pageHdr *format.PageHeader, page *DataPageV2) {
+    pageHdr.Type = format.PageType_DATA_PAGE_V2
+    hdr := dataPageV2HeaderPool.Get().(*format.DataPageHeaderV2)
+    hdr.NumValues = page.nvals
+    hdr.NumNulls = page.nulls
+    hdr.NumRows = page.nrows

Review comment:
Can you check if https://issues.apache.org/jira/browse/PARQUET-2066 applies here?
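For context, my recollection of PARQUET-2066 is that it concerns the V2 data page header's `num_rows` being populated with the value count, which over-counts rows once a column contains repeated (nested) fields. A new row starts exactly where the repetition level drops back to zero, so the row count has to come from the repetition levels rather than from the number of values. A rough sketch of the distinction, using a hypothetical helper that is not code from this PR:

```go
// countRowsFromRepLevels is a hypothetical illustration (not part of this PR).
// For a repeated column, a value begins a new row exactly when its repetition
// level is 0, so the page's row count is the number of zero levels, not the
// number of buffered values.
func countRowsFromRepLevels(repLvls []int16) int32 {
    var nrows int32
    for _, lvl := range repLvls {
        if lvl == 0 { // repetition level 0 marks the first value of a new row
            nrows++
        }
    }
    return nrows
}
```

For flat columns every repetition level is zero, so the two counts coincide and the bug would be invisible; the question is whether `page.nrows` here is derived from a count like this or from the value count, in which case nested columns would write an inflated `NumRows`.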
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]