emkornfield commented on a change in pull request #11538:
URL: https://github.com/apache/arrow/pull/11538#discussion_r779310623



##########
File path: go/parquet/file/page_writer.go
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "sync"
+
+       "github.com/apache/arrow/go/v7/arrow/memory"
+       "github.com/apache/arrow/go/v7/parquet"
+       "github.com/apache/arrow/go/v7/parquet/compress"
+       "github.com/apache/arrow/go/v7/parquet/internal/encoding"
+       "github.com/apache/arrow/go/v7/parquet/internal/encryption"
+       format "github.com/apache/arrow/go/v7/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow/go/v7/parquet/internal/thrift"
+       "github.com/apache/arrow/go/v7/parquet/internal/utils"
+       "github.com/apache/arrow/go/v7/parquet/metadata"
+       libthrift "github.com/apache/thrift/lib/go/thrift"
+       "golang.org/x/xerrors"
+)
+
// PageWriter is the interface shared by the serialized and buffered page
// writer implementations.
type PageWriter interface {
	// Close finishes the current column chunk, flushing any buffered data
	// pages / dictionary pages based on the input parameters. Subsequent
	// calls have no effect.
	Close(hasDict, fallback bool) error
	// WriteDataPage writes the provided data page out to the underlying writer
	// and returns the number of bytes written.
	WriteDataPage(page DataPage) (int64, error)
	// WriteDictionaryPage writes the provided dictionary page out to the
	// underlying writer and returns the number of bytes written.
	WriteDictionaryPage(page *DictionaryPage) (int64, error)
	// HasCompressor returns true if there is a configured compressor for the data.
	HasCompressor() bool
	// Compress uses the configured compressor and writer properties to compress
	// the data in src using the buffer buf. Returns the slice of the compressed
	// bytes, which may be the bytes in the provided buffer.
	Compress(buf *bytes.Buffer, src []byte) []byte
	// Reset allows reuse of the pagewriter object by resetting it with these
	// values instead of having to create a new object.
	Reset(sink utils.WriterTell, codec compress.Compression, compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, rgOrdinal, columnOrdinal int16, metaEncryptor, dataEncryptor encryption.Encryptor) error
}
+
// serializedPageWriter writes pages directly to the underlying sink as they
// are completed, accumulating the statistics needed for the column chunk
// metadata along the way.
type serializedPageWriter struct {
	mem      memory.Allocator
	metaData *metadata.ColumnChunkMetaDataBuilder
	sink     utils.WriterTell

	// running totals for the current column chunk, reported via Close
	nvalues           int64
	dictPageOffset    int64
	dataPageOffset    int64
	totalUncompressed int64
	totalCompressed   int64
	// ordinals identify the page / row group / column for encryption AADs
	pageOrdinal   int16
	rgOrdinal     int16
	columnOrdinal int16

	// compressor is nil when the codec is Uncompressed
	compressLevel int
	compressor    compress.Codec
	metaEncryptor encryption.Encryptor
	dataEncryptor encryption.Encryptor
	encryptionBuf bytes.Buffer

	// module AADs, precomputed by initEncryption and updated per page
	dataPageAAD       []byte
	dataPageHeaderAAD []byte

	// per-encoding page counts, split by dictionary vs. data pages
	dictEncodingStats map[parquet.Encoding]int32
	dataEncodingStats map[parquet.Encoding]int32

	thriftSerializer *thrift.Serializer
}
+
+func createSerializedPageWriter(sink utils.WriterTell, codec 
compress.Compression, compressionLevel int, metadata 
*metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal 
int16, mem memory.Allocator, metaEncryptor, dataEncryptor encryption.Encryptor) 
(PageWriter, error) {
+       var (
+               compressor compress.Codec
+               err        error
+       )
+       if codec != compress.Codecs.Uncompressed {
+               compressor, err = compress.GetCodec(codec)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       pgwriter := &serializedPageWriter{
+               sink:              sink,
+               compressor:        compressor,
+               compressLevel:     compressionLevel,
+               metaData:          metadata,
+               rgOrdinal:         rowGroupOrdinal,
+               columnOrdinal:     columnChunkOrdinal,
+               mem:               mem,
+               metaEncryptor:     metaEncryptor,
+               dataEncryptor:     dataEncryptor,
+               dictEncodingStats: make(map[parquet.Encoding]int32),
+               dataEncodingStats: make(map[parquet.Encoding]int32),
+               thriftSerializer:  thrift.NewThriftSerializer(),
+       }
+       if metaEncryptor != nil || dataEncryptor != nil {
+               pgwriter.initEncryption()
+       }
+       return pgwriter, nil
+}
+
+// NewPageWriter returns a page writer using either the buffered or serialized 
implementations
+func NewPageWriter(sink utils.WriterTell, codec compress.Compression, 
compressionLevel int, metadata *metadata.ColumnChunkMetaDataBuilder, 
rowGroupOrdinal, columnChunkOrdinal int16, mem memory.Allocator, buffered bool, 
metaEncryptor, dataEncryptor encryption.Encryptor) (PageWriter, error) {
+       if buffered {
+               return newBufferedPageWriter(sink, codec, compressionLevel, 
metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, 
dataEncryptor)
+       }
+       return createSerializedPageWriter(sink, codec, compressionLevel, 
metadata, rowGroupOrdinal, columnChunkOrdinal, mem, metaEncryptor, 
dataEncryptor)
+}
+
+// Reset allows reusing the pagewriter object instead of creating a new one.
+func (pw *serializedPageWriter) Reset(sink utils.WriterTell, codec 
compress.Compression, compressionLevel int, metadata 
*metadata.ColumnChunkMetaDataBuilder, rowGroupOrdinal, columnChunkOrdinal 
int16, metaEncryptor, dataEncryptor encryption.Encryptor) error {
+       var (
+               compressor compress.Codec
+               err        error
+       )
+       if codec != compress.Codecs.Uncompressed {
+               compressor, err = compress.GetCodec(codec)
+               if err != nil {
+                       return err
+               }
+       }
+
+       pw.sink = sink
+       pw.compressor = compressor
+       pw.compressLevel = compressionLevel
+       pw.metaData = metadata
+       pw.rgOrdinal = rowGroupOrdinal
+       pw.columnOrdinal = columnChunkOrdinal
+       pw.metaEncryptor = metaEncryptor
+       pw.dataEncryptor = dataEncryptor
+       pw.dictEncodingStats = make(map[parquet.Encoding]int32)
+       pw.dataEncodingStats = make(map[parquet.Encoding]int32)
+
+       pw.nvalues = 0
+       pw.dictPageOffset = 0
+       pw.dataPageOffset = 0
+       pw.totalUncompressed = 0
+       pw.totalCompressed = 0
+       pw.pageOrdinal = 0
+
+       if metaEncryptor != nil || dataEncryptor != nil {
+               pw.initEncryption()
+       }
+       return nil
+}
+
+func (pw *serializedPageWriter) initEncryption() {
+       if pw.dataEncryptor != nil {
+               pw.dataPageAAD = 
[]byte(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(), 
encryption.DataPageModule, pw.rgOrdinal, pw.columnOrdinal, -1))
+       }
+       if pw.metaEncryptor != nil {
+               pw.dataPageHeaderAAD = 
[]byte(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(), 
encryption.DataPageHeaderModule, pw.rgOrdinal, pw.columnOrdinal, -1))
+       }
+}
+
+func (pw *serializedPageWriter) updateEncryption(moduleType int8) error {
+       switch moduleType {
+       case encryption.ColumnMetaModule:
+               
pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(),
 moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+       case encryption.DataPageModule:
+               encryption.QuickUpdatePageAad(pw.dataPageAAD, pw.pageOrdinal)
+               pw.dataEncryptor.UpdateAad(string(pw.dataPageAAD))
+       case encryption.DataPageHeaderModule:
+               encryption.QuickUpdatePageAad(pw.dataPageHeaderAAD, 
pw.pageOrdinal)
+               pw.metaEncryptor.UpdateAad(string(pw.dataPageHeaderAAD))
+       case encryption.DictPageHeaderModule:
+               
pw.metaEncryptor.UpdateAad(encryption.CreateModuleAad(pw.metaEncryptor.FileAad(),
 moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+       case encryption.DictPageModule:
+               
pw.dataEncryptor.UpdateAad(encryption.CreateModuleAad(pw.dataEncryptor.FileAad(),
 moduleType, pw.rgOrdinal, pw.columnOrdinal, -1))
+       }
+       return xerrors.New("unknown module type in updateencryption")
+}
+
+func (pw *serializedPageWriter) Close(hasDict, fallback bool) error {
+       if pw.metaEncryptor != nil {
+               pw.updateEncryption(encryption.ColumnMetaModule)
+       }
+
+       chunkInfo := metadata.ChunkMetaInfo{
+               NumValues:        pw.nvalues,
+               DictPageOffset:   pw.dictPageOffset,
+               IndexPageOffset:  -1,
+               DataPageOffset:   pw.dataPageOffset,
+               CompressedSize:   pw.totalCompressed,
+               UncompressedSize: pw.totalUncompressed,
+       }
+       encodingStats := metadata.EncodingStats{
+               DictEncodingStats: pw.dictEncodingStats,
+               DataEncodingStats: pw.dataEncodingStats,
+       }
+       pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats, 
pw.metaEncryptor)
+       _, err := pw.metaData.WriteTo(pw.sink)
+       return err
+}
+
+func (pw *serializedPageWriter) Compress(buf *bytes.Buffer, src []byte) []byte 
{
+       maxCompressed := pw.compressor.CompressBound(int64(len(src)))
+       buf.Grow(int(maxCompressed))
+       return pw.compressor.EncodeLevel(buf.Bytes(), src, pw.compressLevel)
+}
+
// dataPageV1HeaderPool recycles thrift DataPageHeader objects so that
// writing each v1 data page does not allocate a fresh header.
var dataPageV1HeaderPool = sync.Pool{
	New: func() interface{} { return format.NewDataPageHeader() },
}
+
+func (pw *serializedPageWriter) setDataPageHeader(pageHdr *format.PageHeader, 
page *DataPageV1) {
+       pageHdr.Type = format.PageType_DATA_PAGE
+       hdr := dataPageV1HeaderPool.Get().(*format.DataPageHeader)
+       hdr.NumValues = page.nvals
+       hdr.Encoding = page.encoding
+       hdr.DefinitionLevelEncoding = page.defLvlEncoding
+       hdr.RepetitionLevelEncoding = page.repLvlEncoding
+       hdr.Statistics = page.statistics.ToThrift()
+       pageHdr.DataPageHeader = hdr
+       pageHdr.DataPageHeaderV2 = nil
+       pageHdr.DictionaryPageHeader = nil
+}
+
// dataPageV2HeaderPool recycles thrift DataPageHeaderV2 objects so that
// writing each v2 data page does not allocate a fresh header.
var dataPageV2HeaderPool = sync.Pool{
	New: func() interface{} { return format.NewDataPageHeaderV2() },
}
+
+func (pw *serializedPageWriter) setDataPageV2Header(pageHdr 
*format.PageHeader, page *DataPageV2) {
+       pageHdr.Type = format.PageType_DATA_PAGE_V2
+       hdr := dataPageV2HeaderPool.Get().(*format.DataPageHeaderV2)
+       hdr.NumValues = page.nvals
+       hdr.NumNulls = page.nulls
+       hdr.NumRows = page.nrows

Review comment:
       I guess it would be in the caller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to