This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5fd6b44936 GH-40630: [Go][Parquet] Enable writing of Parquet footer
without closing file (#40654)
5fd6b44936 is described below
commit 5fd6b44936a19761e45a8e43d7e76a0a23c5a222
Author: Peter Newcomb <[email protected]>
AuthorDate: Mon Mar 25 16:48:50 2024 -0400
GH-40630: [Go][Parquet] Enable writing of Parquet footer without closing
file (#40654)
### Rationale for this change
See #40630
### What changes are included in this PR?
1. Added a `FlushWithFooter` method to `*file.Writer`.
2. To support `FlushWithFooter`, refactored `Close`, which changes the
order of operations in two ways:
a. Any open row group writer is now closed after the `defer` that ensures
closure of the sink has been registered, instead of before.
b. Encryption keys are now wiped by that same deferred function,
ensuring that the wipe happens even when an error occurs.
### Are these changes tested?
`file_writer_test.go` has been extended to cover `FlushWithFooter` in a
manner equivalent to the existing coverage.
### Are there any user-facing changes?
Only the addition of a new public method as described above. No breaking
changes to any existing public interfaces, unless the two minor
order-of-operation changes described above are somehow a problem.
I'm not sure it's a critical fix, but the second of the minor changes
described above (wiping encryption keys in the deferred function) may reduce
the likelihood that an attacker could inject an error (e.g., an I/O error) to
prevent an encryption key from being wiped from memory.
* GitHub Issue: #40630
Authored-by: Peter Newcomb <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/file/file_writer.go | 62 +++++++++++++++++++++++--------------
go/parquet/file/file_writer_test.go | 17 +++++++++-
go/parquet/metadata/file.go | 15 ++++++++-
3 files changed, 69 insertions(+), 25 deletions(-)
diff --git a/go/parquet/file/file_writer.go b/go/parquet/file/file_writer.go
index a2cf397cbc..57344b25cf 100644
--- a/go/parquet/file/file_writer.go
+++ b/go/parquet/file/file_writer.go
@@ -32,6 +32,7 @@ import (
type Writer struct {
sink utils.WriteCloserTell
open bool
+ footerFlushed bool
props *parquet.WriterProperties
rowGroups int
nrows int
@@ -125,6 +126,7 @@ func (fw *Writer) appendRowGroup(buffered bool)
*rowGroupWriter {
fw.rowGroupWriter.Close()
}
fw.rowGroups++
+ fw.footerFlushed = false
rgMeta := fw.metadata.AppendRowGroup()
fw.rowGroupWriter = newRowGroupWriter(fw.sink, rgMeta,
int16(fw.rowGroups)-1, fw.props, buffered, fw.fileEncryptor)
return fw.rowGroupWriter
@@ -172,12 +174,9 @@ func (fw *Writer) Close() (err error) {
// if any functions here panic, we set open to be false so
// that this doesn't get called again
fw.open = false
- if fw.rowGroupWriter != nil {
- fw.nrows += fw.rowGroupWriter.nrows
- fw.rowGroupWriter.Close()
- }
- fw.rowGroupWriter = nil
+
defer func() {
+ fw.closeEncryptor()
ierr := fw.sink.Close()
if err != nil {
if ierr != nil {
@@ -189,30 +188,48 @@ func (fw *Writer) Close() (err error) {
err = ierr
}()
+ err = fw.FlushWithFooter()
+ fw.metadata.Clear()
+ }
+ return nil
+}
+
+// FlushWithFooter closes any open row group writer and writes the file
footer, leaving
+// the writer open for additional row groups. Additional footers written by
later
+// calls to FlushWithFooter or Close will be cumulative, so that only the last
footer
+// written need ever be read by a reader.
+func (fw *Writer) FlushWithFooter() error {
+ if !fw.footerFlushed {
+ if fw.rowGroupWriter != nil {
+ fw.nrows += fw.rowGroupWriter.nrows
+ fw.rowGroupWriter.Close()
+ }
+ fw.rowGroupWriter = nil
+
+ fileMetadata, err := fw.metadata.Snapshot()
+ if err != nil {
+ return err
+ }
+
fileEncryptProps := fw.props.FileEncryptionProperties()
if fileEncryptProps == nil { // non encrypted file
- fileMetadata, err := fw.metadata.Finish()
- if err != nil {
+ if _, err = writeFileMetadata(fileMetadata, fw.sink);
err != nil {
+ return err
+ }
+ } else {
+ if err := fw.flushEncryptedFile(fileMetadata,
fileEncryptProps); err != nil {
return err
}
-
- _, err = writeFileMetadata(fileMetadata, fw.sink)
- return err
}
- return fw.closeEncryptedFile(fileEncryptProps)
+ fw.footerFlushed = true
}
return nil
}
-func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties)
error {
+func (fw *Writer) flushEncryptedFile(fileMetadata *metadata.FileMetaData,
props *parquet.FileEncryptionProperties) error {
// encrypted file with encrypted footer
if props.EncryptedFooter() {
- fileMetadata, err := fw.metadata.Finish()
- if err != nil {
- return err
- }
-
footerLen := int64(0)
cryptoMetadata := fw.metadata.GetFileCryptoMetaData()
@@ -236,19 +253,18 @@ func (fw *Writer) closeEncryptedFile(props
*parquet.FileEncryptionProperties) er
return err
}
} else {
- fileMetadata, err := fw.metadata.Finish()
- if err != nil {
- return err
- }
footerSigningEncryptor :=
fw.fileEncryptor.GetFooterSigningEncryptor()
- if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink,
footerSigningEncryptor, false); err != nil {
+ if _, err := writeEncryptedFileMetadata(fileMetadata, fw.sink,
footerSigningEncryptor, false); err != nil {
return err
}
}
+ return nil
+}
+
+func (fw *Writer) closeEncryptor() {
if fw.fileEncryptor != nil {
fw.fileEncryptor.WipeOutEncryptionKeys()
}
- return nil
}
func writeFileMetadata(fileMetadata *metadata.FileMetaData, w io.Writer) (n
int64, err error) {
diff --git a/go/parquet/file/file_writer_test.go
b/go/parquet/file/file_writer_test.go
index 434c9852c5..3687fc8778 100644
--- a/go/parquet/file/file_writer_test.go
+++ b/go/parquet/file/file_writer_test.go
@@ -64,6 +64,20 @@ func (t *SerializeTestSuite) fileSerializeTest(codec
compress.Compression, expec
writer := file.NewParquetWriter(sink, t.Schema.Root(),
file.WithWriterProps(props))
t.GenerateData(int64(t.rowsPerRG))
+
+ t.serializeGeneratedData(writer)
+ writer.FlushWithFooter()
+
+ t.validateSerializedData(writer, sink, expected)
+
+ t.serializeGeneratedData(writer)
+ writer.Close()
+
+ t.numRowGroups *= 2
+ t.validateSerializedData(writer, sink, expected)
+}
+
+func (t *SerializeTestSuite) serializeGeneratedData(writer *file.Writer) {
for rg := 0; rg < t.numRowGroups/2; rg++ {
rgw := writer.AppendRowGroup()
for col := 0; col < t.numCols; col++ {
@@ -94,8 +108,9 @@ func (t *SerializeTestSuite) fileSerializeTest(codec
compress.Compression, expec
}
rgw.Close()
}
- writer.Close()
+}
+func (t *SerializeTestSuite) validateSerializedData(writer *file.Writer, sink
*encoding.BufferWriter, expected compress.Compression) {
nrows := t.numRowGroups * t.rowsPerRG
t.EqualValues(nrows, writer.NumRows())
diff --git a/go/parquet/metadata/file.go b/go/parquet/metadata/file.go
index f40081f172..fc37638316 100644
--- a/go/parquet/metadata/file.go
+++ b/go/parquet/metadata/file.go
@@ -104,6 +104,15 @@ func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key
string, value string) e
// version etc. This will clear out this filemetadatabuilder so it can
// be re-used
func (f *FileMetaDataBuilder) Finish() (*FileMetaData, error) {
+ out, err := f.Snapshot()
+ f.Clear()
+ return out, err
+}
+
+// Snapshot returns finalized metadata of the number of rows, row groups,
version etc.
+// The snapshot must be used (e.g., serialized) before any additional
(meta)data is
+// written, as it refers to builder datastructures that will continue to
mutate.
+func (f *FileMetaDataBuilder) Snapshot() (*FileMetaData, error) {
totalRows := int64(0)
for _, rg := range f.rowGroups {
totalRows += rg.NumRows
@@ -161,9 +170,13 @@ func (f *FileMetaDataBuilder) Finish() (*FileMetaData,
error) {
}
out.initColumnOrders()
+ return out, nil
+}
+
+// Clears out this filemetadatabuilder so it can be re-used
+func (f *FileMetaDataBuilder) Clear() {
f.metadata = format.NewFileMetaData()
f.rowGroups = nil
- return out, nil
}
// KeyValueMetadata is an alias for a slice of thrift keyvalue pairs.