This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fd357fd  [BEAM-13732] Go SDK BigQuery IO wrapper. Initial 
implementation. (#16598)
fd357fd is described below

commit fd357fd1cb200a43db44a232823b3bd5b71c6494
Author: Daniel Oliveira <[email protected]>
AuthorDate: Tue Feb 8 17:34:33 2022 -0800

    [BEAM-13732] Go SDK BigQuery IO wrapper. Initial implementation. (#16598)
---
 sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go   | 267 +++++++++++++++++++++
 sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go     | 119 +++++++++
 .../go/pkg/beam/io/xlang/schemaio/schemaio_test.go |  80 ++++++
 3 files changed, 466 insertions(+)

diff --git a/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go 
b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
new file mode 100644
index 0000000..19f2d62
--- /dev/null
+++ b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigqueryio contains cross-language functionality for using Google 
Cloud BigQuery
+// (https://cloud.google.com/bigquery). These transforms only work on runners 
that support
+// cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK. If an expansion service address is not
+// provided, an appropriate expansion service will be automatically started;
+// however this is slower than having a persistent expansion service running.
+//
+// To use a persistent expansion service, it must be run as a separate process
+// accessible during pipeline construction. The address of that process must be
+// passed to the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam 
SDK
+// being used. For numbered releases of Beam, these expansions services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+//   * Java
+//     - Vendored Module: beam-sdks-java-extensions-schemaio-expansion-service
+//     - Run via Gradle: ./gradlew 
:sdks:java:extensions:schemaio-expansion-service:runExpansionService
+//     - Reference Class: 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider and
+//       org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+//
+// Type Conversions
+//
+// Elements are read from and written to BigQuery by first converting to a 
Beam schema Row type
+// before converting to BigQuery compatible types. The following table lists 
all BigQuery types
+// currently supported, and how they convert to Beam schema and Go types.
+//   +----------------------------+------------------+-----------------+
+//   | BigQuery Standard SQL Type | Beam Schema Type |     Go Type     |
+//   +----------------------------+------------------+-----------------+
+//   | BOOLEAN                    | BOOLEAN          | bool            |
+//   | INT64                      | INT64            | int64           |
+//   | FLOAT64                    | DOUBLE           | float64         |
+//   | BYTES                      | BYTES            | []byte          |
+//   | STRING                     | STRING           | string          |
+//   | ARRAY                      | ARRAY            | Special: slice  |
+//   | STRUCT                     | ROW              | Special: struct |
+//   +----------------------------+------------------+-----------------+
+//
+// Array types are inferred from slice fields. For example, []int64 is 
equivalent to BigQuery's
+// ARRAY<INT64>. Struct types are inferred from nested structs in Go.
+//
+// Additionally, BigQuery schema fields can have a mode assigned to specify 
whether the field is
+// Nullable, Required, or Repeated. In Go, Nullable fields are represented as 
pointers, whereas
+// Required fields are value types. Repeated fields are represented as slices 
in Go (and ARRAYS
+// in SQL, as in the table above).
+//
+// Example of BigQuery fields with modes:
+//   field1 *int64   // Nullable INT64
+//   field2 int64    // Required INT64
+//   field3 []int64  // Repeated INT64
+//
+// Note On Documentation
+//
+// This cross-language implementation relies on the behavior of external SDKs. 
In order to keep
+// documentation up-to-date and consistent, BigQuery functionality will not be 
described in detail
+// in this package. Instead, references to relevant documentation in other 
SDKs is included where
+// relevant.
+package bigqueryio
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/schemaio"
+)
+
+type createDisposition string
+
+const (
+       // CreateNever specifies that new tables should not be created when 
writing to BigQuery.
+       CreateNever createDisposition = "Never"
+
+       // CreateIfNeeded specifies that tables should be created when writing 
to BigQuery, if needed.
+       CreateIfNeeded createDisposition = "IfNeeded"
+
+       readURN  = "beam:transform:org.apache.beam:schemaio_bigquery_read:v1"
+       writeURN = "beam:transform:org.apache.beam:schemaio_bigquery_write:v1"
+
+       serviceGradleTarget = 
":sdks:java:io:google-cloud-platform:expansion-service:runExpansionService"
+)
+
+var autoStartupAddress = 
xlangx.UseAutomatedJavaExpansionService(serviceGradleTarget)
+
+// bigQueryConfig is a struct meant to match the Schema IO config for Java's 
BigQuery IO. This is
+// used for both reads and writes, and is meant to match the schema defined in 
the Java SDK method
+// 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider.configurationSchema().
+type bigQueryConfig struct {
+       Table             *string `beam:"table"`
+       Query             *string `beam:"query"`
+       QueryLocation     *string `beam:"queryLocation"`
+       CreateDisposition *string `beam:"createDisposition"`
+}
+
+// Read is a cross-language PTransform which reads from a BigQuery table and 
returns a PCollection
+// of the given type, which should correspond to the Schema type generated by 
reading from the
+// table.
+//
+// Read requires a reflect.Type description of the struct to read from 
BigQuery. Additionally,
+// either one Table or one Query must be provided via readOptions to define a 
destination to read
+// from.
+//
+// Read accepts additional parameters as readOptions. All optional parameters 
are predefined in this
+// package as functions that return readOption. To set an additional 
parameter, call the function
+// within Read's function signature.
+//
+// Example:
+//   expansionAddr := "localhost:1234"
+//   table := "project_id:dataset_id.table_id"
+//   outType := reflect.TypeOf((*Foo)(nil)).Elem()
+//   pcol := bigqueryio.Read(s, outType,
+//       bigqueryio.FromTable(table),
+//       bigqueryio.ReadExpansionAddr(expansionAddr))
+func Read(s beam.Scope, elmT reflect.Type, opts ...readOption) 
beam.PCollection {
+       s = s.Scope("bigqueryio.Read")
+
+       rc := readConfig{cfg: &bigQueryConfig{}}
+       for _, opt := range opts {
+               opt(&rc)
+       }
+       if rc.cfg.Table == nil && rc.cfg.Query == nil {
+               panic(fmt.Sprintf("%v requires either a Table or Query 
specified, received none", s.String()))
+       }
+
+       addr := rc.addr
+       if addr == "" {
+               addr = autoStartupAddress
+       }
+
+       pl := schemaio.MustEncodePayload("", rc.cfg, nil)
+       outT := typex.New(elmT)
+       outs := beam.CrossLanguage(s, readURN, pl, addr, nil, 
beam.UnnamedOutput(outT))
+       return outs[beam.UnnamedOutputTag()]
+}
+
+type readConfig struct {
+       cfg  *bigQueryConfig
+       addr string
+}
+type readOption func(*readConfig)
+
+// ReadExpansionAddr specifies the address of a persistent expansion service 
to use for a Read
+// transform. If this is not provided, or if an empty string is provided, the 
transform will
+// automatically start an appropriate expansion service instead.
+func ReadExpansionAddr(addr string) readOption {
+       return func(rc *readConfig) {
+               rc.addr = addr
+       }
+}
+
+// FromTable is a Read option that specifies which table to read from.
+//
+// For more details see in the Java SDK:
+// org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read.from(java.lang.String)
+func FromTable(table string) readOption {
+       return func(rc *readConfig) {
+               rc.cfg.Table = &table
+       }
+}
+
+// FromQuery is a Read option that specifies a query to use for reading from 
BigQuery. Uses the
+// BigQuery Standard SQL dialect.
+//
+// For more details see in the Java SDK:
+// 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read.fromQuery(java.lang.String)
+func FromQuery(query string) readOption {
+       return func(rc *readConfig) {
+               rc.cfg.Query = &query
+       }
+}
+
+// WithQueryLocation is a Read option that specifies a BigQuery geographic 
location where the query
+// job will be executed.
+//
+// For more details see in the Java SDK:
+// org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.withQueryLocation
+func WithQueryLocation(location string) readOption {
+       return func(rc *readConfig) {
+               rc.cfg.QueryLocation = &location
+       }
+}
+
+// Write is a cross-language PTRansform which writes elements from a 
PCollection to a BigQuery
+// table.
+//
+// Write requires the ID of a BigQuery table, and an input PCollection. The 
type of the input
+// PCollection is converted to a Beam schema, so it must have a valid Beam 
schema definition.
+//
+// Write accepts additional parameters as writeOptions. All optional 
parameters are predefined in
+// this package as functions that return writeOption. To set an additional 
parameter, call the
+// function within Write's function signature.
+//
+// Example:
+//   expansionAddr := "localhost:1234"
+//   table := "project_id:dataset_id.table_id"
+//   pcol := bigqueryio.Write(s, table, input,
+//      bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
+//      bigqueryio.WriteExpansionAddr(expansionAddr))
+func Write(s beam.Scope, table string, col beam.PCollection, opts 
...writeOption) {
+       s = s.Scope("bigqueryio.Write")
+
+       wc := writeConfig{cfg: &bigQueryConfig{}}
+       wc.cfg.Table = &table
+       for _, opt := range opts {
+               opt(&wc)
+       }
+
+       addr := wc.addr
+       if addr == "" {
+               addr = autoStartupAddress
+       }
+
+       pl := schemaio.MustEncodePayload("", *wc.cfg, nil)
+       beam.CrossLanguage(s, writeURN, pl, addr, beam.UnnamedInput(col), nil)
+}
+
+type writeConfig struct {
+       cfg  *bigQueryConfig
+       addr string
+}
+type writeOption func(*writeConfig)
+
+// WriteExpansionAddr specifies the address of a persistent expansion service 
to use for a Write
+// transform. If this is not provided, or if an empty string is provided, the 
transform will
+// automatically start an appropriate expansion service instead.
+func WriteExpansionAddr(addr string) writeOption {
+       return func(wc *writeConfig) {
+               wc.addr = addr
+       }
+}
+
+// CreateDisposition specifies the write transform's behavior in regards to 
creating new tables.
+//
+// For more details see in the Java SDK:
+// org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.withCreateDisposition
+func CreateDisposition(disp createDisposition) writeOption {
+       str := string(disp)
+       return func(wc *writeConfig) {
+               wc.cfg.CreateDisposition = &str
+       }
+}
diff --git a/sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go 
b/sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go
new file mode 100644
index 0000000..70103e9
--- /dev/null
+++ b/sdks/go/pkg/beam/io/xlang/schemaio/schemaio.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package schemaio contains utilities for constructing cross-language IO 
wrappers meant to
+// interface with the Java SDK's Schema IOs. Schema IO is an interface for any 
IO that operates on
+// Beam schema supported elements. Various IOs are implemented via Schema IO, 
and each
+// implementation requires its own IO wrapper in the Go SDK (for example, JDBC 
IO or BigQuery IO),
+// and those IO wrappers can make use of these utilities.
+//
+// For implementation details of Schema IO see 
https://s.apache.org/schemaio-development-guide.
+package schemaio
+
+import (
+       "bytes"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "google.golang.org/protobuf/proto"
+)
+
+// Payload is a struct matching the expected cross-language payload of a 
Schema IO.
+//
+// This documentation describes the expected usage of each field, but 
individual IO implementations
+// are free to use this payload differently. For implementation details of 
those IOs, refer to
+// SchemaIOProvider implementations in the Java SDK.
+type Payload struct {
+       // Location specifies the location to find the data (for example, a URL 
to a database).
+       Location string `beam:"location"`
+
+       // Config is a Beam schema encoded struct containing configuration 
details specific to the
+       // underlying IO implementation.
+       Config []byte `beam:"config"`
+
+       // DataSchema is an optional Beam schema encoded struct representing 
the schema for data being
+       // read or written.
+       DataSchema *[]byte `beam:"dataSchema"`
+}
+
+// encodeAsRow encodes a struct as a Beam schema Row, to embed within a cross 
language payload.
+func encodeAsRow(config interface{}) ([]byte, error) {
+       rt := reflect.TypeOf(config)
+       enc, err := coder.RowEncoderForStruct(rt)
+       if err != nil {
+               err = errors.WithContextf(err, "getting Row encoder for type 
%s", rt.Name())
+               return nil, err
+       }
+       var buf bytes.Buffer
+       if err := enc(config, &buf); err != nil {
+               err = errors.WithContextf(err, "encoding type %s as Row", 
rt.Name())
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
+// encodeAsSchema retrieves a schema of a type, and encodes that schema into 
bytes.
+func encodeAsSchema(rt reflect.Type) ([]byte, error) {
+       scm, err := schema.FromType(rt)
+       if err != nil {
+               err = errors.WithContextf(err, "retrieving schema of type %s", 
rt.Name())
+               return nil, err
+       }
+       encScm, err := proto.Marshal(scm)
+       if err != nil {
+               err = errors.WithContextf(err, "encoding schema of type %s", 
rt.Name())
+               return nil, err
+       }
+       return encScm, nil
+}
+
+// EncodePayload encodes a SchemaIO payload. It takes a location for the 
SchemaIO's data, an
+// IO-specific configuration struct, and an optional struct representing the 
Beam schema for the
+// data.
+func EncodePayload(location string, config interface{}, dataSchema 
reflect.Type) ([]byte, error) {
+       encCfg, err := encodeAsRow(config)
+       if err != nil {
+               err = errors.WithContext(err, "encoding config for SchemaIO 
payload")
+               return nil, err
+       }
+       pl := Payload{
+               Location: location,
+               Config:   encCfg,
+       }
+
+       if dataSchema != nil {
+               encScm, err := encodeAsSchema(dataSchema)
+               if err != nil {
+                       err = errors.WithContext(err, "encoding dataSchema for 
SchemaIO payload")
+                       return nil, err
+               }
+               pl.DataSchema = &encScm
+       }
+       return beam.CrossLanguagePayload(pl), err
+}
+
+// MustEncodePayload encodes a SchemaIO payload. It takes a location for the 
SchemaIO's data, an
+// IO-specific configuration struct, and an optional struct representing the 
Beam schema for the
+// data. Unlike EncodePayload, this panics if an error occurs.
+func MustEncodePayload(location string, config interface{}, dataSchema 
reflect.Type) []byte {
+       pl, err := EncodePayload(location, config, dataSchema)
+       if err != nil {
+               panic(err)
+       }
+       return pl
+}
diff --git a/sdks/go/pkg/beam/io/xlang/schemaio/schemaio_test.go 
b/sdks/go/pkg/beam/io/xlang/schemaio/schemaio_test.go
new file mode 100644
index 0000000..06d77fe
--- /dev/null
+++ b/sdks/go/pkg/beam/io/xlang/schemaio/schemaio_test.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemaio
+
+import (
+       "reflect"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
+       "github.com/google/go-cmp/cmp"
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*testConfig)(nil)))
+       beam.RegisterType(reflect.TypeOf((*testDataSchema)(nil)))
+}
+
+type testConfig struct {
+       Cfg1 string
+       Cfg2 *int
+}
+
+type testDataSchema struct {
+       Scm1 bool
+       Scm2 float64
+}
+
+const testLocation = "test_location"
+
+// TestEncodePayload tests that EncodePayload works properly by encoding and 
then decoding a
+// schemaio.Payload and checking that the values match.
+func TestEncodePayload(t *testing.T) {
+       // Create test values, encode them, and create a payload.
+       i := 42
+       config := testConfig{"foo", &i}
+       schemaType := reflect.TypeOf((*testDataSchema)(nil)).Elem()
+
+       encConfig, err := encodeAsRow(config)
+       if err != nil {
+               t.Fatalf("failed to encode Config as row: %s", err)
+       }
+       encSchema, err := encodeAsSchema(schemaType)
+       if err != nil {
+               t.Fatalf("failed to encode DataSchema as schema: %s", err)
+       }
+
+       wantPayload := Payload{
+               Location:   testLocation,
+               Config:     encConfig,
+               DataSchema: &encSchema,
+       }
+
+       // Encode and decode Payload, comparing results.
+       payloadBytes, err := EncodePayload(testLocation, config, schemaType)
+       if err != nil {
+               t.Fatalf("EncodePayload failed with error: %s", err)
+       }
+       gotPayload, err := xlangx.DecodeStructPayload(payloadBytes)
+       if err != nil {
+               t.Fatalf("DecodeStructPayload failed with error: %s", err)
+       }
+
+       if diff := cmp.Diff(wantPayload, gotPayload); diff != "" {
+               t.Errorf("decoded test Payload does not match Payload before 
encoding: diff(-want,+got):\n%v", diff)
+       }
+}

Reply via email to