This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ce5806  [Parquet] Recover from panic in file reader (#124)
5ce5806 is described below

commit 5ce580609ad3715936909e3dfb5c27dc2fb4332d
Author: Matt Topol <[email protected]>
AuthorDate: Fri Sep 13 14:20:41 2024 -0400

    [Parquet] Recover from panic in file reader (#124)
    
    see https://github.com/apache/arrow/pull/43607
    
    original author: @don4get
---
 ci/scripts/test.sh                               |  1 +
 parquet/internal/utils/bit_packing_avx2_amd64.go |  5 ++--
 parquet/internal/utils/bit_packing_neon_arm64.go |  5 ++--
 parquet/pqarrow/file_reader.go                   |  8 ++++++
 parquet/pqarrow/file_reader_test.go              | 34 ++++++++++++++++++++++++
 5 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 9edfab8..9607e30 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -65,6 +65,7 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
 popd
 
 export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
 export ARROW_TEST_DATA=${1}/arrow-testing/data
 pushd "${source_dir}/parquet"
 
diff --git a/parquet/internal/utils/bit_packing_avx2_amd64.go 
b/parquet/internal/utils/bit_packing_avx2_amd64.go
index 0455ccc..5f1923f 100644
--- a/parquet/internal/utils/bit_packing_avx2_amd64.go
+++ b/parquet/internal/utils/bit_packing_avx2_amd64.go
@@ -33,12 +33,11 @@ func _unpack32_avx2(in, out unsafe.Pointer, batchSize, 
nbits int) (num int)
 
 func unpack32Avx2(in io.Reader, out []uint32, nbits int) int {
        batch := len(out) / 32 * 32
-       if batch <= 0 {
+       n := batch * nbits / 8
+       if n <= 0 {
                return 0
        }
 
-       n := batch * nbits / 8
-
        buffer := bufferPool.Get().(*bytes.Buffer)
        defer bufferPool.Put(buffer)
        buffer.Reset()
diff --git a/parquet/internal/utils/bit_packing_neon_arm64.go 
b/parquet/internal/utils/bit_packing_neon_arm64.go
index 09154e3..580f9a1 100755
--- a/parquet/internal/utils/bit_packing_neon_arm64.go
+++ b/parquet/internal/utils/bit_packing_neon_arm64.go
@@ -33,12 +33,11 @@ func _unpack32_neon(in, out unsafe.Pointer, batchSize, 
nbits int) (num int)
 
 func unpack32NEON(in io.Reader, out []uint32, nbits int) int {
        batch := len(out) / 32 * 32
-       if batch <= 0 {
+       n := batch * nbits / 8
+       if n <= 0 {
                return 0
        }
 
-       n := batch * nbits / 8
-
        buffer := bufferPool.Get().(*bytes.Buffer)
        defer bufferPool.Put(buffer)
        buffer.Reset()
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index 393215b..c9a83b8 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/arrio"
        "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/file"
        "github.com/apache/arrow-go/v18/parquet/schema"
@@ -332,6 +333,13 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, 
indices, rowGroups []in
        for i := 0; i < np; i++ {
                go func() {
                        defer wg.Done()
+                       defer func() {
+                               if pErr := recover(); pErr != nil {
+                                       err := 
utils.FormatRecoveredError("panic while reading", pErr)
+                                       results <- resultPair{err: err}
+                               }
+                       }()
+
                        for {
                                select {
                                case r, ok := <-ch:
diff --git a/parquet/pqarrow/file_reader_test.go 
b/parquet/pqarrow/file_reader_test.go
index 61100ac..9010927 100644
--- a/parquet/pqarrow/file_reader_test.go
+++ b/parquet/pqarrow/file_reader_test.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "os"
+       "path"
        "path/filepath"
        "strings"
        "testing"
@@ -373,3 +374,36 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
                assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only 
%d columns", schema.NumFields()))
        }
 }
+
+func TestReadParquetFile(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_BAD_DATA")
+       if dir == "" {
+               t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
+       }
+       assert.DirExists(t, dir)
+       filename := path.Join(dir, "ARROW-GH-43605.parquet")
+       ctx := context.TODO()
+
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+
+       rdr, err := file.OpenParquetFile(
+               filename,
+               false,
+               file.WithReadProps(parquet.NewReaderProperties(mem)),
+       )
+       require.NoError(t, err)
+       defer func() {
+               if err2 := rdr.Close(); err2 != nil {
+                       t.Errorf("unexpected error: %v", err2)
+               }
+       }()
+
+       arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+               Parallel:  false,
+               BatchSize: 0,
+       }, mem)
+       require.NoError(t, err)
+
+       _, err = arrowRdr.ReadTable(ctx)
+       assert.NoError(t, err)
+}

Reply via email to