This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 5ce5806 [Parquet] Recover from panic in file reader (#124)
5ce5806 is described below
commit 5ce580609ad3715936909e3dfb5c27dc2fb4332d
Author: Matt Topol <[email protected]>
AuthorDate: Fri Sep 13 14:20:41 2024 -0400
[Parquet] Recover from panic in file reader (#124)
see https://github.com/apache/arrow/pull/43607
original author: @don4get
---
ci/scripts/test.sh | 1 +
parquet/internal/utils/bit_packing_avx2_amd64.go | 5 ++--
parquet/internal/utils/bit_packing_neon_arm64.go | 5 ++--
parquet/pqarrow/file_reader.go | 8 ++++++
parquet/pqarrow/file_reader_test.go | 34 ++++++++++++++++++++++++
5 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 9edfab8..9607e30 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -65,6 +65,7 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
popd
export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
export ARROW_TEST_DATA=${1}/arrow-testing/data
pushd "${source_dir}/parquet"
diff --git a/parquet/internal/utils/bit_packing_avx2_amd64.go b/parquet/internal/utils/bit_packing_avx2_amd64.go
index 0455ccc..5f1923f 100644
--- a/parquet/internal/utils/bit_packing_avx2_amd64.go
+++ b/parquet/internal/utils/bit_packing_avx2_amd64.go
@@ -33,12 +33,11 @@ func _unpack32_avx2(in, out unsafe.Pointer, batchSize, nbits int) (num int)
func unpack32Avx2(in io.Reader, out []uint32, nbits int) int {
batch := len(out) / 32 * 32
- if batch <= 0 {
+ n := batch * nbits / 8
+ if n <= 0 {
return 0
}
- n := batch * nbits / 8
-
buffer := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buffer)
buffer.Reset()
diff --git a/parquet/internal/utils/bit_packing_neon_arm64.go b/parquet/internal/utils/bit_packing_neon_arm64.go
index 09154e3..580f9a1 100755
--- a/parquet/internal/utils/bit_packing_neon_arm64.go
+++ b/parquet/internal/utils/bit_packing_neon_arm64.go
@@ -33,12 +33,11 @@ func _unpack32_neon(in, out unsafe.Pointer, batchSize, nbits int) (num int)
func unpack32NEON(in io.Reader, out []uint32, nbits int) int {
batch := len(out) / 32 * 32
- if batch <= 0 {
+ n := batch * nbits / 8
+ if n <= 0 {
return 0
}
- n := batch * nbits / 8
-
buffer := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buffer)
buffer.Reset()
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index 393215b..c9a83b8 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/arrio"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/schema"
@@ -332,6 +333,13 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
for i := 0; i < np; i++ {
go func() {
defer wg.Done()
+ defer func() {
+ if pErr := recover(); pErr != nil {
+ err := utils.FormatRecoveredError("panic while reading", pErr)
+ results <- resultPair{err: err}
+ }
+ }()
+
for {
select {
case r, ok := <-ch:
diff --git a/parquet/pqarrow/file_reader_test.go b/parquet/pqarrow/file_reader_test.go
index 61100ac..9010927 100644
--- a/parquet/pqarrow/file_reader_test.go
+++ b/parquet/pqarrow/file_reader_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"os"
+ "path"
"path/filepath"
"strings"
"testing"
@@ -373,3 +374,36 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
}
}
+
+func TestReadParquetFile(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_BAD_DATA")
+ if dir == "" {
+ t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
+ }
+ assert.DirExists(t, dir)
+ filename := path.Join(dir, "ARROW-GH-43605.parquet")
+ ctx := context.TODO()
+
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+
+ rdr, err := file.OpenParquetFile(
+ filename,
+ false,
+ file.WithReadProps(parquet.NewReaderProperties(mem)),
+ )
+ require.NoError(t, err)
+ defer func() {
+ if err2 := rdr.Close(); err2 != nil {
+ t.Errorf("unexpected error: %v", err2)
+ }
+ }()
+
+ arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+ Parallel: false,
+ BatchSize: 0,
+ }, mem)
+ require.NoError(t, err)
+
+ _, err = arrowRdr.ReadTable(ctx)
+ assert.NoError(t, err)
+}