This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 244d47c  feat(parquet): Move footerOffset into FileMetaData (#217)
244d47c is described below

commit 244d47c146e169315eefa1a18092b18c838e69cb
Author: Ruihao Chen <[email protected]>
AuthorDate: Sat Dec 14 05:35:37 2024 +0800

    feat(parquet): Move footerOffset into FileMetaData (#217)
    
    ### Rationale for this change
    
    After looking into the code, I found that `footerOffset` is mainly used
    in parsing the metadata of a parquet file.
    
    I think we can store it in `FileMetaData`, so we don't need to get
    `footerOffset` again when we use `WithMetadata`.
    
    ### What changes are included in this PR?
    
    Move `Reader.footerOffset` into `FileMetaData`
    
    ### Are these changes tested?
    
    
    ### Are there any user-facing changes?
---
 parquet/file/file_reader.go      | 29 +++++++++++++----------------
 parquet/file/row_group_reader.go |  8 ++++----
 parquet/metadata/file.go         | 11 +++++++++++
 3 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index c0b3445..c7ba813 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -47,7 +47,6 @@ type Reader struct {
        r             parquet.ReaderAtSeeker
        props         *parquet.ReaderProperties
        metadata      *metadata.FileMetaData
-       footerOffset  int64
        fileDecryptor encryption.FileDecryptor
 
        bufferPool sync.Pool
@@ -100,19 +99,11 @@ func OpenParquetFile(filename string, memoryMap bool, opts 
...ReadOption) (*Read
 // If no read properties are provided then the default ReaderProperties will 
be used. The WithMetadata
 // option can be used to provide a FileMetaData object rather than reading the 
file metadata from the file.
 func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, 
error) {
-       var err error
        f := &Reader{r: r}
        for _, o := range opts {
                o(f)
        }
 
-       if f.footerOffset <= 0 {
-               f.footerOffset, err = r.Seek(0, io.SeekEnd)
-               if err != nil {
-                       return nil, fmt.Errorf("parquet: could not retrieve 
footer offset: %w", err)
-               }
-       }
-
        if f.props == nil {
                f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
        }
@@ -156,13 +147,18 @@ func (f *Reader) MetaData() *metadata.FileMetaData { 
return f.metadata }
 
 // parseMetaData handles parsing the metadata from the opened file.
 func (f *Reader) parseMetaData() error {
-       if f.footerOffset <= int64(footerSize) {
-               return fmt.Errorf("parquet: file too small (size=%d)", 
f.footerOffset)
+       footerOffset, err := f.r.Seek(0, io.SeekEnd)
+       if err != nil {
+               return fmt.Errorf("parquet: could not retrieve footer offset: 
%w", err)
+       }
+
+       if footerOffset <= int64(footerSize) {
+               return fmt.Errorf("parquet: file too small (size=%d)", 
footerOffset)
        }
 
        buf := make([]byte, footerSize)
        // backup 8 bytes to read the footer size (first four bytes) and the 
magic bytes (last 4 bytes)
-       n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
+       n, err := f.r.ReadAt(buf, footerOffset-int64(footerSize))
        if err != nil && err != io.EOF {
                return fmt.Errorf("parquet: could not read footer: %w", err)
        }
@@ -171,7 +167,7 @@ func (f *Reader) parseMetaData() error {
        }
 
        size := int64(binary.LittleEndian.Uint32(buf[:4]))
-       if size < 0 || size+int64(footerSize) > f.footerOffset {
+       if size < 0 || size+int64(footerSize) > footerOffset {
                return errInconsistentFileMetadata
        }
 
@@ -180,7 +176,7 @@ func (f *Reader) parseMetaData() error {
        switch {
        case bytes.Equal(buf[4:], magicBytes): // non-encrypted metadata
                buf = make([]byte, size)
-               if _, err := f.r.ReadAt(buf, 
f.footerOffset-int64(footerSize)-size); err != nil {
+               if _, err := f.r.ReadAt(buf, 
footerOffset-int64(footerSize)-size); err != nil {
                        return fmt.Errorf("parquet: could not read footer: %w", 
err)
                }
 
@@ -188,6 +184,7 @@ func (f *Reader) parseMetaData() error {
                if err != nil {
                        return fmt.Errorf("parquet: could not read footer: %w", 
err)
                }
+               f.metadata.SetSourceFileSize(footerOffset)
 
                if !f.metadata.IsSetEncryptionAlgorithm() {
                        if fileDecryptProps != nil && 
!fileDecryptProps.PlaintextFilesAllowed() {
@@ -200,7 +197,7 @@ func (f *Reader) parseMetaData() error {
                }
        case bytes.Equal(buf[4:], magicEBytes): // encrypted metadata
                buf = make([]byte, size)
-               if _, err := f.r.ReadAt(buf, 
f.footerOffset-int64(footerSize)-size); err != nil {
+               if _, err := f.r.ReadAt(buf, 
footerOffset-int64(footerSize)-size); err != nil {
                        return fmt.Errorf("parquet: could not read footer: %w", 
err)
                }
 
@@ -223,6 +220,7 @@ func (f *Reader) parseMetaData() error {
                if err != nil {
                        return fmt.Errorf("parquet: could not read footer: %w", 
err)
                }
+               f.metadata.SetSourceFileSize(footerOffset)
        default:
                return fmt.Errorf("parquet: magic bytes not found in footer. 
Either the file is corrupted or this isn't a parquet file")
        }
@@ -310,7 +308,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
                rgMetadata:    metadata.NewRowGroupMetaData(rg, 
f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
                props:         f.props,
                r:             f.r,
-               sourceSz:      f.footerOffset,
                fileDecryptor: f.fileDecryptor,
                bufferPool:    &f.bufferPool,
        }
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index 9a3b242..f800d19 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -34,7 +34,6 @@ const (
 // RowGroupReader is the primary interface for reading a single row group
 type RowGroupReader struct {
        r             parquet.ReaderAtSeeker
-       sourceSz      int64
        fileMetadata  *metadata.FileMetaData
        rgMetadata    *metadata.RowGroupMetaData
        props         *parquet.ReaderProperties
@@ -85,16 +84,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
        colLen := col.TotalCompressedSize()
        // PARQUET-816 workaround for old files created by older parquet-mr
        if 
r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+               sourceSz := r.fileMetadata.GetSourceFileSize()
                // The Parquet MR writer had a bug in 1.2.8 and below where it 
didn't include the
                // dictionary page header size in total_compressed_size and 
total_uncompressed_size
                // (see IMPALA-694). We add padding to compensate.
                if colStart < 0 || colLen < 0 {
                        return nil, fmt.Errorf("invalid column chunk metadata, 
offset (%d) and length (%d) should both be positive", colStart, colLen)
                }
-               if colStart > r.sourceSz || colLen > r.sourceSz {
-                       return nil, fmt.Errorf("invalid column chunk metadata, 
offset (%d) and length (%d) must both be less than total source size (%d)", 
colStart, colLen, r.sourceSz)
+               if colStart > sourceSz || colLen > sourceSz {
+                       return nil, fmt.Errorf("invalid column chunk metadata, 
offset (%d) and length (%d) must both be less than total source size (%d)", 
colStart, colLen, sourceSz)
                }
-               bytesRemain := r.sourceSz - (colStart + colLen)
+               bytesRemain := sourceSz - (colStart + colLen)
                padding := utils.Min(maxDictHeaderSize, bytesRemain)
                colLen += padding
        }
diff --git a/parquet/metadata/file.go b/parquet/metadata/file.go
index 070caae..c526a85 100644
--- a/parquet/metadata/file.go
+++ b/parquet/metadata/file.go
@@ -243,6 +243,10 @@ type FileMetaData struct {
        // size of the raw bytes of the metadata in the file which were
        // decoded by thrift, Size() getter returns the value.
        metadataLen int
+
+       // sourceFileSize is not a part of FileMetaData, but it is mainly used 
to parse meta data.
+       // Users can manually set this value and they are responsible for the 
validity of it.
+       sourceFileSize int64
 }
 
 // NewFileMetaData takes in the raw bytes of the serialized metadata to 
deserialize
@@ -275,6 +279,12 @@ func NewFileMetaData(data []byte, fileDecryptor 
encryption.FileDecryptor) (*File
 // Size is the length of the raw serialized metadata bytes in the footer
 func (f *FileMetaData) Size() int { return f.metadataLen }
 
+// GetSourceFileSize get the total size of the source file from meta data.
+func (f *FileMetaData) GetSourceFileSize() int64 { return f.sourceFileSize }
+
+// SetSourceFileSize set the total size of the source file in meta data.
+func (f *FileMetaData) SetSourceFileSize(sourceFileSize int64) { 
f.sourceFileSize = sourceFileSize }
+
 // NumSchemaElements is the length of the flattened schema list in the thrift
 func (f *FileMetaData) NumSchemaElements() int {
        return len(f.FileMetaData.Schema)
@@ -388,6 +398,7 @@ func (f *FileMetaData) Subset(rowGroups []int) 
(*FileMetaData, error) {
                f.FileDecryptor,
                f.version,
                0,
+               f.sourceFileSize,
        }
 
        out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))

Reply via email to