zeroshade commented on a change in pull request #11538:
URL: https://github.com/apache/arrow/pull/11538#discussion_r744306232



##########
File path: go/parquet/encryption_read_config_test.go
##########
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package parquet_test
+
+import (
+       "encoding/binary"
+       "fmt"
+       "os"
+       "path"
+       "testing"
+
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/file"
+       "github.com/apache/arrow/go/parquet/internal/encryption"
+       "github.com/stretchr/testify/suite"
+)
+
+/*
+ * This file contains a unit-test for reading encrypted Parquet files with
+ * different decryption configurations.
+ *
+ * The unit-test is called multiple times, each time to decrypt parquet files 
using
+ * different decryption configuration as described below.
+ * In each call two encrypted files are read: one temporary file that was 
generated using
+ * encryption_write_config_test.go test and will be deleted upon
+ * reading it, while the second resides in
+ * parquet-testing/data repository. Those two encrypted files were encrypted 
using the
+ * same encryption configuration.
+ * The encrypted parquet file names are passed as parameter to the unit-test.
+ *
+ * A detailed description of the Parquet Modular Encryption specification can 
be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The following decryption configurations are used to decrypt each parquet 
file:
+ *
+ *  - Decryption configuration 1:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key.
+ *  - Decryption configuration 2:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key. 
Supplies
+ *                                  aad_prefix to verify file identity.
+ *  - Decryption configuration 3:   Decrypt using explicit column and footer 
keys
+ *                                  (instead of key retrieval callback).
+ *  - Decryption Configuration 4:   PlainText Footer mode - test legacy reads,
+ *                                  read the footer + all non-encrypted 
columns.
+ *                                  (pairs with encryption configuration 3)
+ *
+ * The encrypted parquet files that is read was encrypted using one of the 
configurations
+ * below:
+ *
+ *  - Encryption configuration 1:   Encrypt all columns and the footer with 
the same key.
+ *                                  (uniform encryption)
+ *  - Encryption configuration 2:   Encrypt two columns and the footer, with 
different
+ *                                  keys.
+ *  - Encryption configuration 3:   Encrypt two columns, with different keys.
+ *                                  Don’t encrypt footer (to enable legacy 
readers)
+ *                                  - plaintext footer mode.
+ *  - Encryption configuration 4:   Encrypt two columns and the footer, with 
different
+ *                                  keys. Supply aad_prefix for file identity
+ *                                  verification.
+ *  - Encryption configuration 5:   Encrypt two columns and the footer, with 
different
+ *                                  keys. Supply aad_prefix, and call
+ *                                  disable_aad_prefix_storage to prevent file
+ *                                  identity storage in file metadata.
+ *  - Encryption configuration 6:   Encrypt two columns and the footer, with 
different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) 
algorithm.
+ */
+
+func getDataDir() string {
+       datadir := os.Getenv("PARQUET_TEST_DATA")
+       if datadir == "" {
+               panic("please point the PARQUET_TEST_DATA environment variable 
to the test data dir")
+       }
+       return datadir
+}
+
+type TestDecryptionSuite struct {
+       suite.Suite
+
+       pathToDouble        string
+       pathToFloat         string
+       decryptionConfigs   []*parquet.FileDecryptionProperties
+       footerEncryptionKey string
+       colEncryptionKey1   string
+       colEncryptionKey2   string
+       fileName            string
+}
+
+func (d *TestDecryptionSuite) TearDownSuite() {
+       os.Remove(tempdir)
+}
+
+func TestFileEncryptionDecryption(t *testing.T) {
+       suite.Run(t, new(EncryptionConfigTestSuite))
+       suite.Run(t, new(TestDecryptionSuite))
+}
+
+func (d *TestDecryptionSuite) SetupSuite() {
+       d.pathToDouble = "double_field"
+       d.pathToFloat = "float_field"
+       d.footerEncryptionKey = FooterEncryptionKey
+       d.colEncryptionKey1 = ColumnEncryptionKey1
+       d.colEncryptionKey2 = ColumnEncryptionKey2
+       d.fileName = FileName
+
+       d.createDecryptionConfigs()
+}
+
+func (d *TestDecryptionSuite) createDecryptionConfigs() {
+       // Decryption configuration 1: Decrypt using key retriever callback 
that holds the
+       // keys of two encrypted columns and the footer key.
+       stringKr1 := make(encryption.StringKeyIDRetriever)
+       stringKr1.PutKey("kf", d.footerEncryptionKey)
+       stringKr1.PutKey("kc1", d.colEncryptionKey1)
+       stringKr1.PutKey("kc2", d.colEncryptionKey2)
+
+       d.decryptionConfigs = append(d.decryptionConfigs,
+               
parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1)))
+
+       // Decryption configuration 2: Decrypt using key retriever callback 
that holds the
+       // keys of two encrypted columns and the footer key. Supply aad_prefix.
+       stringKr2 := make(encryption.StringKeyIDRetriever)
+       stringKr2.PutKey("kf", d.footerEncryptionKey)
+       stringKr2.PutKey("kc1", d.colEncryptionKey1)
+       stringKr2.PutKey("kc2", d.colEncryptionKey2)
+       d.decryptionConfigs = append(d.decryptionConfigs,
+               
parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr2), 
parquet.WithDecryptAadPrefix(d.fileName)))
+
+       // Decryption configuration 3: Decrypt using explicit column and footer 
keys. Supply
+       // aad_prefix.
+       decryptCols := make(parquet.ColumnPathToDecryptionPropsMap)
+       decryptCols[d.pathToFloat] = 
parquet.NewColumnDecryptionProperties(d.pathToFloat, 
parquet.WithDecryptKey(d.colEncryptionKey2))
+       decryptCols[d.pathToDouble] = 
parquet.NewColumnDecryptionProperties(d.pathToDouble, 
parquet.WithDecryptKey(d.colEncryptionKey1))
+       d.decryptionConfigs = append(d.decryptionConfigs,
+               
parquet.NewFileDecryptionProperties(parquet.WithFooterKey(d.footerEncryptionKey),
 parquet.WithColumnKeys(decryptCols)))
