lostluck commented on code in PR #26042: URL: https://github.com/apache/beam/pull/26042#discussion_r1194443372
########## sdks/go/pkg/beam/io/spannerio/query_options.go: ########## @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerio + +import ( + "cloud.google.com/go/spanner" + "errors" + "fmt" +) + +const ( + defaultBatching = true +) + +// queryOptions represents additional options for executing a query. +type queryOptions struct { + Batching bool `json:"batching"` // Batched reading, default is true. + MaxPartitions int64 `json:"maxPartitions"` // Maximum partitions + TimestampBound spanner.TimestampBound `json:"timestampBound"` // The TimestampBound to use for batched reading +} + +func newQueryOptions(options ...func(*queryOptions) error) queryOptions { + opts := queryOptions{ + Batching: defaultBatching, + } + + for _, opt := range options { + if err := opt(&opts); err != nil { + panic(fmt.Sprintf("spannerio.Query: invalid option: %v", err)) + } + } + + return opts +} + +// WithBatching sets whether we will use a batched reader. Batching is set to true by default, disable it when the +// underlying query is not root-partitionable. 
+func WithBatching(batching bool) func(opts *queryOptions) error { Review Comment: I recommend adding a real type for the functional options, if only for documentation purposes: `type QueryOptionFn func(*queryOptions) error` (matching the existing option signature) and use that instead of the raw type. This makes the options show up nicely in documentation. E.g., see how the options appear for `fileio.ReadOptionFn` and `fileio.MatchOptionFn` https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio ########## sdks/go/pkg/beam/io/spannerio/read_batch.go: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package spannerio + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "reflect" + + "cloud.google.com/go/spanner" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "google.golang.org/api/iterator" +) + +func init() { + register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil)) + register.Emitter1[beam.X]() +} + +type readBatchFn struct { + spannerFn + Type beam.EncodedType + Options queryOptions +} + +func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn { + return &readBatchFn{ + spannerFn: newSpannerFn(db), + Type: beam.EncodedType{T: t}, + Options: options, + } +} + +func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection { + partitions := generatePartitions(s, db, query, t, options) + + s = s.Scope("spannerio.ReadBatch") + + return beam.ParDo( + s, + newReadBatchFn(db, t, options), + partitions, + beam.TypeDefinition{Var: beam.XType, T: t}, + ) +} + +func (f *readBatchFn) Setup(ctx context.Context) error { + return f.spannerFn.Setup(ctx) +} + +// CreateInitialRestriction creates an offset range restriction representing +// the partition's size in bytes. +func (f *readBatchFn) CreateInitialRestriction(read PartitionedRead) offsetrange.Restriction { + txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId) + iter := txn.Execute(context.Background(), read.Partition) + defer iter.Stop() + + return offsetrange.Restriction{ + Start: 0, + End: iter.RowCount, + } +} + +const ( + blockSize = 10000 + tooSmall = 100 +) + +// SplitRestriction splits each file restriction into blocks of a predetermined +// size, with some checks to avoid having small remainders. 
+func (f *readBatchFn) SplitRestriction(_ PartitionedRead, rest offsetrange.Restriction) []offsetrange.Restriction { + splits := rest.SizedSplits(blockSize) Review Comment: Note that this approach also may have the same issue as textIO does with a fixed size split, see #25892. However, since Spanner has already done a bunch of splits for us, it might be moot. Not required to fix this now, but just a note. ########## sdks/go/pkg/beam/io/spannerio/read_batch.go: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spannerio + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "reflect" + + "cloud.google.com/go/spanner" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "google.golang.org/api/iterator" +) + +func init() { + register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil)) + register.Emitter1[beam.X]() +} + +type readBatchFn struct { + spannerFn + Type beam.EncodedType + Options queryOptions +} + +func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn { + return &readBatchFn{ + spannerFn: newSpannerFn(db), + Type: beam.EncodedType{T: t}, + Options: options, + } +} + +func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection { + partitions := generatePartitions(s, db, query, t, options) + + s = s.Scope("spannerio.ReadBatch") + + return beam.ParDo( + s, + newReadBatchFn(db, t, options), + partitions, + beam.TypeDefinition{Var: beam.XType, T: t}, + ) +} + +func (f *readBatchFn) Setup(ctx context.Context) error { + return f.spannerFn.Setup(ctx) +} + +// CreateInitialRestriction creates an offset range restriction representing +// the partition's size in bytes. Review Comment: ```suggestion // the number of rows in the partition. ``` ########## sdks/go/pkg/beam/io/spannerio/read_batch.go: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerio + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "reflect" + + "cloud.google.com/go/spanner" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "google.golang.org/api/iterator" +) + +func init() { + register.DoFn4x1[context.Context, *sdf.LockRTracker, PartitionedRead, func(beam.X), error]((*readBatchFn)(nil)) + register.Emitter1[beam.X]() +} + +type readBatchFn struct { + spannerFn + Type beam.EncodedType + Options queryOptions +} + +func newReadBatchFn(db string, t reflect.Type, options queryOptions) *readBatchFn { + return &readBatchFn{ + spannerFn: newSpannerFn(db), + Type: beam.EncodedType{T: t}, + Options: options, + } +} + +func readBatch(s beam.Scope, db string, query string, t reflect.Type, options queryOptions) beam.PCollection { + partitions := generatePartitions(s, db, query, t, options) + + s = s.Scope("spannerio.ReadBatch") + + return beam.ParDo( + s, + newReadBatchFn(db, t, options), + partitions, + beam.TypeDefinition{Var: beam.XType, T: t}, + ) +} + +func (f *readBatchFn) Setup(ctx context.Context) error { + return f.spannerFn.Setup(ctx) +} + +// CreateInitialRestriction creates an offset range restriction representing +// the partition's size in bytes. 
+func (f *readBatchFn) CreateInitialRestriction(read PartitionedRead) offsetrange.Restriction { + txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId) + iter := txn.Execute(context.Background(), read.Partition) + defer iter.Stop() + + return offsetrange.Restriction{ + Start: 0, + End: iter.RowCount, + } +} + +const ( + blockSize = 10000 + tooSmall = 100 +) + +// SplitRestriction splits each file restriction into blocks of a predetermined +// size, with some checks to avoid having small remainders. +func (f *readBatchFn) SplitRestriction(_ PartitionedRead, rest offsetrange.Restriction) []offsetrange.Restriction { + splits := rest.SizedSplits(blockSize) + numSplits := len(splits) + if numSplits > 1 { + last := splits[numSplits-1] + if last.End-last.Start <= tooSmall { + // Last restriction is too small, so merge it with previous one. + splits[numSplits-2].End = last.End + splits = splits[:numSplits-1] + } + } + return splits +} + +// RestrictionSize returns the size of each restriction as its range. +func (f *readBatchFn) RestrictionSize(_ PartitionedRead, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// CreateTracker creates sdf.LockRTrackers wrapping offsetRange.Trackers for +// each restriction. 
+func (f *readBatchFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +func (f *readBatchFn) Teardown() { + f.spannerFn.Teardown() +} + +func (f *readBatchFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, read PartitionedRead, emit func(beam.X)) error { + rest := rt.GetRestriction().(offsetrange.Restriction) + + txn := f.client.BatchReadOnlyTransactionFromID(read.BatchTransactionId) + iter := txn.Execute(ctx, read.Partition) + defer iter.Stop() + + index := int64(0) + for { + if index == rest.Start { + break + } + + _, err := iter.Next() + if err == iterator.Done { + break + } else if err != nil { + return err + } + } + + for rt.TryClaim(index) { + row, err := iter.Next() + if err == iterator.Done { + break + } else if err != nil { + return err + } + + val := reflect.New(f.Type.T).Interface() // val : *T + + if err := row.ToStruct(val); err != nil { + return err + } + + emit(reflect.ValueOf(val).Elem().Interface()) // emit(*val) + } + + return nil +} + +// PartitionedRead holds relevant partition information to support partitioned reading from Spanner. +type PartitionedRead struct { Review Comment: We probably don't need to export this type. The fields need to be exported, but the struct itself does not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
