cocoa-xu commented on code in PR #1722:
URL: https://github.com/apache/arrow-adbc/pull/1722#discussion_r1610690947


##########
go/adbc/driver/bigquery/statement.go:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bigquery
+
+import (
+       "context"
+       "fmt"
+       "strconv"
+       "time"
+
+       "cloud.google.com/go/bigquery"
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow/go/v17/arrow"
+       "github.com/apache/arrow/go/v17/arrow/array"
+)
+
+// todos for bigquery.QueryConfig
+// - TableDefinitions
+// - Parameters
+// - TimePartitioning
+// - RangePartitioning
+// - Clustering
+// - Labels
+// - DestinationEncryptionConfig
+// - SchemaUpdateOptions
+// - ConnectionProperties
+
+type statement struct {
+       connectionImpl *connectionImpl
+       query          *bigquery.Query
+       parameterMode  string
+       paramBinding   arrow.Record
+       streamBinding  array.RecordReader
+}
+
+// Close releases any relevant resources associated with this statement
+// and closes it (particularly if it is a prepared statement).
+//
+// A statement instance should not be used after Close is called.
+func (st *statement) Close() error {
+       st.clearParameters()
+       return nil
+}
+
+func (st *statement) GetOption(key string) (string, error) {
+       switch key {
+       case OptionStringProjectID:
+               val, err := st.connectionImpl.GetOption(OptionStringProjectID)
+               if err != nil {
+                       return "", err
+               } else {
+                       return val, nil
+               }
+       case OptionStringQueryParameterMode:
+               return st.parameterMode, nil
+       case OptionStringQueryDestinationTable:
+               return tableToString(st.query.QueryConfig.Dst), nil
+       case OptionStringQueryDefaultProjectID:
+               return st.query.QueryConfig.DefaultProjectID, nil
+       case OptionStringQueryDefaultDatasetID:
+               return st.query.QueryConfig.DefaultDatasetID, nil
+       case OptionStringQueryCreateDisposition:
+               return string(st.query.QueryConfig.CreateDisposition), nil
+       case OptionStringQueryWriteDisposition:
+               return string(st.query.QueryConfig.WriteDisposition), nil
+       case OptionBoolQueryDisableQueryCache:
+               return strconv.FormatBool(st.query.QueryConfig.DisableQueryCache), nil
+       case OptionBoolDisableFlattenedResults:
+               return strconv.FormatBool(st.query.QueryConfig.DisableFlattenedResults), nil
+       case OptionBoolQueryAllowLargeResults:
+               return strconv.FormatBool(st.query.QueryConfig.AllowLargeResults), nil
+       case OptionStringQueryPriority:
+               return string(st.query.QueryConfig.Priority), nil
+       case OptionBoolQueryUseLegacySQL:
+               return strconv.FormatBool(st.query.QueryConfig.UseLegacySQL), nil
+       case OptionBoolQueryDryRun:
+               return strconv.FormatBool(st.query.QueryConfig.DryRun), nil
+       case OptionBoolQueryCreateSession:
+               return strconv.FormatBool(st.query.QueryConfig.CreateSession), nil
+       default:
+               val, err := st.connectionImpl.GetOption(key)
+               if err == nil {
+                       return val, nil
+               }
+               return "", err
+       }
+}
+
+func (st *statement) GetOptionInt(key string) (int64, error) {
+       switch key {
+       case OptionIntQueryMaxBillingTier:
+               return int64(st.query.QueryConfig.MaxBillingTier), nil
+       case OptionIntQueryMaxBytesBilled:
+               return st.query.QueryConfig.MaxBytesBilled, nil
+       case OptionIntQueryJobTimeout:
+               return st.query.QueryConfig.JobTimeout.Milliseconds(), nil
+       default:
+               val, err := st.connectionImpl.GetOptionInt(key)
+               if err == nil {
+                       return val, nil
+               }
+               return 0, err
+       }
+}
+
+func (st *statement) SetOption(key string, v string) error {
+       switch key {
+       case OptionStringQueryParameterMode:
+               switch v {
+               case OptionValueQueryParameterModeNamed, OptionValueQueryParameterModePositional:
+                       st.parameterMode = v
+               default:
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("Parameter mode for the statement can only be either %s or %s", OptionValueQueryParameterModeNamed, OptionValueQueryParameterModePositional),
+                       }
+               }
+       case OptionStringQueryDestinationTable:
+               val, err := stringToTable(v)
+               if err == nil {
+                       st.query.QueryConfig.Dst = val
+               } else {
+                       return err
+               }
+       case OptionStringQueryDefaultProjectID:
+               st.query.QueryConfig.DefaultProjectID = v
+       case OptionStringQueryDefaultDatasetID:
+               st.query.QueryConfig.DefaultDatasetID = v
+       case OptionStringQueryCreateDisposition:
+               val, err := stringToTableCreateDisposition(v)
+               if err == nil {
+                       st.query.QueryConfig.CreateDisposition = val
+               } else {
+                       return err
+               }
+       case OptionStringQueryWriteDisposition:
+               val, err := stringToTableWriteDisposition(v)
+               if err == nil {
+                       st.query.QueryConfig.WriteDisposition = val
+               } else {
+                       return err
+               }
+       case OptionBoolQueryDisableQueryCache:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.DisableQueryCache = val
+               } else {
+                       return err
+               }
+       case OptionBoolDisableFlattenedResults:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.DisableFlattenedResults = val
+               } else {
+                       return err
+               }
+       case OptionBoolQueryAllowLargeResults:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.AllowLargeResults = val
+               } else {
+                       return err
+               }
+       case OptionStringQueryPriority:
+               val, err := stringToQueryPriority(v)
+               if err == nil {
+                       st.query.QueryConfig.Priority = val
+               } else {
+                       return err
+               }
+       case OptionBoolQueryUseLegacySQL:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.UseLegacySQL = val
+               } else {
+                       return err
+               }
+       case OptionBoolQueryDryRun:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.DryRun = val
+               } else {
+                       return err
+               }
+       case OptionBoolQueryCreateSession:
+               val, err := strconv.ParseBool(v)
+               if err == nil {
+                       st.query.QueryConfig.CreateSession = val
+               } else {
+                       return err
+               }
+
+       default:
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  fmt.Sprintf("unknown statement string type option `%s`", key),
+               }
+       }
+       return nil
+}
+
+func (st *statement) SetOptionInt(key string, value int64) error {
+       switch key {
+       case OptionIntQueryMaxBillingTier:
+               st.query.QueryConfig.MaxBillingTier = int(value)
+       case OptionIntQueryMaxBytesBilled:
+               st.query.QueryConfig.MaxBytesBilled = value
+       case OptionIntQueryJobTimeout:
+               st.query.QueryConfig.JobTimeout = time.Duration(value) * time.Millisecond
+       default:
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  fmt.Sprintf("unknown statement string type option `%s`", key),
+               }
+       }
+       return nil
+}
+
+// SetSqlQuery sets the query string to be executed.
+//
+// The query can then be executed with any of the Execute methods.
+// For queries expected to be executed repeatedly, Prepare should be
+// called before execution.
+func (st *statement) SetSqlQuery(query string) error {
+       st.query.QueryConfig.Q = query
+       return nil
+}
+
+// ExecuteQuery executes the current query or prepared statement
+// and returns a RecordReader for the results along with the number
+// of rows affected if known, otherwise it will be -1.
+//
+// This invalidates any prior result sets on this statement.
+func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) {
+       parameters, err := st.getQueryParameter()
+       if err != nil {
+               return nil, 0, err
+       }
+       if parameters != nil {
+               st.query.QueryConfig.Parameters = parameters
+       }
+       reader, affectedRows, err := newRecordReader(ctx, st.query, st.connectionImpl.Alloc)
+       if err != nil {
+               return nil, -1, err
+       }
+       return reader, affectedRows, nil
+}
+
+// ExecuteUpdate executes a statement that does not generate a result
+// set. It returns the number of rows affected if known, otherwise -1.
+func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
+       return -1, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "ExecuteUpdate not yet implemented for BigQuery driver",
+       }
+}
+
+// ExecuteSchema gets the schema of the result set of a query without executing it.
+func (st *statement) ExecuteSchema(ctx context.Context) (*arrow.Schema, error) {
+       return nil, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "ExecuteSchema not yet implemented for BigQuery driver",
+       }
+}
+
+// Prepare turns this statement into a prepared statement to be executed
+// multiple times. This invalidates any prior result sets.
+func (st *statement) Prepare(_ context.Context) error {
+       return nil
+}
+
+// SetSubstraitPlan allows setting a serialized Substrait execution
+// plan into the query or for querying Substrait-related metadata.
+//
+// Drivers are not required to support both SQL and Substrait semantics.
+// If they do, it may be via converting between representations internally.
+//
+// Like SetSqlQuery, after this is called the query can be executed
+// using any of the Execute methods. If the query is expected to be
+// executed repeatedly, Prepare should be called first on the statement.
+func (st *statement) SetSubstraitPlan(plan []byte) error {
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "Substrait not yet implemented for BigQuery driver",
+       }
+}
+
+func arrowValueToQueryParameterValue(value arrow.Array) bigquery.QueryParameter {
+       parameter := bigquery.QueryParameter{}
+       if value.IsNull(0) {
+               parameter.Value = &bigquery.QueryParameterValue{
+                       Type: bigquery.StandardSQLDataType{
+                               TypeKind: "NULL",
+                       },
+                       Value: "NULL",
+               }
+               return parameter
+       }
+
+       // https://cloud.google.com/bigquery/docs/reference/storage#arrow_schema_details
+       // https://cloud.google.com/bigquery/docs/reference/rest/v2/StandardSqlDataType#typekind
+       switch value.DataType().ID() {

Review Comment:
   Got it! I'll leave a todo here and implement them in separate PRs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to