This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e7158c6 ARROW-13984: [Go][Parquet] File readers
e7158c6 is described below
commit e7158c62ae43cbcea3f90c11dcbb40ffbbc94484
Author: Matthew Topol <[email protected]>
AuthorDate: Sat Oct 23 17:21:15 2021 -0400
ARROW-13984: [Go][Parquet] File readers
Looks like I merged #11146 before it finished syncing to the apache mirror
and was missing a few commits. Here are the missing ones.
Closes #11530 from zeroshade/goparquet-file
Authored-by: Matthew Topol <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/parquet/file/page_reader.go | 16 ++++++++++++----
go/parquet/file/row_group_reader.go | 10 ++++++++++
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 251499a..5c36b33 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -26,7 +26,6 @@ import (
"github.com/apache/arrow/go/arrow/memory"
"github.com/apache/arrow/go/parquet"
"github.com/apache/arrow/go/parquet/compress"
- "github.com/apache/arrow/go/parquet/internal/debug"
"github.com/apache/arrow/go/parquet/internal/encryption"
format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/parquet/internal/thrift"
@@ -512,7 +511,10 @@ func (p *serializedPageReader) Next() bool {
p.err = err
return false
}
- debug.Assert(len(data) == lenUncompressed, "len(data) != lenUncompressed")
+ if len(data) != lenUncompressed {
+ p.err = xerrors.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
+ return false
+ }
// p.buf.Resize(lenUncompressed)
// make dictionary page
@@ -540,7 +542,10 @@ func (p *serializedPageReader) Next() bool {
p.err = err
return false
}
- debug.Assert(len(data) == lenUncompressed, "len(data) != lenUncompressed")
+ if len(data) != lenUncompressed {
+ p.err = xerrors.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+ return false
+ }
// make datapagev1
p.curPage = &DataPageV1{
@@ -589,7 +594,10 @@ func (p *serializedPageReader) Next() bool {
io.ReadFull(p.r, p.buf.Bytes())
data = p.buf.Bytes()
}
- debug.Assert(len(data) == lenUncompressed, "len(data) != lenUncompressed")
+ if len(data) != lenUncompressed {
+ p.err = xerrors.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+ return false
+ }
// make datapage v2
p.curPage = &DataPageV2{
diff --git a/go/parquet/file/row_group_reader.go b/go/parquet/file/row_group_reader.go
index 9c74a25..455144e 100644
--- a/go/parquet/file/row_group_reader.go
+++ b/go/parquet/file/row_group_reader.go
@@ -79,7 +79,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
}
colLen := col.TotalCompressedSize()
+ // PARQUET-816 workaround for old files created by older parquet-mr
if r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+ // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+ // dictionary page header size in total_compressed_size and total_uncompressed_size
+ // (see IMPALA-694). We add padding to compensate.
+ if colStart < 0 || colLen < 0 {
+ return nil, xerrors.Errorf("invalid column chunk metadata, offset (%d) and length (%d) should both be positive", colStart, colLen)
+ }
+ if colStart > r.sourceSz || colLen > r.sourceSz {
+ return nil, xerrors.Errorf("invalid column chunk metadata, offset (%d) and length (%d) must both be less than total source size (%d)", colStart, colLen, r.sourceSz)
+ }
bytesRemain := r.sourceSz - (colStart + colLen)
padding := utils.Min(maxDictHeaderSize, bytesRemain)
colLen += padding