lostluck commented on a change in pull request #17173:
URL: https://github.com/apache/beam/pull/17173#discussion_r837666622
##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -55,20 +64,25 @@ func init() {
// datastoreio.Read(s, "project", "Item", 256, reflect.TypeOf(Item{}), itemKey)
func Read(s beam.Scope, project, kind string, shards int, t reflect.Type,
typeKey string) beam.PCollection {
s = s.Scope("datastore.Read")
- return query(s, project, kind, shards, t, typeKey)
+ return query(s, project, kind, shards, t, typeKey, datastoreNewClient)
}
-func query(s beam.Scope, project, kind string, shards int, t reflect.Type,
typeKey string) beam.PCollection {
+func datastoreNewClient(ctx context.Context, projectID string, opts
...option.ClientOption) (clientType, error) {
+ return datastore.NewClient(ctx, projectID, opts...)
+}
+
+func query(s beam.Scope, project, kind string, shards int, t reflect.Type,
typeKey string, newClient newClientFuncType) beam.PCollection {
imp := beam.Impulse(s)
- ex := beam.ParDo(s, &splitQueryFn{Project: project, Kind: kind, Shards:
shards}, imp)
+ ex := beam.ParDo(s, &splitQueryFn{Project: project, Kind: kind, Shards:
shards, newClientFunc: newClient}, imp)
g := beam.GroupByKey(s, ex)
- return beam.ParDo(s, &queryFn{Project: project, Kind: kind, Type:
typeKey}, g, beam.TypeDefinition{Var: beam.XType, T: t})
+ return beam.ParDo(s, &queryFn{Project: project, Kind: kind, Type:
typeKey, newClientFunc: newClient}, g, beam.TypeDefinition{Var: beam.XType, T:
t})
}
type splitQueryFn struct {
- Project string `json:"project"`
- Kind string `json:"kind"`
- Shards int `json:"shards"`
+ Project string `json:"project"`
+ Kind string `json:"kind"`
+ Shards int `json:"shards"`
+ newClientFunc newClientFuncType
Review comment:
Same thing here: add a `Setup` method that sets up the real datastore client factory.
##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -55,20 +64,25 @@ func init() {
// datastoreio.Read(s, "project", "Item", 256, reflect.TypeOf(Item{}), itemKey)
func Read(s beam.Scope, project, kind string, shards int, t reflect.Type,
typeKey string) beam.PCollection {
s = s.Scope("datastore.Read")
- return query(s, project, kind, shards, t, typeKey)
+ return query(s, project, kind, shards, t, typeKey, datastoreNewClient)
Review comment:
See the other comment; in short, pass nil here instead.
##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore_test.go
##########
@@ -16,11 +16,164 @@
package datastoreio
import (
+ "context"
+ "errors"
+ "reflect"
+ "strings"
"testing"
"cloud.google.com/go/datastore"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "google.golang.org/api/option"
)
+// fake client type implements datastoreio.clientType
+type fakeClient struct {
+ runFn func()
+ closeFn func()
+}
+
+func (client *fakeClient) Run(context.Context, *datastore.Query)
*datastore.Iterator {
+ client.runFn()
+ // return an empty iterator
+ return new(datastore.Iterator)
+}
+
+func (client *fakeClient) Close() error {
+ client.closeFn()
+ return nil
+}
+
+// mock type for query
+type Foo struct {
+}
+
+type Bar struct {
+}
+
+func TestRead(t *testing.T) {
+ runCounter := 0
+ closeCounter := 0
+
+ // setup a fake newClient caller
+ originalClient := newClient
+ newClient = func(ctx context.Context,
+ projectID string,
+ opts ...option.ClientOption) (clientType, error) {
+ client := fakeClient{
+ runFn: func() {
+ runCounter += 1
+ },
+ closeFn: func() {
+ closeCounter += 1
+ },
+ }
+ return &client, nil
+ }
+
+ testCases := []struct {
+ v interface{}
+ shard int
+ expectRun int
+ expectClose int
+ }{
+ // case 1: shard=1, without split query
+ {Foo{}, 1, 1, 1},
+ // case 2: shard=2 (>1), with split query
+ {Bar{}, 2, 2, 2},
+ }
+ for _, tc := range testCases {
+ itemType := reflect.TypeOf(tc.v)
+ itemKey := runtime.RegisterType(itemType)
Review comment:
Valid point! Thank you.
That's not a pattern that should be followed as a rule; it was evidently
missed in the early contributions. `runtime.RegisterType` shouldn't be exposed
like that.
##########
File path: sdks/go/pkg/beam/io/datastoreio/datastore.go
##########
@@ -186,12 +200,12 @@ type queryFn struct {
// Kind is the datastore kind
Kind string `json:"kind"`
// Type is the name of the global schema type
- Type string `json:"type"`
+ Type string `json:"type"`
+ newClientFunc newClientFuncType
Review comment:
You've missed the part where I say the value should be checked in a
`Setup` method on the DoFn.
As written, this will crash on portable runners. `newClientFunc` is unexported,
so it is not serialized with the DoFn and will always be nil on the workers.
This is why the suggestion was to check whether it's nil, set up the datastore
factory function in a `Setup` method on `queryFn`, and simply pass nil in the
Read call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]