This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e7158c6  ARROW-13984: [Go][Parquet] File readers
e7158c6 is described below

commit e7158c62ae43cbcea3f90c11dcbb40ffbbc94484
Author: Matthew Topol <[email protected]>
AuthorDate: Sat Oct 23 17:21:15 2021 -0400

    ARROW-13984: [Go][Parquet] File readers
    
    Looks like I merged #11146 before it finished sync'ing to the apache mirror 
and was missing a few commits. Here's the missing ones.
    
    Closes #11530 from zeroshade/goparquet-file
    
    Authored-by: Matthew Topol <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/parquet/file/page_reader.go      | 16 ++++++++++++----
 go/parquet/file/row_group_reader.go | 10 ++++++++++
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 251499a..5c36b33 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -26,7 +26,6 @@ import (
        "github.com/apache/arrow/go/arrow/memory"
        "github.com/apache/arrow/go/parquet"
        "github.com/apache/arrow/go/parquet/compress"
-       "github.com/apache/arrow/go/parquet/internal/debug"
        "github.com/apache/arrow/go/parquet/internal/encryption"
        format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
        "github.com/apache/arrow/go/parquet/internal/thrift"
@@ -512,7 +511,10 @@ func (p *serializedPageReader) Next() bool {
                                p.err = err
                                return false
                        }
-                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+                       if len(data) != lenUncompressed {
+                               p.err = xerrors.Errorf("parquet: metadata said 
%d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, 
len(data))
+                               return false
+                       }
 
                        // p.buf.Resize(lenUncompressed)
                        // make dictionary page
@@ -540,7 +542,10 @@ func (p *serializedPageReader) Next() bool {
                                p.err = err
                                return false
                        }
-                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+                       if len(data) != lenUncompressed {
+                               p.err = xerrors.Errorf("parquet: metadata said 
%d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+                               return false
+                       }
 
                        // make datapagev1
                        p.curPage = &DataPageV1{
@@ -589,7 +594,10 @@ func (p *serializedPageReader) Next() bool {
                                io.ReadFull(p.r, p.buf.Bytes())
                                data = p.buf.Bytes()
                        }
-                       debug.Assert(len(data) == lenUncompressed, "len(data) 
!= lenUncompressed")
+                       if len(data) != lenUncompressed {
+                               p.err = xerrors.Errorf("parquet: metadata said 
%d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+                               return false
+                       }
 
                        // make datapage v2
                        p.curPage = &DataPageV2{
diff --git a/go/parquet/file/row_group_reader.go 
b/go/parquet/file/row_group_reader.go
index 9c74a25..455144e 100644
--- a/go/parquet/file/row_group_reader.go
+++ b/go/parquet/file/row_group_reader.go
@@ -79,7 +79,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
        }
 
        colLen := col.TotalCompressedSize()
+       // PARQUET-816 workaround for old files created by older parquet-mr
        if 
r.fileMetadata.WriterVersion().LessThan(metadata.Parquet816FixedVersion) {
+               // The Parquet MR writer had a bug in 1.2.8 and below where it 
didn't include the
+               // dictionary page header size in total_compressed_size and 
total_uncompressed_size
+               // (see IMPALA-694). We add padding to compensate.
+               if colStart < 0 || colLen < 0 {
+                       return nil, xerrors.Errorf("invalid column chunk 
metadata, offset (%d) and length (%d) should both be positive", colStart, 
colLen)
+               }
+               if colStart > r.sourceSz || colLen > r.sourceSz {
+                       return nil, xerrors.Errorf("invalid column chunk 
metadata, offset (%d) and length (%d) must both be less than total source size 
(%d)", colStart, colLen, r.sourceSz)
+               }
                bytesRemain := r.sourceSz - (colStart + colLen)
                padding := utils.Min(maxDictHeaderSize, bytesRemain)
                colLen += padding

Reply via email to