This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa1392  [BEAM-4634] Add options to allow BigQuery StandardSQL 
queries. (#12061)
8aa1392 is described below

commit 8aa13920e6a9fd9692e56bef73c4ac18223d27ae
Author: Brian Michalski <bmichal...@gmail.com>
AuthorDate: Tue Jun 23 20:30:34 2020 -0400

    [BEAM-4634] Add options to allow BigQuery StandardSQL queries. (#12061)
---
 sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 35 +++++++++++++++++++++++++-----
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go 
b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 52cee62..dc54109 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -93,18 +93,39 @@ func Read(s beam.Scope, project, table string, t 
reflect.Type) beam.PCollection
        return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t)
 }
 
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+       // UseStandardSQL enables BigQuery's Standard SQL dialect when 
executing a query.
+       UseStandardSQL bool
+}
+
+// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a 
query.
+func UseStandardSQL() func(qo *QueryOptions) error {
+       return func(qo *QueryOptions) error {
+               qo.UseStandardSQL = true
+               return nil
+       }
+}
+
 // Query executes a query. The output must have a schema compatible with the 
given
 // type, t. It returns a PCollection<t>.
-func Query(s beam.Scope, project, q string, t reflect.Type) beam.PCollection {
+func Query(s beam.Scope, project, q string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
        s = s.Scope("bigquery.Query")
-       return query(s, project, q, t)
+       return query(s, project, q, t, options...)
 }
 
-func query(s beam.Scope, project, query string, t reflect.Type) 
beam.PCollection {
+func query(s beam.Scope, project, query string, t reflect.Type, options 
...func(*QueryOptions) error) beam.PCollection {
        mustInferSchema(t)
 
+       queryOptions := QueryOptions{}
+       for _, opt := range options {
+               if err := opt(&queryOptions); err != nil {
+                       panic(err)
+               }
+       }
+
        imp := beam.Impulse(s)
-       return beam.ParDo(s, &queryFn{Project: project, Query: query, Type: 
beam.EncodedType{T: t}}, imp, beam.TypeDefinition{Var: beam.XType, T: t})
+       return beam.ParDo(s, &queryFn{Project: project, Query: query, Type: 
beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var: 
beam.XType, T: t})
 }
 
 type queryFn struct {
@@ -114,6 +135,8 @@ type queryFn struct {
        Query string `json:"query"`
        // Type is the encoded schema type.
        Type beam.EncodedType `json:"type"`
+       // Options specifies additional query execution options.
+       Options QueryOptions `json:"options"`
 }
 
 func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit 
func(beam.X)) error {
@@ -124,7 +147,9 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ 
[]byte, emit func(beam.X
        defer client.Close()
 
        q := client.Query(f.Query)
-       q.UseLegacySQL = true
+       if !f.Options.UseStandardSQL {
+               q.UseLegacySQL = true
+       }
 
        it, err := q.Read(ctx)
        if err != nil {

Reply via email to