This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8aa1392 [BEAM-4634] Add options to allow BigQuery StandardSQL queries. (#12061)
8aa1392 is described below
commit 8aa13920e6a9fd9692e56bef73c4ac18223d27ae
Author: Brian Michalski <[email protected]>
AuthorDate: Tue Jun 23 20:30:34 2020 -0400
[BEAM-4634] Add options to allow BigQuery StandardSQL queries. (#12061)
---
sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 35 +++++++++++++++++++++++++-----
1 file changed, 30 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 52cee62..dc54109 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -93,18 +93,39 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection
return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t)
}
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+ // UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
+ UseStandardSQL bool
+}
+
+// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
+func UseStandardSQL() func(qo *QueryOptions) error {
+ return func(qo *QueryOptions) error {
+ qo.UseStandardSQL = true
+ return nil
+ }
+}
+
// Query executes a query. The output must have a schema compatible with the given
// type, t. It returns a PCollection<t>.
-func Query(s beam.Scope, project, q string, t reflect.Type) beam.PCollection {
+func Query(s beam.Scope, project, q string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection {
s = s.Scope("bigquery.Query")
- return query(s, project, q, t)
+ return query(s, project, q, t, options...)
}
-func query(s beam.Scope, project, query string, t reflect.Type) beam.PCollection {
+func query(s beam.Scope, project, query string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection {
mustInferSchema(t)
+ queryOptions := QueryOptions{}
+ for _, opt := range options {
+ if err := opt(&queryOptions); err != nil {
+ panic(err)
+ }
+ }
+
imp := beam.Impulse(s)
- return beam.ParDo(s, &queryFn{Project: project, Query: query, Type: beam.EncodedType{T: t}}, imp, beam.TypeDefinition{Var: beam.XType, T: t})
+ return beam.ParDo(s, &queryFn{Project: project, Query: query, Type: beam.EncodedType{T: t}, Options: queryOptions}, imp, beam.TypeDefinition{Var: beam.XType, T: t})
}
type queryFn struct {
@@ -114,6 +135,8 @@ type queryFn struct {
Query string `json:"query"`
// Type is the encoded schema type.
Type beam.EncodedType `json:"type"`
+ // Options specifies additional query execution options.
+ Options QueryOptions `json:"options"`
}
func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X)) error {
@@ -124,7 +147,9 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X
defer client.Close()
q := client.Query(f.Query)
- q.UseLegacySQL = true
+ if !f.Options.UseStandardSQL {
+ q.UseLegacySQL = true
+ }
it, err := q.Read(ctx)
if err != nil {