+
+       // Decryption Configuration 4: use plaintext footer mode, read only 
footer + plaintext
+       // columns.
+       d.decryptionConfigs = append(d.decryptionConfigs, nil)
+}
+
+func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum 
int) {
+       // if we get decryption_config_num = x then it means the actual number 
is x+1
+       // and since we want decryption_config_num=4 we set the condition to 3
+       props := parquet.NewReaderProperties(memory.DefaultAllocator)
+       if decryptConfigNum != 3 {
+               props.FileDecryptProps = 
d.decryptionConfigs[decryptConfigNum].Clone("")
+       }
+
+       fileReader, err := file.OpenParquetFile(filename, false, 
file.WithReadProps(props))
+       if err != nil {
+               panic(err)
+       }
+       defer fileReader.Close()
+       // get metadata
+       fileMetadata := fileReader.MetaData()
+       // get number of rowgroups
+       numRowGroups := len(fileMetadata.RowGroups)
+       // number of columns
+       numColumns := fileMetadata.Schema.NumColumns()
+       d.Equal(8, numColumns)
+
+       for r := 0; r < numRowGroups; r++ {
+               rowGroupReader := fileReader.RowGroup(r)
+
+               // get rowgroup meta
+               rgMeta := fileMetadata.RowGroup(r)
+
+               valuesRead := 0
+               rowsRead := int64(0)
+
+               // get col reader for boolean column
+               colReader := rowGroupReader.Column(0)
+               boolReader := colReader.(*file.BooleanColumnChunkReader)
+
+               // get column chunk metadata for boolean column
+               boolMd, _ := rgMeta.ColumnChunk(0)
+
+               // Read all rows in column
+               i := 0
+               for boolReader.HasNext() {
+                       var val [1]bool
+                       // read one value at a time. the number of rows read is 
returned. values
+                       // read contains the number of non-null rows
+                       rowsRead, valuesRead, _ = boolReader.ReadBatch(1, 
val[:], nil, nil)
+                       // ensure only 1 value is read
+                       d.EqualValues(1, rowsRead)
+                       // there are no null values
+                       d.EqualValues(1, valuesRead)
+                       // verify the value
+                       expected := i%2 == 0
+                       d.Equal(expected, val[0], "i: ", i)
+                       i++
+               }
+               d.EqualValues(i, boolMd.NumValues())
+
+               // Get column reader for int32 column
+               colReader = rowGroupReader.Column(1)
+               int32reader := colReader.(*file.Int32ColumnChunkReader)
+
+               int32md, _ := rgMeta.ColumnChunk(1)
+               // Read all rows in column
+               i = 0
+               for int32reader.HasNext() {
+                       var val [1]int32
+                       // read one value at a time. the number of rows read is 
returned. values
+                       // read contains the number of non-null rows
+                       rowsRead, valuesRead, _ = int32reader.ReadBatch(1, 
val[:], nil, nil)
+                       // ensure only 1 value is read
+                       d.EqualValues(1, rowsRead)
+                       // there are no null values
+                       d.EqualValues(1, valuesRead)
+                       // verify the value
+                       d.EqualValues(i, val[0])
+                       i++
+               }
+               d.EqualValues(i, int32md.NumValues())
+
+               // Get column reader for int64 column
+               colReader = rowGroupReader.Column(2)
+               int64reader := colReader.(*file.Int64ColumnChunkReader)
+
+               int64md, _ := rgMeta.ColumnChunk(2)
+               // Read all rows in column
+               i = 0
+               for int64reader.HasNext() {
+                       var (
+                               val [1]int64
+                               def [1]int16
+                               rep [1]int16
+                       )
+
+                       // read one value at a time. the number of rows read is 
returned. values
+                       // read contains the number of non-null rows
+                       rowsRead, valuesRead, _ = int64reader.ReadBatch(1, 
val[:], def[:], rep[:])
+                       // ensure only 1 value is read
+                       d.EqualValues(1, rowsRead)
+                       // there are no null values
+                       d.EqualValues(1, valuesRead)
+                       // verify the value
+                       expectedValue := int64(i) * 1000 * 1000 * 1000 * 1000
+                       d.Equal(expectedValue, val[0])
+                       if i%2 == 0 {
+                               d.EqualValues(1, rep[0])
+                       } else {
+                               d.Zero(rep[0])
+                       }
+                       i++
+               }
+               d.EqualValues(i, int64md.NumValues())
+
+               // Get column reader for int96 column
+               colReader = rowGroupReader.Column(3)
+               int96reader := colReader.(*file.Int96ColumnChunkReader)
+
+               int96md, _ := rgMeta.ColumnChunk(3)
+               // Read all rows in column
+               i = 0
+               for int96reader.HasNext() {
+                       var (
+                               val [1]parquet.Int96
+                       )
+
+                       // read one value at a time. the number of rows read is 
returned. values
+                       // read contains the number of non-null rows
+                       rowsRead, valuesRead, _ = int96reader.ReadBatch(1, 
val[:], nil, nil)
+                       // ensure only 1 value is read
+                       d.EqualValues(1, rowsRead)
+                       // there are no null values
+                       d.EqualValues(1, valuesRead)
+                       // verify the value
+                       var expectedValue parquet.Int96
+                       binary.LittleEndian.PutUint32(expectedValue[:4], 
uint32(i))
+                       binary.LittleEndian.PutUint32(expectedValue[4:], 
uint32(i+1))
+                       binary.LittleEndian.PutUint32(expectedValue[8:], 
uint32(i+2))
+                       d.Equal(expectedValue, val[0])
+                       i++
+               }
+               d.EqualValues(i, int96md.NumValues())
+
+               if decryptConfigNum != 3 {

Review comment:
       added a comment

##########
File path: go/parquet/file/column_writer.go
##########
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "github.com/apache/arrow/go/parquet/schema"
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_writer_types.gen.go.tmpl
+
+// ColumnChunkWriter is the base interface for all columnwriters. To directly 
write
+// data to the column, you need to assert it to the correctly typed 
ColumnChunkWriter
+// instance, such as Int32ColumnWriter.
+type ColumnChunkWriter interface {
+       // Close ends this column and returns the number of bytes written
+       Close() error
+       // Type returns the underlying physical parquet type for this column
+       Type() parquet.Type
+       // Descr returns the column information for this writer
+       Descr() *schema.Column
+       // RowsWritten returns the number of rows that have so far been written 
with this writer
+       RowsWritten() int64
+       // TotalCompressedBytes returns the number of bytes, after compression, 
that have been written so far
+       TotalCompressedBytes() int64
+       // TotalBytesWritten includes the bytes for writing dictionary pages, 
while TotalCompressedBytes is
+       // just the data and page headers
+       TotalBytesWritten() int64
+       // Properties returns the current WriterProperties in use for this 
writer
+       Properties() *parquet.WriterProperties
+
+       LevelInfo() LevelInfo
+       SetBitsBuffer(*memory.Buffer)
+}
+
+func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
+       info.DefLevel = descr.MaxDefinitionLevel()
+       info.RepLevel = descr.MaxRepetitionLevel()
+
+       minSpacedDefLevel := descr.MaxDefinitionLevel()
+       n := descr.SchemaNode()
+       for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated {
+               if n.RepetitionType() == parquet.Repetitions.Optional {
+                       minSpacedDefLevel--
+               }
+               n = n.Parent()
+       }
+       info.RepeatedAncestorDefLevel = minSpacedDefLevel
+       return
+}
+
+type columnWriter struct {
+       metaData *metadata.ColumnChunkMetaDataBuilder
+       descr    *schema.Column
+
+       // scratch buffer if validity bits need to be recalculated
+       bitsBuffer *memory.Buffer
+       levelInfo  LevelInfo
+       pager      PageWriter
+       hasDict    bool
+       encoding   parquet.Encoding
+       props      *parquet.WriterProperties
+       defEncoder encoding.LevelEncoder
+       repEncoder encoding.LevelEncoder
+       mem        memory.Allocator
+
+       pageStatistics  metadata.TypedStatistics
+       chunkStatistics metadata.TypedStatistics
+
+       // total number of values stored in the data page. this is the maximum
+       // of the number of encoded def levels or encoded values. for
+       // non-repeated, required columns, this is equal to the number of 
encoded
+       // values. For repeated or optional values, there may be fewer data 
values
+       // than levels, and this tells you how many encoded levels there are in 
that case
+       numBuffered int64
+
+       // the total number of stored values. for repeated or optional values. 
this
+       // number may be lower than numBuffered
+       numBufferedEncoded int64

Review comment:
       done

##########
File path: go/parquet/file/column_writer.go
##########
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "github.com/apache/arrow/go/parquet/schema"
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_writer_types.gen.go.tmpl
+
+// ColumnChunkWriter is the base interface for all columnwriters. To directly 
write
+// data to the column, you need to assert it to the correctly typed 
ColumnChunkWriter
+// instance, such as Int32ColumnWriter.
+type ColumnChunkWriter interface {
+       // Close ends this column and returns the number of bytes written
+       Close() error
+       // Type returns the underlying physical parquet type for this column
+       Type() parquet.Type
+       // Descr returns the column information for this writer
+       Descr() *schema.Column
+       // RowsWritten returns the number of rows that have so far been written 
with this writer
+       RowsWritten() int64
+       // TotalCompressedBytes returns the number of bytes, after compression, 
that have been written so far
+       TotalCompressedBytes() int64
+       // TotalBytesWritten includes the bytes for writing dictionary pages, 
while TotalCompressedBytes is
+       // just the data and page headers
+       TotalBytesWritten() int64
+       // Properties returns the current WriterProperties in use for this 
writer
+       Properties() *parquet.WriterProperties
+
+       LevelInfo() LevelInfo
+       SetBitsBuffer(*memory.Buffer)
+}
+
+func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
+       info.DefLevel = descr.MaxDefinitionLevel()
+       info.RepLevel = descr.MaxRepetitionLevel()
+
+       minSpacedDefLevel := descr.MaxDefinitionLevel()
+       n := descr.SchemaNode()
+       for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated {
+               if n.RepetitionType() == parquet.Repetitions.Optional {
+                       minSpacedDefLevel--
+               }
+               n = n.Parent()
+       }
+       info.RepeatedAncestorDefLevel = minSpacedDefLevel
+       return
+}
+
+type columnWriter struct {
+       metaData *metadata.ColumnChunkMetaDataBuilder
+       descr    *schema.Column
+
+       // scratch buffer if validity bits need to be recalculated
+       bitsBuffer *memory.Buffer
+       levelInfo  LevelInfo
+       pager      PageWriter
+       hasDict    bool
+       encoding   parquet.Encoding
+       props      *parquet.WriterProperties
+       defEncoder encoding.LevelEncoder
+       repEncoder encoding.LevelEncoder
+       mem        memory.Allocator
+
+       pageStatistics  metadata.TypedStatistics
+       chunkStatistics metadata.TypedStatistics
+
+       // total number of values stored in the data page. this is the maximum
+       // of the number of encoded def levels or encoded values. for
+       // non-repeated, required columns, this is equal to the number of 
encoded
+       // values. For repeated or optional values, there may be fewer data 
values
+       // than levels, and this tells you how many encoded levels there are in 
that case
+       numBuffered int64

Review comment:
       done

##########
File path: go/parquet/file/column_writer.go
##########
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "github.com/apache/arrow/go/parquet/schema"
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_writer_types.gen.go.tmpl
+
+// ColumnChunkWriter is the base interface for all columnwriters. To directly 
write
+// data to the column, you need to assert it to the correctly typed 
ColumnChunkWriter
+// instance, such as Int32ColumnWriter.
+type ColumnChunkWriter interface {
+       // Close ends this column and returns the number of bytes written
+       Close() error
+       // Type returns the underlying physical parquet type for this column
+       Type() parquet.Type
+       // Descr returns the column information for this writer
+       Descr() *schema.Column
+       // RowsWritten returns the number of rows that have so far been written 
with this writer
+       RowsWritten() int64
+       // TotalCompressedBytes returns the number of bytes, after compression, 
that have been written so far
+       TotalCompressedBytes() int64
+       // TotalBytesWritten includes the bytes for writing dictionary pages, 
while TotalCompressedBytes is
+       // just the data and page headers
+       TotalBytesWritten() int64
+       // Properties returns the current WriterProperties in use for this 
writer
+       Properties() *parquet.WriterProperties
+
+       LevelInfo() LevelInfo
+       SetBitsBuffer(*memory.Buffer)
+}
+
+func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
+       info.DefLevel = descr.MaxDefinitionLevel()
+       info.RepLevel = descr.MaxRepetitionLevel()
+
+       minSpacedDefLevel := descr.MaxDefinitionLevel()
+       n := descr.SchemaNode()
+       for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated {
+               if n.RepetitionType() == parquet.Repetitions.Optional {
+                       minSpacedDefLevel--
+               }
+               n = n.Parent()
+       }
+       info.RepeatedAncestorDefLevel = minSpacedDefLevel
+       return
+}
+
+type columnWriter struct {
+       metaData *metadata.ColumnChunkMetaDataBuilder
+       descr    *schema.Column
+
+       // scratch buffer if validity bits need to be recalculated
+       bitsBuffer *memory.Buffer
+       levelInfo  LevelInfo
+       pager      PageWriter
+       hasDict    bool
+       encoding   parquet.Encoding
+       props      *parquet.WriterProperties
+       defEncoder encoding.LevelEncoder
+       repEncoder encoding.LevelEncoder
+       mem        memory.Allocator
+
+       pageStatistics  metadata.TypedStatistics
+       chunkStatistics metadata.TypedStatistics
+
+       // total number of values stored in the data page. this is the maximum
+       // of the number of encoded def levels or encoded values. for
+       // non-repeated, required columns, this is equal to the number of 
encoded
+       // values. For repeated or optional values, there may be fewer data 
values
+       // than levels, and this tells you how many encoded levels there are in 
that case
+       numBuffered int64
+
+       // the total number of stored values. for repeated or optional values. 
this
+       // number may be lower than numBuffered
+       numBufferedEncoded int64
+
+       rowsWritten       int
+       totalBytesWritten int64
+       // records the current number of compressed bytes in a column
+       totalCompressedBytes int64
+       closed               bool
+       fallback             bool

Review comment:
       renamed

##########
File path: go/parquet/file/column_writer.go
##########
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/metadata"
+       "github.com/apache/arrow/go/parquet/schema"
+)
+
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_writer_types.gen.go.tmpl
+
+// ColumnChunkWriter is the base interface for all columnwriters. To directly 
write
+// data to the column, you need to assert it to the correctly typed 
ColumnChunkWriter
+// instance, such as Int32ColumnWriter.
+type ColumnChunkWriter interface {
+       // Close ends this column and returns the number of bytes written
+       Close() error
+       // Type returns the underlying physical parquet type for this column
+       Type() parquet.Type
+       // Descr returns the column information for this writer
+       Descr() *schema.Column
+       // RowsWritten returns the number of rows that have so far been written 
with this writer
+       RowsWritten() int64
+       // TotalCompressedBytes returns the number of bytes, after compression, 
that have been written so far
+       TotalCompressedBytes() int64
+       // TotalBytesWritten includes the bytes for writing dictionary pages, 
while TotalCompressedBytes is
+       // just the data and page headers
+       TotalBytesWritten() int64
+       // Properties returns the current WriterProperties in use for this 
writer
+       Properties() *parquet.WriterProperties
+
+       LevelInfo() LevelInfo
+       SetBitsBuffer(*memory.Buffer)
+}
+
+func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
+       info.DefLevel = descr.MaxDefinitionLevel()
+       info.RepLevel = descr.MaxRepetitionLevel()
+
+       minSpacedDefLevel := descr.MaxDefinitionLevel()
+       n := descr.SchemaNode()
+       for n != nil && n.RepetitionType() != parquet.Repetitions.Repeated {
+               if n.RepetitionType() == parquet.Repetitions.Optional {
+                       minSpacedDefLevel--
+               }
+               n = n.Parent()
+       }
+       info.RepeatedAncestorDefLevel = minSpacedDefLevel
+       return
+}
+
+type columnWriter struct {
+       metaData *metadata.ColumnChunkMetaDataBuilder
+       descr    *schema.Column
+
+       // scratch buffer if validity bits need to be recalculated
+       bitsBuffer *memory.Buffer
+       levelInfo  LevelInfo
+       pager      PageWriter
+       hasDict    bool
+       encoding   parquet.Encoding
+       props      *parquet.WriterProperties
+       defEncoder encoding.LevelEncoder
+       repEncoder encoding.LevelEncoder
+       mem        memory.Allocator
+
+       pageStatistics  metadata.TypedStatistics
+       chunkStatistics metadata.TypedStatistics
+
+       // total number of values stored in the data page. this is the maximum
+       // of the number of encoded def levels or encoded values. for
+       // non-repeated, required columns, this is equal to the number of 
encoded
+       // values. For repeated or optional values, there may be fewer data 
values
+       // than levels, and this tells you how many encoded levels there are in 
that case
+       numBuffered int64
+
+       // the total number of stored values. for repeated or optional values. 
this
+       // number may be lower than numBuffered
+       numBufferedEncoded int64
+
+       rowsWritten       int
+       totalBytesWritten int64
+       // records the current number of compressed bytes in a column
+       totalCompressedBytes int64
+       closed               bool
+       fallback             bool
+
+       pages []DataPage
+
+       defLevelSink *encoding.PooledBufferWriter
+       repLevelSink *encoding.PooledBufferWriter
+
+       uncompressedData bytes.Buffer
+       compressedTemp   *bytes.Buffer
+
+       currentEncoder encoding.TypedEncoder
+}
+
+func newColumnWriterBase(metaData *metadata.ColumnChunkMetaDataBuilder, pager 
PageWriter, useDict bool, enc parquet.Encoding, props 
*parquet.WriterProperties) columnWriter {
+       ret := columnWriter{
+               metaData:     metaData,
+               descr:        metaData.Descr(),
+               levelInfo:    computeLevelInfo(metaData.Descr()),
+               pager:        pager,
+               hasDict:      useDict,
+               encoding:     enc,
+               props:        props,
+               mem:          props.Allocator(),
+               defLevelSink: encoding.NewPooledBufferWriter(0),
+               repLevelSink: encoding.NewPooledBufferWriter(0),
+       }
+       if pager.HasCompressor() {
+               ret.compressedTemp = new(bytes.Buffer)
+       }
+       if props.StatisticsEnabledFor(ret.descr.Path()) && 
ret.descr.SortOrder() != schema.SortUNKNOWN {
+               ret.pageStatistics = metadata.NewStatistics(ret.descr, 
props.Allocator())
+               ret.chunkStatistics = metadata.NewStatistics(ret.descr, 
props.Allocator())
+       }
+
+       if ret.props.DataPageVersion() == parquet.DataPageV1 {
+               if ret.descr.MaxDefinitionLevel() > 0 {

Review comment:
       comment added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to