This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 244d47c feat(parquet): Move footerOffset into FileMetaData (#217)
244d47c is described below
commit 244d47c146e169315eefa1a18092b18c838e69cb
Author: Ruihao Chen <[email protected]>
AuthorDate: Sat Dec 14 05:35:37 2024 +0800
feat(parquet): Move footerOffset into FileMetaData (#217)
### Rationale for this change
After looking into the code, I found that `footerOffset` is mainly used
when parsing the metadata of a parquet file.
I think we can store it in `FileMetaData`, so that we don't need to
retrieve `footerOffset` again when we use `WithMetadata`.
### What changes are included in this PR?
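Move `Reader.footerOffset` into `FileMetaData` as `sourceFileSize`, exposed
through `GetSourceFileSize`/`SetSourceFileSize`; `RowGroupReader` now reads the
size from the file metadata instead of keeping its own `sourceSz` copy.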
### Are these changes tested?
### Are there any user-facing changes?
---
parquet/file/file_reader.go | 29 +++++++++++++----------------
parquet/file/row_group_reader.go | 8 ++++----
parquet/metadata/file.go | 11 +++++++++++
3 files changed, 28 insertions(+), 20 deletions(-)
diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index c0b3445..c7ba813 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -47,7 +47,6 @@ type Reader struct {
r parquet.ReaderAtSeeker
props *parquet.ReaderProperties
metadata *metadata.FileMetaData
- footerOffset int64
fileDecryptor encryption.FileDecryptor
bufferPool sync.Pool
@@ -100,19 +99,11 @@ func OpenParquetFile(filename string, memoryMap bool, opts
...ReadOption) (*Read
// If no read properties are provided then the default ReaderProperties will
be used. The WithMetadata
// option can be used to provide a FileMetaData object rather than reading the
file metadata from the file.
func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader,
error) {
- var err error
f := &Reader{r: r}
for _, o := range opts {
o(f)
}
- if f.footerOffset <= 0 {
- f.footerOffset, err = r.Seek(0, io.SeekEnd)
- if err != nil {
- return nil, fmt.Errorf("parquet: could not retrieve
footer offset: %w", err)
- }
- }
-
if f.props == nil {
f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
}
@@ -156,13 +147,18 @@ func (f *Reader) MetaData() *metadata.FileMetaData {
return f.metadata }
// parseMetaData handles parsing the metadata from the opened file.
func (f *Reader) parseMetaData() error {
- if f.footerOffset <= int64(footerSize) {
- return fmt.Errorf("parquet: file too small (size=%d)",
f.footerOffset)
+ footerOffset, err := f.r.Seek(0, io.SeekEnd)
+ if err != nil {
+ return fmt.Errorf("parquet: could not retrieve footer offset:
%w", err)
+ }
+
+ if footerOffset <= int64(footerSize) {
+ return fmt.Errorf("parquet: file too small (size=%d)",
footerOffset)
}
buf := make([]byte, footerSize)
// backup 8 bytes to read the footer size (first four bytes) and the
magic bytes (last 4 bytes)
- n, err := f.r.ReadAt(buf, f.footerOffset-int64(footerSize))
+ n, err := f.r.ReadAt(buf, footerOffset-int64(footerSize))
if err != nil && err != io.EOF {
return fmt.Errorf("parquet: could not read footer: %w", err)
}
@@ -171,7 +167,7 @@ func (f *Reader) parseMetaData() error {
}
size := int64(binary.LittleEndian.Uint32(buf[:4]))
- if size < 0 || size+int64(footerSize) > f.footerOffset {
+ if size < 0 || size+int64(footerSize) > footerOffset {
return errInconsistentFileMetadata
}
@@ -180,7 +176,7 @@ func (f *Reader) parseMetaData() error {
switch {
case bytes.Equal(buf[4:], magicBytes): // non-encrypted metadata
buf = make([]byte, size)
- if _, err := f.r.ReadAt(buf,
f.footerOffset-int64(footerSize)-size); err != nil {
+ if _, err := f.r.ReadAt(buf,
footerOffset-int64(footerSize)-size); err != nil {
return fmt.Errorf("parquet: could not read footer: %w",
err)
}
@@ -188,6 +184,7 @@ func (f *Reader) parseMetaData() error {
if err != nil {
return fmt.Errorf("parquet: could not read footer: %w",
err)
}
+ f.metadata.SetSourceFileSize(footerOffset)
if !f.metadata.IsSetEncryptionAlgorithm() {
if fileDecryptProps != nil &&
!fileDecryptProps.PlaintextFilesAllowed() {
@@ -200,7 +197,7 @@ func (f *Reader) parseMetaData() error {
}
case bytes.Equal(buf[4:], magicEBytes): // encrypted metadata
buf = make([]byte, size)
- if _, err := f.r.ReadAt(buf,
f.footerOffset-int64(footerSize)-size); err != nil {
+ if _, err := f.r.ReadAt(buf,
footerOffset-int64(footerSize)-size); err != nil {
return fmt.Errorf("parquet: could not read footer: %w",
err)
}
@@ -223,6 +220,7 @@ func (f *Reader) parseMetaData() error {
if err != nil {
return fmt.Errorf("parquet: could not read footer: %w",
err)
}
+ f.metadata.SetSourceFileSize(footerOffset)
default:
return fmt.Errorf("parquet: magic bytes not found in footer.
Either the file is corrupted or this isn't a parquet file")
}
@@ -310,7 +308,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
rgMetadata: metadata.NewRowGroupMetaData(rg,
f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
props: f.props,
r: f.r,
- sourceSz: f.footerOffset,
fileDecryptor: f.fileDecryptor,
bufferPool: &f.bufferPool,
}
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index 9a3b242..f800d19 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -34,7 +34,6 @@ const (
// RowGroupReader is the primary interface for reading a single row group
type RowGroupReader struct {
r parquet.ReaderAtSeeker
- sourceSz int64
fileMetadata *metadata.FileMetaData
rgMetadata *metadata.RowGroupMetaData
props *parquet.ReaderProperties
@@ -85,16 +84,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int)
(PageReader, error) {
colLen := col.TotalCompressedSize()
// PARQUET-816 workaround for old files created by older parquet-mr
if
r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+ sourceSz := r.fileMetadata.GetSourceFileSize()
// The Parquet MR writer had a bug in 1.2.8 and below where it
didn't include the
// dictionary page header size in total_compressed_size and
total_uncompressed_size
// (see IMPALA-694). We add padding to compensate.
if colStart < 0 || colLen < 0 {
return nil, fmt.Errorf("invalid column chunk metadata,
offset (%d) and length (%d) should both be positive", colStart, colLen)
}
- if colStart > r.sourceSz || colLen > r.sourceSz {
- return nil, fmt.Errorf("invalid column chunk metadata,
offset (%d) and length (%d) must both be less than total source size (%d)",
colStart, colLen, r.sourceSz)
+ if colStart > sourceSz || colLen > sourceSz {
+ return nil, fmt.Errorf("invalid column chunk metadata,
offset (%d) and length (%d) must both be less than total source size (%d)",
colStart, colLen, sourceSz)
}
- bytesRemain := r.sourceSz - (colStart + colLen)
+ bytesRemain := sourceSz - (colStart + colLen)
padding := utils.Min(maxDictHeaderSize, bytesRemain)
colLen += padding
}
diff --git a/parquet/metadata/file.go b/parquet/metadata/file.go
index 070caae..c526a85 100644
--- a/parquet/metadata/file.go
+++ b/parquet/metadata/file.go
@@ -243,6 +243,10 @@ type FileMetaData struct {
// size of the raw bytes of the metadata in the file which were
// decoded by thrift, Size() getter returns the value.
metadataLen int
+
+ // sourceFileSize is not a part of FileMetaData, but it is mainly used
to parse meta data.
+ // Users can manually set this value and they are responsible for the
validity of it.
+ sourceFileSize int64
}
// NewFileMetaData takes in the raw bytes of the serialized metadata to
deserialize
@@ -275,6 +279,12 @@ func NewFileMetaData(data []byte, fileDecryptor
encryption.FileDecryptor) (*File
// Size is the length of the raw serialized metadata bytes in the footer
func (f *FileMetaData) Size() int { return f.metadataLen }
+// GetSourceFileSize get the total size of the source file from meta data.
+func (f *FileMetaData) GetSourceFileSize() int64 { return f.sourceFileSize }
+
+// SetSourceFileSize set the total size of the source file in meta data.
+func (f *FileMetaData) SetSourceFileSize(sourceFileSize int64) {
f.sourceFileSize = sourceFileSize }
+
// NumSchemaElements is the length of the flattened schema list in the thrift
func (f *FileMetaData) NumSchemaElements() int {
return len(f.FileMetaData.Schema)
@@ -388,6 +398,7 @@ func (f *FileMetaData) Subset(rowGroups []int)
(*FileMetaData, error) {
f.FileDecryptor,
f.version,
0,
+ f.sourceFileSize,
}
out.RowGroups = make([]*format.RowGroup, 0, len(rowGroups))