This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fd6b44936 GH-40630: [Go][Parquet] Enable writing of Parquet footer without closing file (#40654)
5fd6b44936 is described below

commit 5fd6b44936a19761e45a8e43d7e76a0a23c5a222
Author: Peter Newcomb <[email protected]>
AuthorDate: Mon Mar 25 16:48:50 2024 -0400

    GH-40630: [Go][Parquet] Enable writing of Parquet footer without closing file (#40654)
    
    
    
    ### Rationale for this change
    
    See #40630
    
    ### What changes are included in this PR?
    
    1. Added `FlushWithFooter` method to `*file.Writer`
    2. To support `FlushWithFooter`, refactored `Close` in a way that changes the order of operations in two ways:
       a. closure of open row group writers is now done after using `defer` to ensure closure of the sink, instead of before
       b. wiping out of encryption keys is now done by the same deferred function, ensuring that it happens even upon error
    
    ### Are these changes tested?
    
    `file_writer_test.go` has been extended to cover `FlushWithFooter` in a manner equivalent to the existing coverage.
    
    ### Are there any user-facing changes?
    
    Only the addition of a new public method as described above.  No breaking changes to any existing public interfaces, unless the two minor order-of-operation changes described above are somehow a problem.
    
    I'm not sure it's a critical fix, but one of the minor changes described above may reduce the likelihood that an attacker could inject an error (e.g., an I/O error) to prevent an encryption key from being wiped from memory.
    
    * GitHub Issue: #40630
    
    Authored-by: Peter Newcomb <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/parquet/file/file_writer.go      | 62 +++++++++++++++++++++++--------------
 go/parquet/file/file_writer_test.go | 17 +++++++++-
 go/parquet/metadata/file.go         | 15 ++++++++-
 3 files changed, 69 insertions(+), 25 deletions(-)

diff --git a/go/parquet/file/file_writer.go b/go/parquet/file/file_writer.go
index a2cf397cbc..57344b25cf 100644
--- a/go/parquet/file/file_writer.go
+++ b/go/parquet/file/file_writer.go
@@ -32,6 +32,7 @@ import (
 type Writer struct {
        sink           utils.WriteCloserTell
        open           bool
+       footerFlushed  bool
        props          *parquet.WriterProperties
        rowGroups      int
        nrows          int
@@ -125,6 +126,7 @@ func (fw *Writer) appendRowGroup(buffered bool) 
*rowGroupWriter {
                fw.rowGroupWriter.Close()
        }
        fw.rowGroups++
+       fw.footerFlushed = false
        rgMeta := fw.metadata.AppendRowGroup()
        fw.rowGroupWriter = newRowGroupWriter(fw.sink, rgMeta, 
int16(fw.rowGroups)-1, fw.props, buffered, fw.fileEncryptor)
        return fw.rowGroupWriter
@@ -172,12 +174,9 @@ func (fw *Writer) Close() (err error) {
                // if any functions here panic, we set open to be false so
                // that this doesn't get called again
                fw.open = false
-               if fw.rowGroupWriter != nil {
-                       fw.nrows += fw.rowGroupWriter.nrows
-                       fw.rowGroupWriter.Close()
-               }
-               fw.rowGroupWriter = nil
+
                defer func() {
+                       fw.closeEncryptor()
                        ierr := fw.sink.Close()
                        if err != nil {
                                if ierr != nil {
@@ -189,30 +188,48 @@ func (fw *Writer) Close() (err error) {
                        err = ierr
                }()
 
+               err = fw.FlushWithFooter()
+               fw.metadata.Clear()
+       }
+       return nil
+}
+
+// FlushWithFooter closes any open row group writer and writes the file 
footer, leaving
+// the writer open for additional row groups.  Additional footers written by 
later
+// calls to FlushWithFooter or Close will be cumulative, so that only the last 
footer
+// written need ever be read by a reader.
+func (fw *Writer) FlushWithFooter() error {
+       if !fw.footerFlushed {
+               if fw.rowGroupWriter != nil {
+                       fw.nrows += fw.rowGroupWriter.nrows
+                       fw.rowGroupWriter.Close()
+               }
+               fw.rowGroupWriter = nil
+
+               fileMetadata, err := fw.metadata.Snapshot()
+               if err != nil {
+                       return err
+               }
+
                fileEncryptProps := fw.props.FileEncryptionProperties()
                if fileEncryptProps == nil { // non encrypted file
-                       fileMetadata, err := fw.metadata.Finish()
-                       if err != nil {
+                       if _, err = writeFileMetadata(fileMetadata, fw.sink); 
err != nil {
+                               return err
+                       }
+               } else {
+                       if err := fw.flushEncryptedFile(fileMetadata, 
fileEncryptProps); err != nil {
                                return err
                        }
-
-                       _, err = writeFileMetadata(fileMetadata, fw.sink)
-                       return err
                }
 
-               return fw.closeEncryptedFile(fileEncryptProps)
+               fw.footerFlushed = true
        }
        return nil
 }
 
-func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) 
error {
+func (fw *Writer) flushEncryptedFile(fileMetadata *metadata.FileMetaData, 
props *parquet.FileEncryptionProperties) error {
        // encrypted file with encrypted footer
        if props.EncryptedFooter() {
-               fileMetadata, err := fw.metadata.Finish()
-               if err != nil {
-                       return err
-               }
-
                footerLen := int64(0)
 
                cryptoMetadata := fw.metadata.GetFileCryptoMetaData()
@@ -236,19 +253,18 @@ func (fw *Writer) closeEncryptedFile(props 
*parquet.FileEncryptionProperties) er
                        return err
                }
        } else {
-               fileMetadata, err := fw.metadata.Finish()
-               if err != nil {
-                       return err
-               }
                footerSigningEncryptor := 
fw.fileEncryptor.GetFooterSigningEncryptor()
-               if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, 
footerSigningEncryptor, false); err != nil {
+               if _, err := writeEncryptedFileMetadata(fileMetadata, fw.sink, 
footerSigningEncryptor, false); err != nil {
                        return err
                }
        }
+       return nil
+}
+
+func (fw *Writer) closeEncryptor() {
        if fw.fileEncryptor != nil {
                fw.fileEncryptor.WipeOutEncryptionKeys()
        }
-       return nil
 }
 
 func writeFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer) (n 
int64, err error) {
diff --git a/go/parquet/file/file_writer_test.go 
b/go/parquet/file/file_writer_test.go
index 434c9852c5..3687fc8778 100644
--- a/go/parquet/file/file_writer_test.go
+++ b/go/parquet/file/file_writer_test.go
@@ -64,6 +64,20 @@ func (t *SerializeTestSuite) fileSerializeTest(codec 
compress.Compression, expec
 
        writer := file.NewParquetWriter(sink, t.Schema.Root(), 
file.WithWriterProps(props))
        t.GenerateData(int64(t.rowsPerRG))
+
+       t.serializeGeneratedData(writer)
+       writer.FlushWithFooter()
+
+       t.validateSerializedData(writer, sink, expected)
+
+       t.serializeGeneratedData(writer)
+       writer.Close()
+
+       t.numRowGroups *= 2
+       t.validateSerializedData(writer, sink, expected)
+}
+
+func (t *SerializeTestSuite) serializeGeneratedData(writer *file.Writer) {
        for rg := 0; rg < t.numRowGroups/2; rg++ {
                rgw := writer.AppendRowGroup()
                for col := 0; col < t.numCols; col++ {
@@ -94,8 +108,9 @@ func (t *SerializeTestSuite) fileSerializeTest(codec 
compress.Compression, expec
                }
                rgw.Close()
        }
-       writer.Close()
+}
 
+func (t *SerializeTestSuite) validateSerializedData(writer *file.Writer, sink 
*encoding.BufferWriter, expected compress.Compression) {
        nrows := t.numRowGroups * t.rowsPerRG
        t.EqualValues(nrows, writer.NumRows())
 
diff --git a/go/parquet/metadata/file.go b/go/parquet/metadata/file.go
index f40081f172..fc37638316 100644
--- a/go/parquet/metadata/file.go
+++ b/go/parquet/metadata/file.go
@@ -104,6 +104,15 @@ func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key 
string, value string) e
 // version etc. This will clear out this filemetadatabuilder so it can
 // be re-used
 func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) {
+       out, err := f.Snapshot()
+       f.Clear()
+       return out, err
+}
+
+// Snapshot returns finalized metadata of the number of rows, row groups, 
version etc.
+// The snapshot must be used (e.g., serialized) before any additional 
(meta)data is
+// written, as it refers to builder datastructures that will continue to 
mutate.
+func (f *FileMetaDataBuilder) Snapshot() (*FileMetaData, error) {
        totalRows := int64(0)
        for _, rg := range f.rowGroups {
                totalRows += rg.NumRows
@@ -161,9 +170,13 @@ func (f *FileMetaDataBuilder) Finish() (*FileMetaData, 
error) {
        }
        out.initColumnOrders()
 
+       return out, nil
+}
+
+// Clears out this filemetadatabuilder so it can be re-used
+func (f *FileMetaDataBuilder) Clear() {
        f.metadata = format.NewFileMetaData()
        f.rowGroups = nil
-       return out, nil
 }
 
 // KeyValueMetadata is an alias for a slice of thrift keyvalue pairs.

Reply via email to