This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d9f3812 feat: Support setting IPC options in FlightSQL call options 
(#674)
7d9f3812 is described below

commit 7d9f38123934afb7c92567f06078dbecc3d75fad
Author: William <[email protected]>
AuthorDate: Fri Feb 27 10:00:04 2026 -0800

    feat: Support setting IPC options in FlightSQL call options (#674)
    
    ### Rationale for this change
    
    I would like to specify the IPC stream compression settings for the
    FlightSQL `ExecuteIngest` command. Currently, there is no way to apply
    IPC options to the writer stream from the `ExecuteIngest` method.
    
    ### What changes are included in this PR?
    
    * Introduces a `ExecuteIngestWithIPC` which allows passing IPC options
    * Retains the existing behaviour of `ExecuteIngest`
    * Consolidates the shared execute ingest behaviour into a private
    `executeIngest`
    
    ### Are these changes tested?
    
    * Tested with new unit tests to validate the record batch frames are
    sent with LZ4 compression when the `ipc.WithLZ4()` option is passed in
    the call options
    
    ### Are there any user-facing changes?
    
    * Yes - introduces a new public method `ExecuteIngestWithIPC`
---
 arrow/flight/flightsql/client.go      |  17 +++-
 arrow/flight/flightsql/client_test.go | 145 ++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+), 1 deletion(-)

diff --git a/arrow/flight/flightsql/client.go b/arrow/flight/flightsql/client.go
index 2610da39..192cec7b 100644
--- a/arrow/flight/flightsql/client.go
+++ b/arrow/flight/flightsql/client.go
@@ -247,6 +247,18 @@ func (c *Client) ExecuteSubstraitUpdate(ctx 
context.Context, plan SubstraitPlan,
 // The provided RecordReader will be retained for the duration of the call, 
but it is the caller's
 // responsibility to release the original reference.
 func (c *Client) ExecuteIngest(ctx context.Context, rdr array.RecordReader, 
reqOptions *ExecuteIngestOpts, opts ...grpc.CallOption) (int64, error) {
+       return c.executeIngest(ctx, rdr, reqOptions, nil, opts...)
+}
+
+// ExecuteIngestWithIPC is like ExecuteIngest, and also allows configuring IPC
+// stream writer options such as compression.
+// The provided RecordReader will be retained for the duration of the call, 
but it is the caller's
+// responsibility to release the original reference.
+func (c *Client) ExecuteIngestWithIPC(ctx context.Context, rdr 
array.RecordReader, reqOptions *ExecuteIngestOpts, ipcOpts []ipc.Option, opts 
...grpc.CallOption) (int64, error) {
+       return c.executeIngest(ctx, rdr, reqOptions, ipcOpts, opts...)
+}
+
+func (c *Client) executeIngest(ctx context.Context, rdr array.RecordReader, 
reqOptions *ExecuteIngestOpts, ipcOpts []ipc.Option, opts ...grpc.CallOption) 
(int64, error) {
        var (
                err          error
                desc         *flight.FlightDescriptor
@@ -274,7 +286,10 @@ func (c *Client) ExecuteIngest(ctx context.Context, rdr 
array.RecordReader, reqO
                return 0, err
        }
 
-       wr = flight.NewRecordWriter(stream, ipc.WithAllocator(c.Alloc), 
ipc.WithSchema(rdr.Schema()))
+       writerOpts := make([]ipc.Option, 0, 2+len(ipcOpts))
+       writerOpts = append(writerOpts, ipc.WithAllocator(c.Alloc), 
ipc.WithSchema(rdr.Schema()))
+       writerOpts = append(writerOpts, ipcOpts...)
+       wr = flight.NewRecordWriter(stream, writerOpts...)
        defer wr.Close()
 
        wr.SetFlightDescriptor(desc)
diff --git a/arrow/flight/flightsql/client_test.go 
b/arrow/flight/flightsql/client_test.go
index a8ba228b..858526ea 100644
--- a/arrow/flight/flightsql/client_test.go
+++ b/arrow/flight/flightsql/client_test.go
@@ -27,7 +27,10 @@ import (
        "github.com/apache/arrow-go/v18/arrow/flight"
        "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
        pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+       "github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
+       "github.com/apache/arrow-go/v18/arrow/ipc"
        "github.com/apache/arrow-go/v18/arrow/memory"
+       flatbuffers "github.com/google/flatbuffers/go"
        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/suite"
        "google.golang.org/grpc"
@@ -163,6 +166,7 @@ func getAction(cmd proto.Message) *flight.Action {
 func (s *FlightSqlClientSuite) SetupTest() {
        s.mockClient = FlightServiceClientMock{}
        s.sqlClient.Client = &s.mockClient
+       s.sqlClient.Alloc = memory.DefaultAllocator
        s.callOpts = []grpc.CallOption{grpc.EmptyCallOption{}}
 }
 
@@ -649,6 +653,147 @@ func (s *FlightSqlClientSuite) TestExecuteUpdate() {
        s.EqualValues(100, num)
 }
 
+func (s *FlightSqlClientSuite) TestExecuteIngestWithIPCOptions() {
+       schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: 
arrow.PrimitiveTypes.Int64}}, nil)
+       rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, 
strings.NewReader(`[{"id": 1}]`))
+       s.Require().NoError(err)
+       defer rec.Release()
+
+       rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
+       s.Require().NoError(err)
+       defer rdr.Release()
+
+       request := &flightsql.ExecuteIngestOpts{
+               Table:                  "target_table",
+               TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+       }
+
+       result := &pb.DoPutUpdateResult{RecordCount: 1}
+       resdata, _ := proto.Marshal(result)
+
+       mockedPut := &mockDoPutClient{}
+       defer mockedPut.AssertExpectations(s.T())
+
+       var sent []*flight.FlightData
+       mockedPut.On("Send", 
mock.AnythingOfType("*flight.FlightData")).Run(func(args mock.Arguments) {
+               sent = append(sent, 
proto.Clone(args.Get(0).(*flight.FlightData)).(*flight.FlightData))
+       }).Return(nil)
+       mockedPut.On("CloseSend").Return(nil)
+       mockedPut.On("Recv").Return(&pb.PutResult{AppMetadata: resdata}, 
nil).Once()
+       mockedPut.On("Recv").Return(&pb.PutResult{}, io.EOF).Once()
+
+       s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+       count, err := s.sqlClient.ExecuteIngestWithIPC(
+               context.Background(),
+               rdr,
+               request,
+               []ipc.Option{ipc.WithLZ4()},
+               s.callOpts...,
+       )
+       s.Require().NoError(err)
+       s.EqualValues(1, count)
+
+       var rbCompression *flatbuf.BodyCompression
+       for _, fd := range sent {
+               if len(fd.DataHeader) == 0 {
+                       continue
+               }
+
+               msg := flatbuf.GetRootAsMessage(fd.DataHeader, 0)
+               if msg.HeaderType() != flatbuf.MessageHeaderRecordBatch {
+                       continue
+               }
+
+               var header flatbuffers.Table
+               if !msg.Header(&header) {
+                       continue
+               }
+
+               var batch flatbuf.RecordBatch
+               batch.Init(header.Bytes, header.Pos)
+               rbCompression = batch.Compression(nil)
+               break
+       }
+
+       if s.NotNil(rbCompression, "record batch should include compression 
metadata") {
+               s.Equal(flatbuf.CompressionTypeLZ4_FRAME, rbCompression.Codec())
+       }
+}
+
+func (s *FlightSqlClientSuite) TestExecuteIngestWithSchemaOverrideOption() {
+       dataSchema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: 
arrow.PrimitiveTypes.Int64}}, nil)
+       overrideSchema := arrow.NewSchema([]arrow.Field{{Name: "name", Type: 
arrow.BinaryTypes.String}}, nil)
+
+       rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, 
dataSchema, strings.NewReader(`[{"id": 1}]`))
+       s.Require().NoError(err)
+       defer rec.Release()
+
+       rdr, err := array.NewRecordReader(dataSchema, []arrow.RecordBatch{rec})
+       s.Require().NoError(err)
+       defer rdr.Release()
+
+       request := &flightsql.ExecuteIngestOpts{
+               Table:                  "target_table",
+               TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+       }
+
+       mockedPut := &mockDoPutClient{}
+       defer mockedPut.AssertExpectations(s.T())
+       mockedPut.On("Send", 
mock.AnythingOfType("*flight.FlightData")).Return(nil)
+
+       s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+       _, err = s.sqlClient.ExecuteIngestWithIPC(
+               context.Background(),
+               rdr,
+               request,
+               []ipc.Option{ipc.WithSchema(overrideSchema)},
+               s.callOpts...,
+       )
+       s.Error(err)
+       s.ErrorContains(err, "different schema")
+}
+
+func (s *FlightSqlClientSuite) TestExecuteIngestWithSliceOptions() {
+       schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: 
arrow.PrimitiveTypes.Int64}}, nil)
+       rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, 
strings.NewReader(`[{"id": 1}]`))
+       s.Require().NoError(err)
+       defer rec.Release()
+
+       rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
+       s.Require().NoError(err)
+       defer rdr.Release()
+
+       request := &flightsql.ExecuteIngestOpts{
+               Table:                  "target_table",
+               TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+       }
+
+       result := &pb.DoPutUpdateResult{RecordCount: 1}
+       resdata, _ := proto.Marshal(result)
+
+       mockedPut := &mockDoPutClient{}
+       defer mockedPut.AssertExpectations(s.T())
+       mockedPut.On("Send", 
mock.AnythingOfType("*flight.FlightData")).Return(nil)
+       mockedPut.On("CloseSend").Return(nil)
+       mockedPut.On("Recv").Return(&pb.PutResult{AppMetadata: resdata}, 
nil).Once()
+       mockedPut.On("Recv").Return(&pb.PutResult{}, io.EOF).Once()
+
+       s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+       ipcOpts := []ipc.Option{ipc.WithLZ4()}
+       count, err := s.sqlClient.ExecuteIngestWithIPC(
+               context.Background(),
+               rdr,
+               request,
+               ipcOpts,
+               s.callOpts...,
+       )
+       s.Require().NoError(err)
+       s.EqualValues(1, count)
+}
+
 func (s *FlightSqlClientSuite) TestGetSqlInfo() {
        sqlInfo := []flightsql.SqlInfo{
                flightsql.SqlInfoFlightSqlServerName,

Reply via email to