This is an automated email from the ASF dual-hosted git repository.

joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new de19af928a GH-43359: [Go][Parquet] ReadRowGroups panics with canceled 
context (#43360)
de19af928a is described below

commit de19af928a83c09d12cd0f3d83932bfa8cca9b19
Author: Seb. V <[email protected]>
AuthorDate: Tue Jul 23 13:47:26 2024 +0200

    GH-43359: [Go][Parquet] ReadRowGroups panics with canceled context (#43360)
    
    ### Rationale for this change
    
    `ReadRowGroups` needs to support externally canceled contexts, e.g. for 
request-scoped contexts in servers like gRPC.
    
    ### What changes are included in this PR?
    
    Additionally, `releaseColumns` needs to ignore columns with uninitialized 
data as it is used in a `defer` statement.
    
    ### Are these changes tested?
    
    Yes: a new test `TestArrowReaderCanceledContext` is included.
    
    ### Are there any user-facing changes?
    
    None
    * GitHub Issue: #43359
    
    Authored-by: sebdotv <[email protected]>
    Signed-off-by: Joel Lubinitsky <[email protected]>
---
 go/parquet/pqarrow/file_reader.go      |  5 +++++
 go/parquet/pqarrow/file_reader_test.go | 23 +++++++++++++++++++++++
 go/parquet/pqarrow/helpers.go          |  4 +++-
 3 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/go/parquet/pqarrow/file_reader.go 
b/go/parquet/pqarrow/file_reader.go
index 208ac9ceeb..a2e84d9ce2 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -18,6 +18,7 @@ package pqarrow
 
 import (
        "context"
+       "errors"
        "fmt"
        "io"
        "sync"
@@ -375,6 +376,10 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, 
indices, rowGroups []in
                data.data.Release()
        }
 
+       // if the context is in error, but we haven't set an error yet, then it 
means that the parent context
+       // was cancelled. In this case, we should exit early as some columns 
may not have been read yet.
+       err = errors.Join(err, ctx.Err())
+
        if err != nil {
                // if we encountered an error, consume any waiting data on the 
channel
                // so the goroutines don't leak and so memory can get cleaned 
up. we already
diff --git a/go/parquet/pqarrow/file_reader_test.go 
b/go/parquet/pqarrow/file_reader_test.go
index b7d178f864..fe5a4547a7 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -167,6 +167,29 @@ func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
        }
 }
 
+func TestArrowReaderCanceledContext(t *testing.T) {
+       dataDir := getDataDir()
+
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       filename := filepath.Join(dataDir, "int32_decimal.parquet")
+       require.FileExists(t, filename)
+
+       rdr, err := file.OpenParquetFile(filename, false, 
file.WithReadProps(parquet.NewReaderProperties(mem)))
+       require.NoError(t, err)
+       defer rdr.Close()
+       arrowRdr, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, mem)
+       require.NoError(t, err)
+
+       // create a canceled context
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       _, err = arrowRdr.ReadTable(ctx)
+       require.ErrorIs(t, err, context.Canceled)
+}
+
 func TestRecordReaderParallel(t *testing.T) {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(t, 0)
diff --git a/go/parquet/pqarrow/helpers.go b/go/parquet/pqarrow/helpers.go
index 800cd84192..237de4366c 100644
--- a/go/parquet/pqarrow/helpers.go
+++ b/go/parquet/pqarrow/helpers.go
@@ -38,6 +38,8 @@ func releaseArrayData(data []arrow.ArrayData) {
 
 func releaseColumns(columns []arrow.Column) {
        for _, col := range columns {
-               col.Release()
+               if col.Data() != nil { // data can be nil due to the way 
columns are constructed in ReadRowGroups
+                       col.Release()
+               }
        }
 }

Reply via email to