This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push: new f609bb171a GH-39910: [Go] Add func to load prepared statement from ActionCreatePreparedStatementResult (#39913) f609bb171a is described below commit f609bb171a8bce973d7b040d8684b04a60e806ed Author: abandy <aba...@live.com> AuthorDate: Wed Feb 7 16:01:55 2024 -0500 GH-39910: [Go] Add func to load prepared statement from ActionCreatePreparedStatementResult (#39913) Currently, in order to create a PreparedStatement a DoAction call will always be made via the client. I need to be able to make a PreparedStatement from persisted data that will not trigger the DoAction call to the server. * Closes: #39910 Authored-by: Alva Bandy <aba...@live.com> Signed-off-by: Matt Topol <zotthewiz...@gmail.com> --- go/arrow/flight/flightsql/client.go | 65 ++++++++++++++++++++++++++++++++ go/arrow/flight/flightsql/client_test.go | 30 +++++++++++++++ go/arrow/flight/flightsql/types.go | 2 + 3 files changed, 97 insertions(+) diff --git a/go/arrow/flight/flightsql/client.go b/go/arrow/flight/flightsql/client.go index 441f88f39f..068bfa84c3 100644 --- a/go/arrow/flight/flightsql/client.go +++ b/go/arrow/flight/flightsql/client.go @@ -450,6 +450,31 @@ func (c *Client) PrepareSubstrait(ctx context.Context, plan SubstraitPlan, opts return parsePreparedStatementResponse(c, c.Alloc, stream) } +func (c *Client) LoadPreparedStatementFromResult(result *CreatePreparedStatementResult) (*PreparedStatement, error) { + var ( + err error + dsSchema, paramSchema *arrow.Schema + ) + if result.DatasetSchema != nil { + dsSchema, err = flight.DeserializeSchema(result.DatasetSchema, c.Alloc) + if err != nil { + return nil, err + } + } + if result.ParameterSchema != nil { + paramSchema, err = flight.DeserializeSchema(result.ParameterSchema, c.Alloc) + if err != nil { + return nil, err + } + } + return &PreparedStatement{ + client: c, + handle: result.PreparedStatementHandle, + datasetSchema: dsSchema, + paramSchema: paramSchema, + }, nil +} + func parsePreparedStatementResponse(c *Client, mem memory.Allocator, results pb.FlightService_DoActionClient) (*PreparedStatement, error) { if err := results.CloseSend(); err != nil { return nil, err @@ -1027,6 +1052,46 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption return p.client.getFlightInfo(ctx, desc, opts...) } +// ExecutePut calls DoPut for the prepared statement on the server. If SetParameters +// has been called then the parameter bindings will be sent before execution. +// +// Will error if already closed. +func (p *PreparedStatement) ExecutePut(ctx context.Context, opts ...grpc.CallOption) error { + if p.closed { + return errors.New("arrow/flightsql: prepared statement already closed") + } + + cmd := &pb.CommandPreparedStatementQuery{PreparedStatementHandle: p.handle} + + desc, err := descForCommand(cmd) + if err != nil { + return err + } + + if p.hasBindParameters() { + pstream, err := p.client.Client.DoPut(ctx, opts...) + if err != nil { + return err + } + + wr, err := p.writeBindParameters(pstream, desc) + if err != nil { + return err + } + if err = wr.Close(); err != nil { + return err + } + pstream.CloseSend() + + // wait for the server to ack the result + if _, err = pstream.Recv(); err != nil && err != io.EOF { + return err + } + } + + return nil +} + // ExecutePoll executes the prepared statement on the server and returns a PollInfo // indicating the progress of execution. // diff --git a/go/arrow/flight/flightsql/client_test.go b/go/arrow/flight/flightsql/client_test.go index c8b9f7f124..f35aeefcf4 100644 --- a/go/arrow/flight/flightsql/client_test.go +++ b/go/arrow/flight/flightsql/client_test.go @@ -665,6 +665,36 @@ func (s *FlightSqlClientSuite) TestRenewFlightEndpoint() { s.Equal(&mockedRenewedEndpoint, renewedEndpoint) } +func (s *FlightSqlClientSuite) TestPreparedStatementLoadFromResult() { + const query = "query" + + result := &pb.ActionCreatePreparedStatementResult{ + PreparedStatementHandle: []byte(query), + } + + parameterSchemaResult := arrow.NewSchema([]arrow.Field{{Name: "p_id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) + result.ParameterSchema = flight.SerializeSchema(parameterSchemaResult, memory.DefaultAllocator) + datasetSchemaResult := arrow.NewSchema([]arrow.Field{{Name: "ds_id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) + result.DatasetSchema = flight.SerializeSchema(datasetSchemaResult, memory.DefaultAllocator) + + prepared, err := s.sqlClient.LoadPreparedStatementFromResult(result) + s.NoError(err) + + s.Equal(string(prepared.Handle()), "query") + + paramSchema := prepared.ParameterSchema() + paramRec, _, err := array.RecordFromJSON(memory.DefaultAllocator, paramSchema, strings.NewReader(`[{"p_id": 1}]`)) + s.NoError(err) + defer paramRec.Release() + + datasetSchema := prepared.DatasetSchema() + datasetRec, _, err := array.RecordFromJSON(memory.DefaultAllocator, datasetSchema, strings.NewReader(`[{"ds_id": 1}]`)) + s.NoError(err) + defer datasetRec.Release() + + s.Equal(string(prepared.Handle()), "query") +} + func TestFlightSqlClient(t *testing.T) { suite.Run(t, new(FlightSqlClientSuite)) } diff --git a/go/arrow/flight/flightsql/types.go b/go/arrow/flight/flightsql/types.go index d89e68f028..c70a8bdc4e 100644 --- a/go/arrow/flight/flightsql/types.go +++ b/go/arrow/flight/flightsql/types.go @@ -852,3 +852,5 @@ const ( // cancellation request. CancelResultNotCancellable = pb.ActionCancelQueryResult_CANCEL_RESULT_NOT_CANCELLABLE ) + +type CreatePreparedStatementResult = pb.ActionCreatePreparedStatementResult