zeroshade commented on a change in pull request #10951:
URL: https://github.com/apache/arrow/pull/10951#discussion_r700263758
##########
File path: go/parquet/metadata/file.go
##########
@@ -0,0 +1,478 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+    "bytes"
+    "context"
+    "io"
+    "reflect"
+
+    "github.com/apache/arrow/go/parquet"
+    "github.com/apache/arrow/go/parquet/compress"
+    "github.com/apache/arrow/go/parquet/internal/encryption"
+    format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+    "github.com/apache/arrow/go/parquet/internal/thrift"
+    "github.com/apache/arrow/go/parquet/schema"
+    "golang.org/x/xerrors"
+)
+
+// DefaultCompressionType is used unless a different compression is specified
+// in the properties
+var DefaultCompressionType = compress.Codecs.Uncompressed
+
+// FileMetaDataBuilder is a proxy for more easily constructing file metadata
+// particularly used when writing a file out.
+type FileMetaDataBuilder struct {
+    metadata       *format.FileMetaData
+    props          *parquet.WriterProperties
+    schema         *schema.Schema
+    rowGroups      []*format.RowGroup
+    currentRgBldr  *RowGroupMetaDataBuilder
+    kvmeta         KeyValueMetadata
+    cryptoMetadata *format.FileCryptoMetaData
+}
+
+// NewFileMetadataBuilder will use the default writer properties if nil is passed for
+// the writer properties and nil is allowable for the key value metadata.
+func NewFileMetadataBuilder(schema *schema.Schema, props *parquet.WriterProperties, kvmeta KeyValueMetadata) *FileMetaDataBuilder {
+    var crypto *format.FileCryptoMetaData
+    if props.FileEncryptionProperties() != nil && props.FileEncryptionProperties().EncryptedFooter() {
+        crypto = format.NewFileCryptoMetaData()
+    }
+    return &FileMetaDataBuilder{
+        metadata:       format.NewFileMetaData(),
+        props:          props,
+        schema:         schema,
+        kvmeta:         kvmeta,
+        cryptoMetadata: crypto,
+    }
+}
+
+// GetFileCryptoMetaData returns the cryptographic information for encrypting/
+// decrypting the file.
+func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata {
+    if f.cryptoMetadata == nil {
+        return nil
+    }
+
+    props := f.props.FileEncryptionProperties()
+    f.cryptoMetadata.EncryptionAlgorithm = props.Algorithm().ToThrift()
+    keyMetadata := props.FooterKeyMetadata()
+    if keyMetadata != "" {
+        f.cryptoMetadata.KeyMetadata = []byte(keyMetadata)
+    }
+
+    return &FileCryptoMetadata{f.cryptoMetadata, 0}
+}
+
+// AppendRowGroup adds a rowgroup to the list and returns a builder
+// for that row group
+func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder {
+    if f.rowGroups == nil {
+        f.rowGroups = make([]*format.RowGroup, 0, 1)
+    }
+
+    rg := format.NewRowGroup()
+    f.rowGroups = append(f.rowGroups, rg)
+    f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg)
+    return f.currentRgBldr
+}
+
+// Finish will finalize the metadata of the number of rows, row groups,
+// version etc. This will clear out this FileMetaDataBuilder so it can
+// be re-used.
+func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) {
+    totalRows := int64(0)
+    for _, rg := range f.rowGroups {
+        totalRows += rg.NumRows
+    }
+    f.metadata.NumRows = totalRows
+    f.metadata.RowGroups = f.rowGroups
+    switch f.props.Version() {
+    case parquet.V1:
+        f.metadata.Version = 1
+    case parquet.V2:
+        f.metadata.Version = 2
+    default:
+        f.metadata.Version = 0
+    }
+    createdBy := f.props.CreatedBy()
+    f.metadata.CreatedBy = &createdBy
+
+    // Users cannot set the `ColumnOrder` since we do not have user defined sort orders
+    // in the spec yet.
+    // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
+    // the API once we have user defined sort orders in the Parquet format.
+    // TypeDefinedOrder implies choosing SortOrder based on ConvertedType/PhysicalType
+    typeDefined := format.NewTypeDefinedOrder()
+    colOrder := &format.ColumnOrder{TYPE_ORDER: typeDefined}
+    f.metadata.ColumnOrders = make([]*format.ColumnOrder, f.schema.NumColumns())
+    for idx := range f.metadata.ColumnOrders {
+        f.metadata.ColumnOrders[idx] = colOrder
+    }
+
+    fileEncProps := f.props.FileEncryptionProperties()
+    if fileEncProps != nil && !fileEncProps.EncryptedFooter() {
+        var signingAlgo parquet.Algorithm
+        algo := fileEncProps.Algorithm()
+        signingAlgo.Aad.AadFileUnique = algo.Aad.AadFileUnique
+        signingAlgo.Aad.SupplyAadPrefix = algo.Aad.SupplyAadPrefix
+        if !algo.Aad.SupplyAadPrefix {
+            signingAlgo.Aad.AadPrefix = algo.Aad.AadPrefix
+        }
+        signingAlgo.Algo = parquet.AesGcm
+        f.metadata.EncryptionAlgorithm = signingAlgo.ToThrift()
+        footerSigningMetadata := f.props.FileEncryptionProperties().FooterKeyMetadata()
+        if footerSigningMetadata != "" {
+            f.metadata.FooterSigningKeyMetadata = []byte(footerSigningMetadata)
+        }
+    }
+
+    f.metadata.Schema = schema.ToThrift(f.schema.Root())
+    f.metadata.KeyValueMetadata = f.kvmeta
+
+    out := &FileMetaData{
+        FileMetaData: f.metadata,
+        version:      NewAppVersion(f.metadata.GetCreatedBy()),
+    }
+    if err := out.initSchema(); err != nil {
+        return nil, err
+    }
+    out.initColumnOrders()
+
+    f.metadata = format.NewFileMetaData()
+    f.rowGroups = nil
+    return out, nil
+}
+
+// KeyValueMetadata is an alias for a slice of thrift KeyValue pairs
+type KeyValueMetadata []*format.KeyValue
+
+// NewKeyValueMetadata is equivalent to make(KeyValueMetadata, 0)
+func NewKeyValueMetadata() KeyValueMetadata {
+    return make(KeyValueMetadata, 0)
+}
+
+func (k *KeyValueMetadata) Append(key, value string) {
+    *k = append(*k, &format.KeyValue{Key: key, Value: &value})
+}
+
+func (k KeyValueMetadata) Len() int { return len(k) }
+
+// Equals compares all of the metadata keys and values to check they are equal
+func (k KeyValueMetadata) Equals(other KeyValueMetadata) bool {
+    return reflect.DeepEqual(k, other)
+}
+
+func (k KeyValueMetadata) Keys() (ret []string) {
+    ret = make([]string, len(k))
+    for idx, v := range k {
+        ret[idx] = v.GetKey()
+    }
+    return
+}
+
+func (k KeyValueMetadata) Values() (ret []string) {
+    ret = make([]string, len(k))
+    for idx, v := range k {
+        ret[idx] = v.GetValue()
+    }
+    return
+}
+
+func (k KeyValueMetadata) FindValue(key string) *string {
+    for _, v := range k {
+        if v.Key == key {
+            return v.Value
+        }
+    }
+    return nil
+}
+
+// FileMetaData is a proxy around the underlying thrift FileMetaData object
+// to make it easier to use and interact with.
+type FileMetaData struct {
+    *format.FileMetaData
+    version       *AppVersion
+    Schema        *schema.Schema
+    FileDecryptor encryption.FileDecryptor
+    metadataLen   int
+}
+
+// NewFileMetaData takes in the raw bytes of the serialized metadata to deserialize
+// and will attempt to decrypt the footer if a decryptor is provided.
+func NewFileMetaData(data []byte, fileDecryptor encryption.FileDecryptor) (*FileMetaData, error) {
+    meta := format.NewFileMetaData()
+    if fileDecryptor != nil {
+        footerDecryptor := fileDecryptor.GetFooterDecryptor()
+        data = footerDecryptor.Decrypt(data)
+    }
+
+    remain, err := thrift.DeserializeThrift(meta, data)
+    if err != nil {
+        return nil, err
+    }
+
+    f := &FileMetaData{
+        FileMetaData:  meta,
+        version:       NewAppVersion(meta.GetCreatedBy()),
+        metadataLen:   len(data) - int(remain),
+        FileDecryptor: fileDecryptor,
+    }
+
+    f.initSchema()
+    f.initColumnOrders()
+
+    return f, nil
+}
+
+// Size is the length of the raw serialized metadata bytes in the footer
+func (f *FileMetaData) Size() int { return f.metadataLen }
+
+// NumSchemaElements is the length of the flattened schema list in the thrift
+func (f *FileMetaData) NumSchemaElements() int {
+    return len(f.FileMetaData.Schema)
+}
+
+// RowGroup provides the metadata for the (0-based) index of the row group
+func (f *FileMetaData) RowGroup(i int) *RowGroupMetaData {
+    return &RowGroupMetaData{
+        f.RowGroups[i], f.Schema, f.version, f.FileDecryptor,
+    }
+}
+
+func (f *FileMetaData) Serialize(ctx context.Context) ([]byte, error) {
+    return thrift.NewThriftSerializer().Write(ctx, f.FileMetaData)
+}
+
+func (f *FileMetaData) SerializeString(ctx context.Context) (string, error) {
+    return thrift.NewThriftSerializer().WriteString(ctx, f.FileMetaData)
+}
+
+// EncryptionAlgorithm constructs the algorithm object from the thrift
+// information or returns an empty instance if it was not set.
+func (f *FileMetaData) EncryptionAlgorithm() parquet.Algorithm {
+    if f.IsSetEncryptionAlgorithm() {
+        return parquet.AlgorithmFromThrift(f.GetEncryptionAlgorithm())
+    }
+    return parquet.Algorithm{}
+}
+
+func (f *FileMetaData) initSchema() error {
+    root, err := schema.FromParquet(f.FileMetaData.Schema)
+    if err != nil {
+        return err
+    }
+    f.Schema = schema.NewSchema(root.(*schema.GroupNode))
+    return nil
+}
+
+func (f *FileMetaData) initColumnOrders() {
+    orders := make([]parquet.ColumnOrder, 0, f.Schema.NumColumns())
+    if f.IsSetColumnOrders() {
+        for _, o := range f.GetColumnOrders() {
+            if o.IsSetTYPE_ORDER() {
+                orders = append(orders, parquet.ColumnOrders.TypeDefinedOrder)
+            } else {
+                orders = append(orders, parquet.ColumnOrders.Undefined)
+            }
+        }
+    } else {
+        orders = orders[:f.Schema.NumColumns()]
+        orders[0] = parquet.ColumnOrders.Undefined
+        for i := 1; i < len(orders); i *= 2 {
+            copy(orders[i:], orders[:i])
+        }
+    }
+    f.Schema.UpdateColumnOrders(orders)
+}
+
+// WriterVersion returns the constructed application version from the
+// created by string
+func (f *FileMetaData) WriterVersion() *AppVersion {
+    if f.version == nil {
+        f.version = NewAppVersion(f.GetCreatedBy())
+    }
+    return f.version
+}
+
+// SetFilePath will set the file path into all of the columns in each row group.
+func (f *FileMetaData) SetFilePath(path string) {
+    for _, rg := range f.RowGroups {
+        for _, chunk := range rg.Columns {
+            chunk.FilePath = &path
+        }
+    }
+}
+
+// AppendRowGroups will add all of the rowgroup metadata from other to the
+// current file metadata
+func (f *FileMetaData) AppendRowGroups(other *FileMetaData) error {
+    if !f.Schema.Equals(other.Schema) {
+        return xerrors.New("parquet/FileMetaData: AppendRowGroups requires equal schemas")
+    }
+
+    f.RowGroups = append(f.RowGroups, other.GetRowGroups()...)
+    for _, rg := range other.GetRowGroups() {
+        f.NumRows += rg.NumRows
+    }
+    return nil
+}
+
+// Subset will construct a new FileMetaData object containing only the requested
+// row groups by index
+func (f *FileMetaData) Subset(rowGroups []int) (*FileMetaData, error) {
+    for _, i := range rowGroups {
+        if i < len(f.RowGroups) {
+            continue
+        }
+        return nil, xerrors.Errorf("parquet: this file only has %d row groups, but requested a subset including row group: %d", len(f.RowGroups), i)
+    }
+
+    out := &FileMetaData{
+        &format.FileMetaData{
+            Schema:                   f.FileMetaData.Schema,
+            CreatedBy:                f.CreatedBy,
+            ColumnOrders:             f.GetColumnOrders(),
+            EncryptionAlgorithm:      f.FileMetaData.EncryptionAlgorithm,
+            FooterSigningKeyMetadata: f.FooterSigningKeyMetadata,
+            Version:                  f.Version,
+            KeyValueMetadata:         f.KeyValueMetadata(),
+        },
+        f.version,
+        f.Schema,
+        f.FileDecryptor,
+        0,
+    }
+
+    out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))
+    for _, selected := range rowGroups {
+        out.RowGroups = append(out.RowGroups, f.RowGroups[selected])

Review comment:
   a map instead of a slice? Not sure what benefit a map would provide here?
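
To illustrate the point, here is a minimal, self-contained Go sketch (plain strings and ints rather than the parquet types above, with made-up values) of why an index slice is a natural fit for `Subset`: it preserves the caller's selection order and allows picking the same row group more than once, whereas a map keyed by index would deduplicate the selection and iterate in unspecified order.

```go
package main

import "fmt"

func main() {
	// Stand-ins for row group metadata entries; values are hypothetical.
	rowGroups := []string{"rg0", "rg1", "rg2", "rg3"}

	// Selection by index slice, mirroring the Subset signature above:
	// output order is exactly the caller's order and repeats are allowed.
	selection := []int{3, 1, 1}
	subset := make([]string, 0, len(selection))
	for _, idx := range selection {
		subset = append(subset, rowGroups[idx])
	}
	fmt.Println(subset) // [rg3 rg1 rg1]

	// A map keyed by index collapses the duplicate, and Go gives no
	// guarantee about iteration order when building the result from it.
	asMap := map[int]struct{}{3: {}, 1: {}}
	fmt.Println(len(asMap)) // 2
}
```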
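
As a broader sketch of how the methods quoted above might compose when stitching per-file metadata together: the helper name, the file names, and the `github.com/apache/arrow/go/parquet/metadata` import path are assumptions for illustration based on the file path in this diff, not something defined by the PR.

```go
package metadataexample

import (
	"github.com/apache/arrow/go/parquet/metadata"
)

// combineFirstN is a hypothetical helper: tag each file's column chunks with
// that file's path, concatenate the row groups from both files, then keep
// only the first n row groups of the combined metadata.
func combineFirstN(md1, md2 *metadata.FileMetaData, n int) (*metadata.FileMetaData, error) {
	md1.SetFilePath("part-0.parquet") // hypothetical file names
	md2.SetFilePath("part-1.parquet")

	// AppendRowGroups errors out unless both sides share an equal schema.
	if err := md1.AppendRowGroups(md2); err != nil {
		return nil, err
	}

	// Subset takes 0-based row group indices, in the caller's order.
	indices := make([]int, n)
	for i := range indices {
		indices[i] = i
	}
	return md1.Subset(indices)
}
```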