lostluck commented on a change in pull request #17173:
URL: https://github.com/apache/beam/pull/17173#discussion_r837666622



##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -55,20 +64,25 @@ func init() {
 // datastoreio.Read(s, "project", "Item", 256, reflect.TypeOf(Item{}), itemKey)
 func Read(s beam.Scope, project, kind string, shards int, t reflect.Type, 
typeKey string) beam.PCollection {
        s = s.Scope("datastore.Read")
-       return query(s, project, kind, shards, t, typeKey)
+       return query(s, project, kind, shards, t, typeKey, datastoreNewClient)
 }
 
-func query(s beam.Scope, project, kind string, shards int, t reflect.Type, 
typeKey string) beam.PCollection {
+func datastoreNewClient(ctx context.Context, projectID string, opts 
...option.ClientOption) (clientType, error) {
+       return datastore.NewClient(ctx, projectID, opts...)
+}
+
+func query(s beam.Scope, project, kind string, shards int, t reflect.Type, 
typeKey string, newClient newClientFuncType) beam.PCollection {
        imp := beam.Impulse(s)
-       ex := beam.ParDo(s, &splitQueryFn{Project: project, Kind: kind, Shards: 
shards}, imp)
+       ex := beam.ParDo(s, &splitQueryFn{Project: project, Kind: kind, Shards: 
shards, newClientFunc: newClient}, imp)
        g := beam.GroupByKey(s, ex)
-       return beam.ParDo(s, &queryFn{Project: project, Kind: kind, Type: 
typeKey}, g, beam.TypeDefinition{Var: beam.XType, T: t})
+       return beam.ParDo(s, &queryFn{Project: project, Kind: kind, Type: 
typeKey, newClientFunc: newClient}, g, beam.TypeDefinition{Var: beam.XType, T: 
t})
 }
 
 type splitQueryFn struct {
-       Project string `json:"project"`
-       Kind    string `json:"kind"`
-       Shards  int    `json:"shards"`
+       Project       string `json:"project"`
+       Kind          string `json:"kind"`
+       Shards        int    `json:"shards"`
+       newClientFunc newClientFuncType

Review comment:
       Same thing here: add a `Setup` method that sets up the real datastore client factory.

##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -55,20 +64,25 @@ func init() {
 // datastoreio.Read(s, "project", "Item", 256, reflect.TypeOf(Item{}), itemKey)
 func Read(s beam.Scope, project, kind string, shards int, t reflect.Type, 
typeKey string) beam.PCollection {
        s = s.Scope("datastore.Read")
-       return query(s, project, kind, shards, t, typeKey)
+       return query(s, project, kind, shards, t, typeKey, datastoreNewClient)

Review comment:
       See my other comment; pass `nil` here instead.

##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore_test.go
##########
@@ -16,11 +16,164 @@
 package datastoreio
 
 import (
+       "context"
+       "errors"
+       "reflect"
+       "strings"
        "testing"
 
        "cloud.google.com/go/datastore"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "google.golang.org/api/option"
 )
 
+// fake client type implements datastoreio.clientType
+type fakeClient struct {
+       runFn   func()
+       closeFn func()
+}
+
+func (client *fakeClient) Run(context.Context, *datastore.Query) 
*datastore.Iterator {
+       client.runFn()
+       // return an empty iterator
+       return new(datastore.Iterator)
+}
+
+func (client *fakeClient) Close() error {
+       client.closeFn()
+       return nil
+}
+
+// mock type for query
+type Foo struct {
+}
+
+type Bar struct {
+}
+
+func TestRead(t *testing.T) {
+       runCounter := 0
+       closeCounter := 0
+
+       // setup a fake newClient caller
+       originalClient := newClient
+       newClient = func(ctx context.Context,
+               projectID string,
+               opts ...option.ClientOption) (clientType, error) {
+               client := fakeClient{
+                       runFn: func() {
+                               runCounter += 1
+                       },
+                       closeFn: func() {
+                               closeCounter += 1
+                       },
+               }
+               return &client, nil
+       }
+
+       testCases := []struct {
+               v           interface{}
+               shard       int
+               expectRun   int
+               expectClose int
+       }{
+               // case 1: shard=1, without split query
+               {Foo{}, 1, 1, 1},
+               // case 2: shard=2 (>1), with split query
+               {Bar{}, 2, 2, 2},
+       }
+       for _, tc := range testCases {
+               itemType := reflect.TypeOf(tc.v)
+               itemKey := runtime.RegisterType(itemType)

Review comment:
       Valid point! Thank you.
   
   That's not a pattern that should be followed in general, and it was evidently 
missed in the early contributions. `runtime.RegisterType` shouldn't be used 
like that.

##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -186,12 +200,12 @@ type queryFn struct {
        // Kind is the datastore kind
        Kind string `json:"kind"`
        // Type is the name of the global schema type
-       Type string `json:"type"`
+       Type          string `json:"type"`
+       newClientFunc newClientFuncType

Review comment:
       You've missed the part where I said the value should be checked in a 
`Setup` method on the DoFn.
   
   As written, this will crash on portable runners, because newClientFunc will 
always be nil in that context. It is an unexported field, so it is not 
serialized with the DoFn and arrives unset on the workers. This is why the 
suggestion was to check whether it's nil, set up the datastore factory function 
in a `Setup` method on `queryFn`, and simply pass nil in the Read call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to