lostluck commented on code in PR #26042:
URL: https://github.com/apache/beam/pull/26042#discussion_r1156128691


##########
sdks/go/pkg/beam/io/spannerio/common.go:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resouces to
+// Google Spanner datastores.
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+)

Review Comment:
   Minor nit: Most of the project also runs 
[`goimports`](https://pkg.go.dev/golang.org/x/tools/cmd/goimports), 
which separates the standard library imports from other imports into two 
blocks. Commenting since it's inconsistent with most of Beam Go.
   
   Definitely a minor personal choice, since I don't think we have any tooling 
that mandates this. (Though the GitHub Action that would check this fails at an 
earlier step for this PR 
https://github.com/apache/beam/actions/runs/4589437759/jobs/8104324444?pr=26042,
 so it may still enforce it.)



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, 
db string, client *spanner.Client) error {
+       iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 
FROM Test"})
+       defer iter.Stop()
+
+       if _, err := iter.Next(); err == nil {
+               return nil
+       }
+
+       op, err := admin.UpdateDatabaseDdl(ctx, 
&adminpb.UpdateDatabaseDdlRequest{
+               Database: db,
+               Statements: []string{`CREATE TABLE Test (
+                                       One STRING(20),
+                                       Two INT64,
+                               ) PRIMARY KEY (Two)`},
+       })
+
+       if err != nil {
+               return err
+       }
+
+       if err := op.Wait(context.Background()); err != nil {
+               return err
+       }
+
+       testRows := []TestDto{
+               {
+                       One: "one",
+                       Two: 1,
+               },
+               {
+                       One: "one",
+                       Two: 2,
+               },
+               {
+                       One: "one",
+                       Two: 3,
+               },
+               {
+                       One: "one",
+                       Two: 4,
+               },
+       }
+
+       var mutations []*spanner.Mutation
+       for _, m := range testRows {
+               mutation, err := spanner.InsertStruct("Test", m)
+               if err != nil {
+                       return err
+               }
+
+               mutations = append(mutations, mutation)
+       }
+
+       _, err = client.Apply(context.Background(), mutations)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func TestMain(t *testing.M) {
+       flag.Parse()
+       t.Run()
+}

Review Comment:
   This should be calling 
[`ptest.Main`](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest#Main)



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions.go:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.

Review Comment:
   It's not recommended to have a package doc comment in every file. All the 
files need the license header, but it's best to have the doc comment in only a 
single file. Having it in common.go would be appropriate for this package.
   
   See https://google.github.io/styleguide/go/decisions#package-comments
   
   ```suggestion
   ```



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions_test.go:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "testing"
+)
+
+func TestGeneratePartitions(t *testing.T) {
+       database := 
"projects/fake-proj/instances/fake-instance/databases/fake-db-4-rows"
+
+       query := "SELECT * from Test"
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       fn := newGeneratePartitionsFn(database, query)
+       fn.generator = &partitionGeneratorStub{}
+
+       partitions := fn.generatePartitions(s)

Review Comment:
   I'll note that this manner of pipeline testing doesn't work well for 
portable Beam. At best, this works for the Direct runner and no other runners 
(e.g. Flink, Prism, Dataflow).



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions_test.go:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+
+package spannerio

Review Comment:
   ```suggestion
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, 
db string, client *spanner.Client) error {
+       iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 
FROM Test"})
+       defer iter.Stop()
+
+       if _, err := iter.Next(); err == nil {
+               return nil
+       }

Review Comment:
   This section doesn't appear to be doing anything WRT "populating Spanner". 
If it's necessary, please explain why in a comment, and augment the returned 
error with this sort of detail as well. Otherwise, please remove.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, 
db string, client *spanner.Client) error {
+       iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 
FROM Test"})
+       defer iter.Stop()
+
+       if _, err := iter.Next(); err == nil {
+               return nil
+       }
+
+       op, err := admin.UpdateDatabaseDdl(ctx, 
&adminpb.UpdateDatabaseDdlRequest{
+               Database: db,
+               Statements: []string{`CREATE TABLE Test (
+                                       One STRING(20),
+                                       Two INT64,
+                               ) PRIMARY KEY (Two)`},
+       })
+
+       if err != nil {
+               return err
+       }
+
+       if err := op.Wait(context.Background()); err != nil {

Review Comment:
   A context is being passed in, use that instead of creating a new one or 
comment why the passed in context is being ignored.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }

Review Comment:
   Please put integration tests in an appropriate directory in 
https://github.com/apache/beam/tree/master/sdks/go/test/integration/io and use 
the existing infrastructure for integration tests. This will allow the test to 
run on several runners.
   
   Although, I'll note that as written, this test won't work: like the other 
tests, it uses a fake client/server, and notably the [spannertest fake 
doesn't support partitions at this 
time](https://pkg.go.dev/cloud.google.com/go/spanner/spannertest#section-readme).
   
   Any integration tests against real Spanner would need to take care to clean 
up after themselves so we don't waste Beam project resources.



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)

Review Comment:
   Having the count *after* the run and validate does nothing. The `passert`s 
are executed as part of the pipeline, and can't be added afterwards; move the 
`passert.Count` call before `ptest.RunAndValidate`.



##########
sdks/go/pkg/beam/io/spannerio/write.go:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resouces to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read.go:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resouces to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, 
db string, client *spanner.Client) error {
+       iter := client.Single().Query(ctx, spanner.Statement{SQL: "SELECT 1 
FROM Test"})
+       defer iter.Stop()
+
+       if _, err := iter.Next(); err == nil {
+               return nil
+       }
+
+       op, err := admin.UpdateDatabaseDdl(ctx, 
&adminpb.UpdateDatabaseDdlRequest{
+               Database: db,
+               Statements: []string{`CREATE TABLE Test (
+                                       One STRING(20),
+                                       Two INT64,
+                               ) PRIMARY KEY (Two)`},
+       })
+
+       if err != nil {
+               return err
+       }
+
+       if err := op.Wait(context.Background()); err != nil {
+               return err
+       }
+
+       testRows := []TestDto{
+               {
+                       One: "one",
+                       Two: 1,
+               },
+               {
+                       One: "one",
+                       Two: 2,
+               },
+               {
+                       One: "one",
+                       Two: 3,
+               },
+               {
+                       One: "one",
+                       Two: 4,
+               },
+       }
+
+       var mutations []*spanner.Mutation
+       for _, m := range testRows {
+               mutation, err := spanner.InsertStruct("Test", m)
+               if err != nil {
+                       return err
+               }
+
+               mutations = append(mutations, mutation)
+       }
+
+       _, err = client.Apply(context.Background(), mutations)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func TestMain(t *testing.M) {
+       flag.Parse()
+       t.Run()
+}

Review Comment:
   This should be calling 
[`ptest.Main`](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest#Main)
 instead, which ensures test pipelines can be executed properly on arbitrary 
runners. 
   
   ```suggestion
   func TestMain(m *testing.M) {
        ptest.Main(m)
   }
   ```



##########
sdks/go/pkg/beam/io/spannerio/generate_partitions.go:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "context"
+       "fmt"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+       register.DoFn3x1[context.Context, []byte, func(read *PartitionedRead), 
error]((*generatePartitionsFn)(nil))
+       register.Emitter1[*PartitionedRead]()
+}
+
+type generatePartitionsFn struct {
+       spannerFn
+       Query     string            `json:"query"`   // Table is the table 
identifier.
+       Options   queryBatchOptions `json:"options"` // Options specifies 
additional query execution options.
+       generator partitionGenerator
+}
+
+func (f *generatePartitionsFn) Setup(ctx context.Context) error {
+       err := f.spannerFn.Setup(ctx)
+
+       if f.generator == nil {
+               f.generator = newPartitionGenerator(f.client)
+       }
+
+       return err
+}
+
+func (f *generatePartitionsFn) Teardown() {
+       f.spannerFn.Teardown()
+}
+
+func partitionOptions(options queryBatchOptions) spanner.PartitionOptions {
+       partitionOptions := spanner.PartitionOptions{}
+
+       if options.MaxPartitions != 0 {
+               partitionOptions.MaxPartitions = options.MaxPartitions
+       }
+
+       return partitionOptions
+}
+
+// GeneratePartitions generates read partitions to support batched reading 
from Spanner.
+func GeneratePartitions(s beam.Scope, db string, query string, options 
...func(*queryBatchOptions) error) beam.PCollection {
+       s.Scope("spanner.GeneratePartitions")
+
+       fn := newGeneratePartitionsFn(db, query, options...)
+       return fn.generatePartitions(s)
+}
+
+func newGeneratePartitionsFn(
+       db string,
+       query string,
+       options ...func(*queryBatchOptions) error,
+) *generatePartitionsFn {
+       if db == "" {
+               panic("no database provided")
+       }
+
+       opts := queryBatchOptions{}
+       for _, opt := range options {
+               if err := opt(&opts); err != nil {
+                       panic(err)
+               }
+       }
+
+       return &generatePartitionsFn{
+               spannerFn: newSpannerFn(db),
+               Query:     query,
+               Options:   opts,
+       }
+}
+
+func (f *generatePartitionsFn) generatePartitions(s beam.Scope) 
beam.PCollection {
+       imp := beam.Impulse(s)
+       return beam.ParDo(s, f, imp)
+}
+
+func (f *generatePartitionsFn) ProcessElement(ctx context.Context, _ []byte, 
emit func(*PartitionedRead)) error {
+       txnId, partitions := f.generator.generate(ctx, 
f.Options.TimestampBound, f.Query, partitionOptions(f.Options))
+
+       for _, p := range partitions {
+               emit(NewPartitionedRead(txnId, p))
+       }
+
+       return nil
+}
+
+type partitionGenerator interface {
+       generate(
+               ctx context.Context,
+               tb spanner.TimestampBound,
+               query string,
+               opts spanner.PartitionOptions,
+       ) (spanner.BatchReadOnlyTransactionID, []*spanner.Partition)
+}
+
+type partitionGeneratorImpl struct {
+       client *spanner.Client
+}
+
+func newPartitionGenerator(client *spanner.Client) partitionGenerator {
+       return &partitionGeneratorImpl{client}
+}
+
+func (g *partitionGeneratorImpl) generate(
+       ctx context.Context,
+       tb spanner.TimestampBound,
+       query string,
+       opts spanner.PartitionOptions,
+) (spanner.BatchReadOnlyTransactionID, []*spanner.Partition) {
+       txn, err := g.client.BatchReadOnlyTransaction(ctx, tb)
+       if err != nil {
+               panic("unable to create batch read only transaction: " + 
err.Error())
+       }
+       defer txn.Close()
+
+       if txn == nil {
+               panic("unable to create a spanner transaction")
+       }

Review Comment:
   I'd remove this check, as it's simply untestable code.
   
   If the constructor `BatchReadOnlyTransaction` doesn't return an error and 
doesn't document that it can return nil, the convention is to assume that the 
result is non-nil, so it's not necessary to check here. 
   
   Further, if PartitionQuery were called on a nil transaction, some internal 
detail would then fail with a panic, which would lead us back here anyway. So 
while the check shortcuts that investigation somewhat, that situation 
shouldn't happen in the first place.



##########
sdks/go/pkg/beam/io/spannerio/test_utils.go:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       db "cloud.google.com/go/spanner/admin/database/apiv1"
+       "cloud.google.com/go/spanner/spannertest"
+       "context"
+       "google.golang.org/api/option"
+       "google.golang.org/grpc"
+       "testing"
+)
+
+type TestDto struct {
+       One string `spanner:"One"`
+       Two int64  `spanner:"Two"`
+}
+
+func newServer(t *testing.T) (*spannertest.Server, func()) {
+       srv, err := spannertest.NewServer("localhost:0")
+       if err != nil {
+               t.Fatalf("Starting in-memory fake spanner: %v", err)
+       }
+
+       return srv, func() {
+               srv.Close()
+       }
+}
+
+func createFakeClient(address string, database string) (*spanner.Client, 
*db.DatabaseAdminClient, func(), error) {

Review Comment:
   We can simplify testing, since the Spanner client supports an environment 
variable override we can use.
   
   See `SPANNER_EMULATOR_HOST` in the spannertest docs.
   https://pkg.go.dev/cloud.google.com/go/spanner/spannertest#section-readme
   
   This lets us use the normal client code paths, and may work better on 
portable runners, at least for unit-test purposes.
   
   Alternatively, we could add another exported field to the spannerFn to hold 
the server address, and when it's present, provide the appropriate connection 
option for the fake override. While this does move some test-specific code into 
the production code, it makes testing much easier.



##########
sdks/go/pkg/beam/io/spannerio/read_batch.go:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package spannerio provides an API for reading and writing resources to
+// Google Spanner datastores.
+package spannerio

Review Comment:
   ```suggestion
   
   package spannerio
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch_integration_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       database "cloud.google.com/go/spanner/admin/database/apiv1"
+       "context"
+       "flag"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       adminpb 
"google.golang.org/genproto/googleapis/spanner/admin/database/v1"
+       "reflect"
+       "testing"
+)
+
+var (
+       integrationTests = flag.Bool("int", false, "")
+)
+
+func TestExampleQueryBatch(t *testing.T) {
+       if !*integrationTests {
+               t.Skip("Not running in integration test mode.")
+       }
+
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       db := 
"projects/test-project/instances/test-instance/databases/test-database"
+
+       srv, srvCleanup := newServer(t)
+       defer srvCleanup()
+
+       client, admin, cleanup, err := createFakeClient(srv.Addr, db)
+       if err != nil {
+               t.Fatalf("Unable to create fake client: %v", err)
+       }
+       defer cleanup()
+
+       populateSpanner(context.Background(), admin, db, client)
+
+       rows := QueryBatch(s, db, "SELECT * FROM TEST", 
reflect.TypeOf(TestDto{}))
+
+       ptest.RunAndValidate(t, p)
+       passert.Count(s, rows, "Should have 4 rows", 4)
+}
+
+func populateSpanner(ctx context.Context, admin *database.DatabaseAdminClient, 
db string, client *spanner.Client) error {

Review Comment:
   Since this is part of test setup, it should take a `*testing.T` and call 
`t.Helper()`. All errors should then be reported with `t.Error`, or with 
`t.Fatal` when they prevent the rest of the test from running. 
   
   As it stands, all the errors are ignored, which would make debugging 
difficult.
   
   See https://google.github.io/styleguide/go/decisions#test-helpers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to