This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9533fc3757d feat: implement bigtable io connector with write
capabilities (#23411)
9533fc3757d is described below
commit 9533fc3757d3041135cbe2f5078dff0855adc1f1
Author: capthiron <[email protected]>
AuthorDate: Mon Nov 7 19:32:18 2022 +0100
feat: implement bigtable io connector with write capabilities (#23411)
---
CHANGES.md | 2 +-
sdks/go.mod | 8 +
sdks/go.sum | 10 +-
sdks/go/pkg/beam/io/bigtableio/bigtable.go | 285 ++++++++++++++++++++++++
sdks/go/pkg/beam/io/bigtableio/bigtable_test.go | 195 ++++++++++++++++
5 files changed, 498 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 724a57e59aa..43df1bb5f0b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,7 +58,7 @@
## I/Os
-* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* Support for Bigtable sink (Write and WriteBatch) added (Go)
([#23324](https://github.com/apache/beam/issues/23324)).
## New Features / Improvements
diff --git a/sdks/go.mod b/sdks/go.mod
index 2611f23ea10..5b55903fc89 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -54,6 +54,8 @@ require (
gopkg.in/yaml.v2 v2.4.0
)
+require cloud.google.com/go/bigtable v1.16.0
+
require (
cloud.google.com/go v0.104.0 // indirect
cloud.google.com/go/compute v1.10.0 // indirect
@@ -64,11 +66,17 @@ require (
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 //
indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
+ github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
+ github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/containerd v1.6.8 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
+ github.com/envoyproxy/go-control-plane
v0.10.2-0.20220325020618-49ff273808a1 // indirect
+ github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da //
indirect
github.com/golang/snappy v0.0.4 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index af635897dad..f1c7acd5b78 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -39,6 +39,8 @@ cloud.google.com/go/bigquery v1.7.0/go.mod
h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g
cloud.google.com/go/bigquery v1.8.0/go.mod
h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/bigquery v1.43.0
h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
cloud.google.com/go/bigquery v1.43.0/go.mod
h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
+cloud.google.com/go/bigtable v1.16.0
h1:sqJhhslzQOag49Mf2/uH3+u+NdfpPX0gjKAcgYpRUCU=
+cloud.google.com/go/bigtable v1.16.0/go.mod
h1:6f7WVXfeZaJz0xevUZoTA1s8sTmmrQqIAkRDVEHVg7I=
cloud.google.com/go/compute v0.1.0/go.mod
h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.3.0/go.mod
h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
cloud.google.com/go/compute v1.5.0/go.mod
h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M=
@@ -185,10 +187,10 @@ github.com/cenkalti/backoff/v4 v4.1.1/go.mod
h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInq
github.com/cenkalti/backoff/v4 v4.1.2/go.mod
h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3
h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod
h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
+github.com/census-instrumentation/opencensus-proto v0.2.1
h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod
h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod
h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
-github.com/cespare/xxhash v1.1.0
h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@@ -209,11 +211,13 @@ github.com/client9/misspell v0.3.4/go.mod
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod
h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod
h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod
h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4
h1:hzAQntlaYRkVSFEfj9OTWlVV1H155FMD8BTKktLv0QI=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod
h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1
h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod
h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod
h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo=
@@ -403,7 +407,9 @@ github.com/envoyproxy/go-control-plane
v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane
v0.9.9-0.20210217033140-668b12f5399d/go.mod
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane
v0.9.9-0.20210512163311-63b5d3c536b0/go.mod
h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane
v0.9.10-0.20210907150352-cf90f659a021/go.mod
h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
+github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1
h1:xvqufLtNVwAhN8NMyWklVgxnWohi+wtMGQMhtxexlm0=
github.com/envoyproxy/go-control-plane
v0.10.2-0.20220325020618-49ff273808a1/go.mod
h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
+github.com/envoyproxy/protoc-gen-validate v0.1.0
h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -523,6 +529,7 @@ github.com/golang/snappy v0.0.4/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod
h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
+github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/flatbuffers v1.11.0
h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
github.com/google/flatbuffers v1.11.0/go.mod
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -1722,6 +1729,7 @@ k8s.io/kubernetes v1.13.0/go.mod
h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
+rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/binaryregexp v0.2.0/go.mod
h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
diff --git a/sdks/go/pkg/beam/io/bigtableio/bigtable.go
b/sdks/go/pkg/beam/io/bigtableio/bigtable.go
new file mode 100644
index 00000000000..df7a6d98cd4
--- /dev/null
+++ b/sdks/go/pkg/beam/io/bigtableio/bigtable.go
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigtableio provides transformations and utilities to interact with
+// Google Bigtable. See also: https://cloud.google.com/bigtable/docs
+package bigtableio
+
+import (
+ "context"
+ "fmt"
+ "hash/fnv"
+ "reflect"
+
+ "cloud.google.com/go/bigtable"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+ register.DoFn3x1[context.Context, int, func(*Mutation) bool,
error](&writeFn{})
+ register.DoFn3x1[context.Context, int, func(*Mutation) bool,
error](&writeBatchFn{})
+ register.Iter1[*Mutation]()
+}
+
// Mutation represents a necessary serializable wrapper analogue
// to bigtable.Mutation containing a rowKey and the operations to be applied.
// It is the element type expected by Write and WriteBatch, which convert it
// into a *bigtable.Mutation before sending it to Bigtable.
type Mutation struct {
	// RowKey is the key of the row the operations are applied to.
	RowKey string
	// Ops are replayed, in slice order, as bigtable.Mutation.Set calls.
	Ops []Operation

	// optional custom beam.GroupByKey key, default is a fixed key of 1.
	// A non-empty GroupKey is hashed (FNV-1a) into the grouping key, so all
	// mutations sharing a GroupKey are written together; mutations without
	// one all share the single key 1.
	GroupKey string
}
+
// Operation represents a raw change to be applied within a Mutation.
// Each Operation becomes exactly one bigtable.Mutation.Set call.
type Operation struct {
	// Family is the column family of the cell being set.
	Family string
	// Column is the column qualifier within Family.
	Column string
	// Ts is the cell timestamp; see Mutation.Set for its semantics.
	Ts bigtable.Timestamp
	// Value is the cell value to write.
	Value []byte
}
+
+// NewMutation returns a new *Mutation, analogue to bigtable.NewMutation().
+func NewMutation(rowKey string) *Mutation {
+ return &Mutation{RowKey: rowKey}
+}
+
+// Set sets a value in a specified column, with the given timestamp,
+// analogue to bigtable.Mutation.Set().
+// The timestamp will be truncated to millisecond granularity.
+// A timestamp of ServerTime means to use the server timestamp.
+func (m *Mutation) Set(family, column string, ts bigtable.Timestamp, value
[]byte) {
+ m.Ops = append(m.Ops, Operation{Family: family, Column: column, Ts: ts,
Value: value})
+}
+
+// WithGroupKey sets a custom group key to be utilised by beam.GroupByKey.
+func (m *Mutation) WithGroupKey(key string) *Mutation {
+ m.GroupKey = key
+ return m
+}
+
+// Write writes the elements of the given PCollection<bigtableio.Mutation> to
bigtable.
+func Write(s beam.Scope, project, instanceID, table string, col
beam.PCollection) {
+ t := col.Type().Type()
+ err := mustBeBigtableioMutation(t)
+ if err != nil {
+ panic(err)
+ }
+
+ s = s.Scope("bigtable.Write")
+
+ pre := beam.ParDo(s, addGroupKeyFn, col)
+ post := beam.GroupByKey(s, pre)
+ beam.ParDo0(s, &writeFn{Project: project, InstanceID: instanceID,
TableName: table, Type: beam.EncodedType{T: t}}, post)
+}
+
+// WriteBatch writes the elements of the given PCollection<bigtableio.Mutation>
+// to bigtable using bigtable.ApplyBulk().
+// For the underlying bigtable.ApplyBulk function to work properly
+// the maximum number of operations per bigtableio.Mutation of the input
+// PCollection must not be greater than 100,000. For more information
+// see https://cloud.google.com/bigtable/docs/writes#batch for more.
+func WriteBatch(s beam.Scope, project, instanceID, table string, col
beam.PCollection) {
+ t := col.Type().Type()
+ err := mustBeBigtableioMutation(t)
+ if err != nil {
+ panic(err)
+ }
+
+ s = s.Scope("bigtable.WriteBatch")
+
+ pre := beam.ParDo(s, addGroupKeyFn, col)
+ post := beam.GroupByKey(s, pre)
+ beam.ParDo0(s, &writeBatchFn{Project: project, InstanceID: instanceID,
TableName: table, Type: beam.EncodedType{T: t}}, post)
+}
+
+func addGroupKeyFn(mutation Mutation) (int, Mutation) {
+ if mutation.GroupKey != "" {
+ return hashStringToInt(mutation.GroupKey), mutation
+ }
+ return 1, mutation
+}
+
// hashStringToInt returns the 32-bit FNV-1a hash of s converted to int.
func hashStringToInt(s string) int {
	hasher := fnv.New32a()
	// hash.Hash's Write never returns an error, so the result is discarded.
	_, _ = hasher.Write([]byte(s))
	sum := hasher.Sum32()
	return int(sum)
}
+
+func mustBeBigtableioMutation(t reflect.Type) error {
+ if t != reflect.TypeOf(Mutation{}) {
+ return fmt.Errorf("type must be bigtableio.Mutation but is:
%v", t)
+ }
+ return nil
+}
+
+type writeFn struct {
+ // Project is the project
+ Project string `json:"project"`
+ // InstanceID is the bigtable instanceID
+ InstanceID string `json:"instanceId"`
+ // Client is the bigtable.Client
+ client *bigtable.Client `json:"-"`
+ // TableName is the qualified table identifier.
+ TableName string `json:"tableName"`
+ // Table is a bigtable.Table instance with an eventual open connection
+ table *bigtable.Table `json:"-"`
+ // Type is the encoded schema type.
+ Type beam.EncodedType `json:"type"`
+}
+
+func (f *writeFn) Setup(ctx context.Context) error {
+ var err error
+ f.client, err = bigtable.NewClient(ctx, f.Project, f.InstanceID)
+ if err != nil {
+ return fmt.Errorf("could not create data operations client:
%v", err)
+ }
+
+ f.table = f.client.Open(f.TableName)
+ return nil
+}
+
+func (f *writeFn) Teardown() error {
+ if err := f.client.Close(); err != nil {
+ return fmt.Errorf("could not close data operations client: %v",
err)
+ }
+ return nil
+}
+
+func (f *writeFn) ProcessElement(ctx context.Context, key int, values
func(*Mutation) bool) error {
+
+ var mutation Mutation
+ for values(&mutation) {
+
+ err := validateMutation(mutation)
+ if err != nil {
+ return fmt.Errorf("invalid bigtableio.Mutation: %s",
err)
+ }
+
+ err = f.table.Apply(ctx, mutation.RowKey,
getBigtableMutation(mutation))
+ if err != nil {
+ return fmt.Errorf("could not apply mutation for row
key='%s': %v", mutation.RowKey, err)
+ }
+
+ }
+
+ return nil
+}
+
+type writeBatchFn struct {
+ // Project is the project
+ Project string `json:"project"`
+ // InstanceID is the bigtable instanceID
+ InstanceID string `json:"instanceId"`
+ // Client is the bigtable.Client
+ client *bigtable.Client `json:"-"`
+ // TableName is the qualified table identifier.
+ TableName string `json:"tableName"`
+ // Table is a bigtable.Table instance with an eventual open connection
+ table *bigtable.Table `json:"-"`
+ // Type is the encoded schema type.
+ Type beam.EncodedType `json:"type"`
+}
+
+func (f *writeBatchFn) Setup(ctx context.Context) error {
+ var err error
+ f.client, err = bigtable.NewClient(ctx, f.Project, f.InstanceID)
+ if err != nil {
+ return fmt.Errorf("could not create data operations client:
%v", err)
+ }
+
+ f.table = f.client.Open(f.TableName)
+ return nil
+}
+
+func (f *writeBatchFn) Teardown() error {
+ if err := f.client.Close(); err != nil {
+ return fmt.Errorf("could not close data operations client: %v",
err)
+ }
+ return nil
+}
+
+func (f *writeBatchFn) ProcessElement(ctx context.Context, key int, values
func(*Mutation) bool) error {
+
+ var rowKeysInBatch []string
+ var mutationsInBatch []*bigtable.Mutation
+
+ // opsAddedToBatch is used to make sure that one batch does not include
more than 100000 operations/mutations
+ opsAddedToBatch := 0
+
+ var mutation Mutation
+ for values(&mutation) {
+
+ err := validateMutation(mutation)
+ if err != nil {
+ return fmt.Errorf("invalid bigtableio.Mutation: %s",
err)
+ }
+
+ opsInMutation := len(mutation.Ops)
+
+ if (opsAddedToBatch + opsInMutation) > 100000 {
+ err := tryApplyBulk(f.table.ApplyBulk(ctx,
rowKeysInBatch, mutationsInBatch))
+ if err != nil {
+ return err
+ }
+
+ rowKeysInBatch = nil
+ mutationsInBatch = nil
+ opsAddedToBatch = 0
+ }
+
+ rowKeysInBatch = append(rowKeysInBatch, mutation.RowKey)
+ mutationsInBatch = append(mutationsInBatch,
getBigtableMutation(mutation))
+ opsAddedToBatch += len(mutation.Ops)
+
+ }
+
+ if len(rowKeysInBatch) != 0 && len(mutationsInBatch) != 0 {
+ err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeysInBatch,
mutationsInBatch))
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func validateMutation(mutation Mutation) error {
+ if len(mutation.Ops) > 100000 {
+ return fmt.Errorf("one instance of bigtableio.Mutation must not
have more than 100,000 operations/mutations, see
https://cloud.google.com/bigtable/docs/writes#batch")
+ }
+ return nil
+}
+
// tryApplyBulk folds the two results of bigtable.Table.ApplyBulk into a single
// error. processErr is the failure of the bulk call as a whole; errs holds
// one entry per submitted row (nil for rows that were applied). It returns
// nil when nothing failed. Otherwise it returns an error that wraps (%w) the
// process error or the first per-row error, and that reports how many rows
// failed.
func tryApplyBulk(errs []error, processErr error) error {
	if processErr != nil {
		return fmt.Errorf("bulk apply process failed: %w", processErr)
	}

	var first error
	failed := 0
	for _, err := range errs {
		if err == nil {
			continue
		}
		if first == nil {
			first = err
		}
		failed++
	}
	if first == nil {
		return nil
	}
	return fmt.Errorf("could not apply %d of %d mutations: %w", failed, len(errs), first)
}
+
+func getBigtableMutation(mutation Mutation) *bigtable.Mutation {
+ bigtableMutation := bigtable.NewMutation()
+ for _, m := range mutation.Ops {
+ bigtableMutation.Set(m.Family, m.Column, m.Ts, m.Value)
+ }
+ return bigtableMutation
+}
diff --git a/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
b/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
new file mode 100644
index 00000000000..1294a28bf7b
--- /dev/null
+++ b/sdks/go/pkg/beam/io/bigtableio/bigtable_test.go
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigtableio
+
+import (
+ "errors"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "cloud.google.com/go/bigtable"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+)
+
+func TestHashStringToInt(t *testing.T) {
+ equalVal := "equal"
+
+ firstHash := hashStringToInt(equalVal)
+ secondHash := hashStringToInt(equalVal)
+
+ if firstHash != secondHash {
+ t.Errorf("hashStringToInt(\"%s\") should equal
hashStringToInt(\"%s\")", equalVal, equalVal)
+ }
+
+ if hashStringToInt("helloWorld") == hashStringToInt("helloworld") {
+ t.Error("hashStringToInt(\"helloWorld\") should not equal
hashStringToInt(\"helloworld\")")
+ }
+
+ if hashStringToInt("saturnsmoon1") == hashStringToInt("saturnsmoon2") {
+ t.Error("hashStringToInt(\"saturnsmoon1\") should not equal
hashStringToInt(\"saturnsmoon2\")")
+ }
+}
+
+func TestAddGroupKeyFnGroupKeyGiven(t *testing.T) {
+ mutationWithGroupKey := NewMutation("rowKey").WithGroupKey("1")
+ groupKey, _ := addGroupKeyFn(*mutationWithGroupKey)
+ if groupKey == 1 {
+ t.Error("addGroupKeyFn should hash groupKey values properly,
but projected \"1\" -> 1")
+ }
+}
+
+func TestAddGroupKeyFnNoGroupKeyGiven(t *testing.T) {
+ mutationNoGroupKey := NewMutation("rowKey")
+ groupKey, _ := addGroupKeyFn(*mutationNoGroupKey)
+ if groupKey != 1 {
+ t.Errorf("addGroupKeyFn should assign 1 as hash if no groupKey
is given, but projected nil -> %d", groupKey)
+ }
+}
+
+func TestMustBeBigtableioMutation(t *testing.T) {
+
+ mutation := NewMutation("key")
+ mutation.Set("family", "column", 0, []byte{})
+
+ mutationWithGroupKey := NewMutation("key").WithGroupKey("groupKey")
+ mutationWithGroupKey.Set("family", "column", 0, []byte{})
+
+ passValues := []Mutation{
+ {},
+ {RowKey: "key"},
+ {Ops: []Operation{{}}},
+ *mutation,
+ *mutationWithGroupKey,
+ }
+
+ for _, passValue := range passValues {
+ passType := reflect.TypeOf(passValue)
+ err := mustBeBigtableioMutation(passType)
+ if err != nil {
+ t.Errorf("input type %v should be considered a
bigtableio.Mutation", passType)
+ }
+ }
+}
+
+func TestMustNotBeBigtableioMutation(t *testing.T) {
+ failValues := []interface{}{
+ 1,
+ 1.0,
+ "strings must fail",
+ errors.New("errors must fail"),
+ }
+
+ for _, failValue := range failValues {
+ failType := reflect.TypeOf(reflect.ValueOf(failValue))
+ err := mustBeBigtableioMutation(failType)
+ if err == nil {
+ t.Errorf("input type %v should not be considered a
bigtableio.Mutation", failType)
+ }
+ }
+}
+
+func TestTryApplyBulk(t *testing.T) {
+ err := tryApplyBulk(nil, nil)
+ if err != nil {
+ t.Error("tryApplyBulk should not return an error for inputs
<nil, nil> but returned:\n", err)
+ }
+
+ err = tryApplyBulk(nil, errors.New("error"))
+ if err == nil {
+ t.Error("tryApplyBulk should return an error for inputs <nil,
error>")
+ }
+
+ err = tryApplyBulk([]error{errors.New("error")}, nil)
+ if err == nil {
+ t.Error("tryApplyBulk should return an error for inputs
<[]error, nil>")
+ }
+}
+
+func TestValidateMutationSucceedsWhenZeroOps(t *testing.T) {
+ validMutation := NewMutation("rowKey")
+
+ err := validateMutation(*validMutation)
+ if err != nil {
+ t.Errorf("mutation (0 ops) should be valid, but was marked
invalid: %s", err)
+ }
+}
+
+func TestValidateMutationSucceedsWhenLessThanOrEqualHundredKOps(t *testing.T) {
+ validMutation := NewMutation("rowKey")
+
+ for i := 0; i < 100000; i++ {
+ validMutation.Set("family", fmt.Sprint(i), bigtable.Now(),
[]byte{})
+ }
+
+ err := validateMutation(*validMutation)
+ if err != nil {
+ t.Errorf("mutation (100,000 ops) should be valid, but was
marked invalid: %s", err)
+ }
+}
+
+func TestValidateMutationFailsWhenGreaterThanHundredKOps(t *testing.T) {
+ validMutation := NewMutation("rowKey")
+
+ for i := 0; i < 100001; i++ {
+ validMutation.Set("family", fmt.Sprint(i), bigtable.Now(),
[]byte{})
+ }
+
+ err := validateMutation(*validMutation)
+ if err == nil {
+ t.Error("mutation (100,001 ops) should be invalid, but was
marked valid")
+ }
+}
+
+// Examples:
+
+func ExampleWriteBatch() {
+ pipeline := beam.NewPipeline()
+ s := pipeline.Root()
+
+ //sample PBCollection<bigtableio.Mutation>
+ bigtableioMutationCol := beam.CreateList(s, func() []Mutation {
+ columnFamilyName := "stats_summary"
+ timestamp := bigtable.Now()
+
+ // var muts []bigtableio.Mutation
+ var muts []Mutation
+
+ deviceA := "tablet"
+ rowKeyA := deviceA + "#a0b81f74#20190501"
+
+ // bigtableio.NewMutation(rowKeyA).WithGroupKey(deviceA)
+ mutA := NewMutation(rowKeyA).WithGroupKey(deviceA) // this
groups bundles by device identifiers
+ mutA.Set(columnFamilyName, "connected_wifi", timestamp,
[]byte("1"))
+ mutA.Set(columnFamilyName, "os_build", timestamp,
[]byte("12155.0.0-rc1"))
+
+ muts = append(muts, *mutA)
+
+ deviceB := "phone"
+ rowKeyB := deviceB + "#a0b81f74#20190502"
+
+ mutB := NewMutation(rowKeyB).WithGroupKey(deviceB)
+ mutB.Set(columnFamilyName, "connected_wifi", timestamp,
[]byte("1"))
+ mutB.Set(columnFamilyName, "os_build", timestamp,
[]byte("12145.0.0-rc6"))
+
+ muts = append(muts, *mutB)
+
+ return muts
+ }())
+
+ // bigtableio.WriteBatch(...)
+ WriteBatch(s, "project", "instanceId", "tableName",
bigtableioMutationCol)
+}