lostluck commented on code in PR #26042:
URL: https://github.com/apache/beam/pull/26042#discussion_r1194443372


##########
sdks/go/pkg/beam/io/spannerio/query_options.go:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "cloud.google.com/go/spanner"
+       "errors"
+       "fmt"
+)
+
+const (
+       defaultBatching = true
+)
+
+// queryOptions represents additional options for executing a query.
+type queryOptions struct {
+       Batching       bool                   `json:"batching"`       // Batched reading, default is true.
+       MaxPartitions  int64                  `json:"maxPartitions"`  // Maximum number of partitions.
+       TimestampBound spanner.TimestampBound `json:"timestampBound"` // The TimestampBound to use for batched reading.
+}
+
+func newQueryOptions(options ...func(*queryOptions) error) queryOptions {
+       opts := queryOptions{
+               Batching: defaultBatching,
+       }
+
+       for _, opt := range options {
+               if err := opt(&opts); err != nil {
+                       panic(fmt.Sprintf("spannerio.Query: invalid option: %v", err))
+               }
+       }
+
+       return opts
+}
+
+// WithBatching sets whether we will use a batched reader. Batching is set to true by default;
+// disable it when the underlying query is not root-partitionable.
+func WithBatching(batching bool) func(opts *queryOptions) error {

Review Comment:
   I recommend adding a real type for the functional options, if only for documentation purposes:
   
   `type QueryOptionFn func(*queryOptions)` and use that instead of the raw type. This makes the options show up nicely in documentation. E.g. see how the options appear for `fileio.ReadOptionFn` and `fileio.MatchOptionFn`: https://pkg.go.dev/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/fileio
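   
   A minimal sketch of the suggested shape (the `WithBatching` body here is inferred from its doc comment, and I've assumed the named type keeps the `error` return the current options use):
   
   ```go
   // QueryOptionFn is a functional option for configuring a Spanner query.
   // Naming the type makes every With* option surface under it in godoc.
   type QueryOptionFn func(*queryOptions) error
   
   // WithBatching enables or disables batched reading (enabled by default).
   func WithBatching(batching bool) QueryOptionFn {
           return func(opts *queryOptions) error {
                   opts.Batching = batching
                   return nil
           }
   }
   ```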



##########
sdks/go/pkg/beam/io/spannerio/read_batch.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "context"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "reflect"
+
+       "cloud.google.com/go/spanner"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "google.golang.org/api/iterator"
+)
+
+func init() {
+       register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil))
+       register.Emitter1[beam.X]()
+}
+
+type readBatchFn struct {
+       spannerFn
+       Type    beam.EncodedType
+       Options queryOptions
+}
+
+func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn {
+       return &readBatchFn{
+               spannerFn: newSpannerFn(db),
+               Type:      beam.EncodedType{T: t},
+               Options:   options,
+       }
+}
+
+func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection {
+       partitions := generatePartitions(s, db, query, t, options)
+
+       s = s.Scope("spannerio.ReadBatch")
+
+       return beam.ParDo(
+               s,
+               newReadBatchFn(db, t, options),
+               partitions,
+               beam.TypeDefinition{Var: beam.XType, T: t},
+       )
+}
+
+func (f *readBatchFn) Setup(ctx context.Context) error {
+       return f.spannerFn.Setup(ctx)
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the partition's size in bytes.
+func (f *readBatchFn) CreateInitialRestriction(read PartitionedRead) offsetrange.Restriction {
+       txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId)
+       iter := txn.Execute(context.Background(), read.Partition)
+       defer iter.Stop()
+
+       return offsetrange.Restriction{
+               Start: 0,
+               End:   iter.RowCount,
+       }
+}
+
+const (
+       blockSize = 10000
+       tooSmall  = 100
+)
+
+// SplitRestriction splits each partition's restriction into blocks of a
+// predetermined size, with some checks to avoid having small remainders.
+func (f *readBatchFn) SplitRestriction(_ PartitionedRead, rest offsetrange.Restriction) []offsetrange.Restriction {
+       splits := rest.SizedSplits(blockSize)

Review Comment:
   Note that this approach may also have the same issue as textio does with a fixed-size split, see #25892. However, since Spanner has already done a bunch of splits for us, it might be moot. Not required to fix this now, but just a note.
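   
   For instance, with `blockSize = 10000` and `tooSmall = 100` as defined in this file, a 20,050-row partition splits into [0, 10000), [10000, 20000) and [20000, 20050); the 50-row tail is at most `tooSmall`, so it is merged into the previous split, yielding [10000, 20050).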



##########
sdks/go/pkg/beam/io/spannerio/read_batch.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "context"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "reflect"
+
+       "cloud.google.com/go/spanner"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "google.golang.org/api/iterator"
+)
+
+func init() {
+       register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil))
+       register.Emitter1[beam.X]()
+}
+
+type readBatchFn struct {
+       spannerFn
+       Type    beam.EncodedType
+       Options queryOptions
+}
+
+func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn {
+       return &readBatchFn{
+               spannerFn: newSpannerFn(db),
+               Type:      beam.EncodedType{T: t},
+               Options:   options,
+       }
+}
+
+func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection {
+       partitions := generatePartitions(s, db, query, t, options)
+
+       s = s.Scope("spannerio.ReadBatch")
+
+       return beam.ParDo(
+               s,
+               newReadBatchFn(db, t, options),
+               partitions,
+               beam.TypeDefinition{Var: beam.XType, T: t},
+       )
+}
+
+func (f *readBatchFn) Setup(ctx context.Context) error {
+       return f.spannerFn.Setup(ctx)
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the partition's size in bytes.

Review Comment:
   
   ```suggestion
   // the number of rows in the partition.
   ```



##########
sdks/go/pkg/beam/io/spannerio/read_batch.go:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+       "context"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "reflect"
+
+       "cloud.google.com/go/spanner"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "google.golang.org/api/iterator"
+)
+
+func init() {
+       register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil))
+       register.Emitter1[beam.X]()
+}
+
+type readBatchFn struct {
+       spannerFn
+       Type    beam.EncodedType
+       Options queryOptions
+}
+
+func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn {
+       return &readBatchFn{
+               spannerFn: newSpannerFn(db),
+               Type:      beam.EncodedType{T: t},
+               Options:   options,
+       }
+}
+
+func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection {
+       partitions := generatePartitions(s, db, query, t, options)
+
+       s = s.Scope("spannerio.ReadBatch")
+
+       return beam.ParDo(
+               s,
+               newReadBatchFn(db, t, options),
+               partitions,
+               beam.TypeDefinition{Var: beam.XType, T: t},
+       )
+}
+
+func (f *readBatchFn) Setup(ctx context.Context) error {
+       return f.spannerFn.Setup(ctx)
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the partition's size in bytes.
+func (f *readBatchFn) CreateInitialRestriction(read PartitionedRead) offsetrange.Restriction {
+       txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId)
+       iter := txn.Execute(context.Background(), read.Partition)
+       defer iter.Stop()
+
+       return offsetrange.Restriction{
+               Start: 0,
+               End:   iter.RowCount,
+       }
+}
+
+const (
+       blockSize = 10000
+       tooSmall  = 100
+)
+
+// SplitRestriction splits each partition's restriction into blocks of a
+// predetermined size, with some checks to avoid having small remainders.
+func (f *readBatchFn) SplitRestriction(_ PartitionedRead, rest offsetrange.Restriction) []offsetrange.Restriction {
+       splits := rest.SizedSplits(blockSize)
+       numSplits := len(splits)
+       if numSplits > 1 {
+               last := splits[numSplits-1]
+               if last.End-last.Start <= tooSmall {
+                       // Last restriction is too small, so merge it with the previous one.
+                       splits[numSplits-2].End = last.End
+                       splits = splits[:numSplits-1]
+               }
+       }
+       return splits
+}
+
+// RestrictionSize returns the size of each restriction as its range.
+func (f *readBatchFn) RestrictionSize(_ PartitionedRead, rest offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+// CreateTracker creates an sdf.LockRTracker wrapping an offsetrange.Tracker for
+// each restriction.
+func (f *readBatchFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (f *readBatchFn) Teardown() {
+       f.spannerFn.Teardown()
+}
+
+func (f *readBatchFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, read PartitionedRead, emit func(beam.X)) error {
+       rest := rt.GetRestriction().(offsetrange.Restriction)
+
+       txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId)
+       iter := txn.Execute(ctx, read.Partition)
+       defer iter.Stop()
+
+       // Skip any rows that come before the restriction's start offset.
+       index := int64(0)
+       for index < rest.Start {
+               _, err := iter.Next()
+               if err == iterator.Done {
+                       break
+               } else if err != nil {
+                       return err
+               }
+               index++
+       }
+
+       for rt.TryClaim(index) {
+               index++ // Advance past the claimed offset so each row is claimed exactly once.
+               row, err := iter.Next()
+               if err == iterator.Done {
+                       break
+               } else if err != nil {
+                       return err
+               }
+
+               val := reflect.New(f.Type.T).Interface() // val : *T
+
+               if err := row.ToStruct(val); err != nil {
+                       return err
+               }
+
+               emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val)
+       }
+
+       return nil
+}
+
+// PartitionedRead holds relevant partition information to support partitioned reading from Spanner.
+type PartitionedRead struct {

Review Comment:
   We probably don't need to export this type. The fields need to be exported, but the struct itself does not.
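   
   A rough sketch of the unexported shape (the field names come from the usages above; the field types are my inference from `BatchReadOnlyTransactionFromID` and `txn.Execute`, so treat them as assumptions):
   
   ```go
   // partitionedRead stays within the package; its fields remain exported
   // so the coder can still serialize them between the partition-generating
   // and partition-reading DoFns.
   type partitionedRead struct {
           BatchTransactionId spanner.BatchReadOnlyTransactionID // Identifies the batch read-only transaction.
           Partition          *spanner.Partition                 // One root partition of the query.
   }
   ```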


