This is an automated email from the ASF dual-hosted git repository.
joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new de19af928a GH-43359: [Go][Parquet] ReadRowGroups panics with canceled context (#43360)
de19af928a is described below
commit de19af928a83c09d12cd0f3d83932bfa8cca9b19
Author: Seb. V <[email protected]>
AuthorDate: Tue Jul 23 13:47:26 2024 +0200
GH-43359: [Go][Parquet] ReadRowGroups panics with canceled context (#43360)
### Rationale for this change
`ReadRowGroups` needs to support externally canceled contexts, e.g. for
request-scoped contexts in servers like gRPC.
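### What changes are included in this PR?
`ReadRowGroups` now joins the context's error (`ctx.Err()`) into the error it
returns, so an externally canceled context causes an early exit with an error
instead of a panic.
Additionally, `releaseColumns` needs to ignore columns with uninitialized
data, as it is used in a `defer` statement.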
### Are these changes tested?
Yes: a new test `TestArrowReaderCanceledContext` is included.
### Are there any user-facing changes?
None
* GitHub Issue: #43359
Authored-by: sebdotv <[email protected]>
Signed-off-by: Joel Lubinitsky <[email protected]>
---
go/parquet/pqarrow/file_reader.go | 5 +++++
go/parquet/pqarrow/file_reader_test.go | 23 +++++++++++++++++++++++
go/parquet/pqarrow/helpers.go | 4 +++-
3 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/go/parquet/pqarrow/file_reader.go
b/go/parquet/pqarrow/file_reader.go
index 208ac9ceeb..a2e84d9ce2 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -18,6 +18,7 @@ package pqarrow
import (
"context"
+ "errors"
"fmt"
"io"
"sync"
@@ -375,6 +376,10 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context,
indices, rowGroups []in
data.data.Release()
}
+ // if the context is in error, but we haven't set an error yet, then it
means that the parent context
+ // was cancelled. In this case, we should exit early as some columns
may not have been read yet.
+ err = errors.Join(err, ctx.Err())
+
if err != nil {
// if we encountered an error, consume any waiting data on the
channel
// so the goroutines don't leak and so memory can get cleaned
up. we already
diff --git a/go/parquet/pqarrow/file_reader_test.go
b/go/parquet/pqarrow/file_reader_test.go
index b7d178f864..fe5a4547a7 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -167,6 +167,29 @@ func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
}
}
+func TestArrowReaderCanceledContext(t *testing.T) {
+ dataDir := getDataDir()
+
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ filename := filepath.Join(dataDir, "int32_decimal.parquet")
+ require.FileExists(t, filename)
+
+ rdr, err := file.OpenParquetFile(filename, false,
file.WithReadProps(parquet.NewReaderProperties(mem)))
+ require.NoError(t, err)
+ defer rdr.Close()
+ arrowRdr, err := pqarrow.NewFileReader(rdr,
pqarrow.ArrowReadProperties{}, mem)
+ require.NoError(t, err)
+
+ // create a canceled context
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ _, err = arrowRdr.ReadTable(ctx)
+ require.ErrorIs(t, err, context.Canceled)
+}
+
func TestRecordReaderParallel(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
diff --git a/go/parquet/pqarrow/helpers.go b/go/parquet/pqarrow/helpers.go
index 800cd84192..237de4366c 100644
--- a/go/parquet/pqarrow/helpers.go
+++ b/go/parquet/pqarrow/helpers.go
@@ -38,6 +38,8 @@ func releaseArrayData(data []arrow.ArrayData) {
func releaseColumns(columns []arrow.Column) {
for _, col := range columns {
- col.Release()
+ if col.Data() != nil { // data can be nil due to the way
columns are constructed in ReadRowGroups
+ col.Release()
+ }
}
}