lostluck commented on code in PR #24663:
URL: https://github.com/apache/beam/pull/24663#discussion_r1057989202


##########
sdks/go/pkg/beam/io/mongodbio/read_option.go:
##########
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+       "errors"
+
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+// ReadOption represents options for reading from MongoDB.
+type ReadOption struct {
+       BucketAuto bool
+       Filter     bson.M
+       BundleSize int64
+}
+
+// ReadOptionFn is a function that configures a ReadOption.
+type ReadOptionFn func(option *ReadOption) error
+
+// WithReadBucketAuto configures the ReadOption whether to use the bucketAuto aggregation stage.
+func WithReadBucketAuto(bucketAuto bool) ReadOptionFn {
+       return func(o *ReadOption) error {
+               o.BucketAuto = bucketAuto
+               return nil
+       }
+}
+
+// WithReadFilter configures the ReadOption to use the provided filter.
+func WithReadFilter(filter bson.M) ReadOptionFn {

Review Comment:
   If that's convenient enough for users, yes. That guideline is aimed at helping 
Cross Language use of transforms. Since we don't have a design yet for that in 
the Go SDK, it's not a hard restriction yet, and may not be appropriate at this 
time. I would say that for now, use what the MongoDB Go libraries use, since 
that will be familiar to existing users.
   
   We know DoFns will generally be configured by a single struct that is a Beam 
Schema Row, and that ideally the element types are Beam Schema Rows themselves 
(each generally represented by a single struct). The Go SDK infers a Row schema 
from a struct's exported fields.
   
   Translating/wrapping bson.M's to Beam Schema structs would be its own 
problem, for a later time, when the Go SDK can start exporting its transforms 
to Python and Java.
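   
   For illustration, the kind of schema-friendly config struct I mean might look 
roughly like this (a sketch only; the names and the extended-JSON filter field 
are hypothetical, not something this PR needs):
   
```go
package mongodbio

import "go.mongodb.org/mongo-driver/bson"

// readConfig is a hypothetical single config struct. Only exported fields
// take part in the inferred Beam Schema Row, so the types stay concrete
// and encodable across SDKs.
type readConfig struct {
	URI        string
	Database   string
	Collection string
	// FilterJSON carries the filter as MongoDB extended JSON text instead
	// of a bson.M, keeping the schema free of interface-typed values.
	FilterJSON string
}

// filter rebuilds the bson.M form on the worker when it's actually needed.
func (c readConfig) filter() (bson.M, error) {
	if c.FilterJSON == "" {
		return nil, nil
	}
	var m bson.M
	if err := bson.UnmarshalExtJSON([]byte(c.FilterJSON), false, &m); err != nil {
		return nil, err
	}
	return m, nil
}
```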



##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+       "go.mongodb.org/mongo-driver/bson"
+       "go.mongodb.org/mongo-driver/mongo"
+       "go.mongodb.org/mongo-driver/mongo/options"
+       "go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
+const (
+       defaultReadBundleSize = 64 * 1024 * 1024
+
+       minSplitVectorChunkSize = 1024 * 1024
+       maxSplitVectorChunkSize = 1024 * 1024 * 1024
+
+       maxBucketCount = math.MaxInt32
+)
+
+func init() {
+       register.DoFn3x1[context.Context, []byte, func(bson.M), error](&bucketAutoFn{})
+       register.DoFn3x1[context.Context, []byte, func(bson.M), error](&splitVectorFn{})
+       register.Emitter1[bson.M]()
+
+       register.DoFn3x1[context.Context, bson.M, func(beam.Y), error](&readFn{})
+       register.Emitter1[beam.Y]()
+}
+
+// Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a
+// struct with exported fields that should have a "bson" tag. By default, the transform uses the
+// MongoDB internal splitVector command to split the collection into bundles. The transform can be
+// configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas
+// where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn
+// WithReadBucketAuto(true).
+//
+// The Read transform has the required parameters:
+//     - s: the scope of the pipeline
+//     - uri: the MongoDB connection string
+//     - database: the MongoDB database to read from
+//     - collection: the MongoDB collection to read from
+//     - t: the type of the elements in the collection
+//
+// The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields:
+//     - BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles.
+//       Defaults to false
+//     - Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil,
+//       which means no filter is applied
+//     - BundleSize: the size in bytes to bundle the documents into when reading. Defaults to
+//       64 * 1024 * 1024 (64 MB)
+func Read(
+       s beam.Scope,
+       uri string,
+       database string,
+       collection string,
+       t reflect.Type,
+       opts ...ReadOptionFn,
+) beam.PCollection {
+       s = s.Scope("mongodbio.Read")
+
+       option := &ReadOption{
+               BundleSize: defaultReadBundleSize,
+       }
+
+       for _, opt := range opts {
+               if err := opt(option); err != nil {
+                       panic(fmt.Sprintf("mongodbio.Read: invalid option: %v", err))
+               }
+       }
+
+       imp := beam.Impulse(s)
+
+       var bundled beam.PCollection
+
+       if option.BucketAuto {
+               bundled = beam.ParDo(s, newBucketAutoFn(uri, database, collection, option), imp)
+       } else {
+               bundled = beam.ParDo(s, newSplitVectorFn(uri, database, collection, option), imp)
+       }
+
+       return beam.ParDo(
+               s,
+               newReadFn(uri, database, collection, t, option),
+               bundled,
+               beam.TypeDefinition{Var: beam.YType, T: t},
+       )
+}
+
+type bucketAutoFn struct {
+       mongoDBFn
+       BundleSize int64
+}
+
+func newBucketAutoFn(
+       uri string,
+       database string,
+       collection string,
+       option *ReadOption,
+) *bucketAutoFn {
+       return &bucketAutoFn{
+               mongoDBFn: mongoDBFn{
+                       URI:        uri,
+                       Database:   database,
+                       Collection: collection,
+               },
+               BundleSize: option.BundleSize,
+       }
+}
+
+func (fn *bucketAutoFn) ProcessElement(
+       ctx context.Context,
+       _ []byte,
+       emit func(bson.M),
+) error {
+       collectionSize, err := fn.getCollectionSize(ctx)
+       if err != nil {
+               return err
+       }
+
+       if collectionSize == 0 {
+               return nil
+       }
+
+       bucketCount := calculateBucketCount(collectionSize, fn.BundleSize)
+
+       buckets, err := fn.getBuckets(ctx, bucketCount)
+       if err != nil {
+               return err
+       }
+
+       idFilters := idFiltersFromBuckets(buckets)
+
+       for _, filter := range idFilters {
+               emit(filter)
+       }
+
+       return nil
+}
+
+type collStats struct {
+       Size int64 `bson:"size"`
+}
+
+func (fn *bucketAutoFn) getCollectionSize(ctx context.Context) (int64, error) {
+       cmd := bson.M{"collStats": fn.Collection}
+       opts := options.RunCmd().SetReadPreference(readpref.Primary())
+
+       var stats collStats
+       if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&stats); err != nil {
+               return 0, fmt.Errorf("error executing collStats command: %w", err)
+       }
+
+       return stats.Size, nil
+}
+
+func calculateBucketCount(collectionSize int64, bundleSize int64) int32 {
+       if bundleSize < 0 {
+               panic("mongodbio.calculateBucketCount: bundle size must be greater than 0")
+       }
+
+       count := collectionSize / bundleSize
+       if collectionSize%bundleSize != 0 {
+               count++
+       }
+
+       if count > int64(maxBucketCount) {
+               count = maxBucketCount
+       }
+
+       return int32(count)
+}
+
+type bucket struct {
+       ID minMax `bson:"_id"`
+}
+
+type minMax struct {
+       Min any `bson:"min"`
+       Max any `bson:"max"`
+}
+
+func (fn *bucketAutoFn) getBuckets(ctx context.Context, count int32) ([]bucket, error) {
+       pipeline := mongo.Pipeline{bson.D{{
+               Key: "$bucketAuto",
+               Value: bson.M{
+                       "groupBy": "$_id",
+                       "buckets": count,
+               },
+       }}}
+
+       cursor, err := fn.collection.Aggregate(ctx, pipeline)
+       if err != nil {
+               return nil, fmt.Errorf("error executing bucketAuto aggregation: %w", err)
+       }
+
+       var buckets []bucket
+       if err = cursor.All(ctx, &buckets); err != nil {
+               return nil, fmt.Errorf("error decoding buckets: %w", err)
+       }
+
+       return buckets, nil
+}
+
+func idFiltersFromBuckets(buckets []bucket) []bson.M {
+       idFilters := make([]bson.M, len(buckets))
+
+       for i := 0; i < len(buckets); i++ {
+               filter := bson.M{}
+
+               if i != 0 {
+                       filter["$gt"] = buckets[i].ID.Min
+               }
+
+               if i != len(buckets)-1 {
+                       filter["$lte"] = buckets[i].ID.Max
+               }
+
+               if len(filter) == 0 {
+                       idFilters[i] = filter
+               } else {
+                       idFilters[i] = bson.M{"_id": filter}
+               }
+       }
+
+       return idFilters
+}
+
+type splitVectorFn struct {
+       mongoDBFn
+       BundleSize int64
+}
+
+func newSplitVectorFn(
+       uri string,
+       database string,
+       collection string,
+       option *ReadOption,
+) *splitVectorFn {
+       return &splitVectorFn{
+               mongoDBFn: mongoDBFn{
+                       URI:        uri,
+                       Database:   database,
+                       Collection: collection,
+               },
+               BundleSize: option.BundleSize,
+       }
+}
+
+func (fn *splitVectorFn) ProcessElement(
+       ctx context.Context,
+       _ []byte,
+       emit func(bson.M),
+) error {
+       chunkSize := getChunkSize(fn.BundleSize)
+
+       splitKeys, err := fn.getSplitKeys(ctx, chunkSize)
+       if err != nil {
+               return err
+       }
+
+       idFilters := idFiltersFromSplits(splitKeys)
+
+       for _, filter := range idFilters {
+               emit(filter)
+       }
+
+       return nil
+}
+
+func getChunkSize(bundleSize int64) int64 {
+       var chunkSize int64
+
+       if bundleSize < minSplitVectorChunkSize {
+               chunkSize = minSplitVectorChunkSize
+       } else if bundleSize > maxSplitVectorChunkSize {
+               chunkSize = maxSplitVectorChunkSize
+       } else {
+               chunkSize = bundleSize
+       }
+
+       return chunkSize
+}
+
+type splitVector struct {
+       SplitKeys []splitKey `bson:"splitKeys"`
+}
+
+type splitKey struct {
+       ID any `bson:"_id"`
+}
+
+func (fn *splitVectorFn) getSplitKeys(ctx context.Context, chunkSize int64) ([]splitKey, error) {
+       cmd := bson.D{
+               {Key: "splitVector", Value: fmt.Sprintf("%s.%s", fn.Database, fn.Collection)},
+               {Key: "keyPattern", Value: bson.D{{Key: "_id", Value: 1}}},
+               {Key: "maxChunkSizeBytes", Value: chunkSize},
+       }
+
+       opts := options.RunCmd().SetReadPreference(readpref.Primary())
+
+       var vector splitVector
+       if err := fn.collection.Database().RunCommand(ctx, cmd, opts).Decode(&vector); err != nil {
+               return nil, fmt.Errorf("error executing splitVector command: %w", err)
+       }
+
+       return vector.SplitKeys, nil
+}
+
+func idFiltersFromSplits(splitKeys []splitKey) []bson.M {
+       idFilters := make([]bson.M, len(splitKeys)+1)
+
+       for i := 0; i < len(splitKeys)+1; i++ {
+               filter := bson.M{}
+
+               if i > 0 {
+                       filter["$gt"] = splitKeys[i-1].ID
+               }
+
+               if i < len(splitKeys) {
+                       filter["$lte"] = splitKeys[i].ID
+               }
+
+               if len(filter) == 0 {
+                       idFilters[i] = filter
+               } else {
+                       idFilters[i] = bson.M{"_id": filter}
+               }
+       }
+
+       return idFilters
+}
+
+type readFn struct {
+       mongoDBFn
+       Filter []byte

Review Comment:
   Unfortunately, not. This is the current correct/best way. `interface{}/any` 
are unencodable by nature, and at decode time, concrete runtime types would be 
lost if they were simply JSON or BSON encoded by the framework. The framework 
works best with concrete types.
   
   Ideally it should do the decoding in a `Setup()` method instead of 
`ProcessElement()`, and store the values in an unexported field of type 
`bson.M`. Setup will be called once during the instance's lifetime, while 
ProcessElement is called for every element. Though in practice, since these are 
kicked off with `beam.Impulse()`, those are the same.
   
   Where the `Setup` approach shows value is when the connection/state 
information for the reads (or the filters, etc) is passed in as elements into 
the transform, rather than as static configuration. That way, for example, the 
results from several queries can be emitted from the single transform. This 
would enable longer lived clients and so on.
   
   Certainly not required, and I'm not familiar enough to make the design 
decision. It's possible to migrate to that at a later moment anyway, if it 
seems useful, or simply have a different entry point.
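   
   As a rough sketch of that shape (field names are illustrative; the real 
readFn carries more configuration than this):
   
```go
package mongodbio

import "go.mongodb.org/mongo-driver/bson"

// readFn sketch: Filter travels as BSON-encoded bytes so the framework can
// encode the DoFn; the decoded bson.M lives in an unexported, worker-local field.
type readFn struct {
	Filter []byte

	filter bson.M
}

// Setup runs once per DoFn instance, so the decode cost isn't paid per element.
func (fn *readFn) Setup() error {
	if len(fn.Filter) == 0 {
		return nil
	}
	return bson.Unmarshal(fn.Filter, &fn.filter)
}
```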



##########
sdks/go/pkg/beam/io/mongodbio/write.go:
##########
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+       "go.mongodb.org/mongo-driver/bson"
+       "go.mongodb.org/mongo-driver/bson/primitive"
+       "go.mongodb.org/mongo-driver/mongo"
+       "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+const (
+       defaultWriteBatchSize = 1000
+       defaultWriteOrdered   = true
+)
+
+func init() {
+       register.Function1x2(createIDFn)
+       register.Emitter2[primitive.ObjectID, beam.Y]()
+
+       register.DoFn3x0[context.Context, beam.Y, func(beam.X, beam.Y)](
+               &extractIDFn{},
+       )
+       register.Emitter2[beam.X, beam.Y]()
+
+       register.DoFn4x1[context.Context, beam.X, beam.Y, func(beam.X), error](
+               &writeFn{},
+       )
+       register.Emitter1[primitive.ObjectID]()
+}
+
+// Write writes a PCollection<T> of a type T to MongoDB. T must be a struct with exported fields
+// that should have a "bson" tag. If the struct has a field with the bson tag "_id", the value of
+// that field will be used as the id of the document. Otherwise, a new id field of type
+// primitive.ObjectID will be generated for each document. Write returns a PCollection<K> of the
+// inserted id values with type K.
+//
+// The Write transform has the required parameters:
+//     - s: the scope of the pipeline
+//     - uri: the MongoDB connection string
+//     - database: the MongoDB database to write to
+//     - collection: the MongoDB collection to write to
+//     - col: the PCollection to write to MongoDB
+//
+// The Write transform takes a variadic number of WriteOptionFn which can set the WriteOption
+// fields:
+//     - BatchSize: the number of documents to write in a single batch. Defaults to 1000
+//     - Ordered: whether to execute the writes in order. Defaults to true
+func Write(
+       s beam.Scope,
+       uri string,
+       database string,
+       collection string,
+       col beam.PCollection,
+       opts ...WriteOptionFn,
+) beam.PCollection {
+       s = s.Scope("mongodbio.Write")
+
+       option := &WriteOption{
+               BatchSize: defaultWriteBatchSize,
+               Ordered:   defaultWriteOrdered,
+       }
+
+       for _, opt := range opts {
+               if err := opt(option); err != nil {
+                       panic(fmt.Sprintf("mongodbio.Write: invalid option: %v", err))
+               }
+       }
+
+       t := col.Type().Type()
+       idIndex := structx.FieldIndexByTag(t, bsonTag, "_id")
+
+       var keyed beam.PCollection
+
+       if idIndex == -1 {
+               pre := beam.ParDo(s, createIDFn, col)
+               keyed = beam.Reshuffle(s, pre)
+       } else {
+               keyed = beam.ParDo(
+                       s,
+                       newExtractIDFn(idIndex),
+                       col,
+                       beam.TypeDefinition{Var: beam.XType, T: t.Field(idIndex).Type},
+               )
+       }
+
+       return beam.ParDo(
+               s,
+               newWriteFn(uri, database, collection, option),
+               keyed,
+       )
+}
+
+func createIDFn(elem beam.Y) (primitive.ObjectID, beam.Y) {
+       id := primitive.NewObjectID()
+       return id, elem
+}
+
+type extractIDFn struct {
+       IDIndex int
+}
+
+func newExtractIDFn(idIndex int) *extractIDFn {
+       return &extractIDFn{
+               IDIndex: idIndex,
+       }
+}
+
+func (fn *extractIDFn) ProcessElement(
+       _ context.Context,
+       elem beam.Y,
+       emit func(beam.X, beam.Y),
+) {
+       id := reflect.ValueOf(elem).Field(fn.IDIndex).Interface()
+       emit(id, elem)
+}
+
+type writeFn struct {
+       mongoDBFn
+       BatchSize int64
+       Ordered   bool
+       models    []mongo.WriteModel
+}
+
+func newWriteFn(
+       uri string,
+       database string,
+       collection string,
+       option *WriteOption,
+) *writeFn {
+       return &writeFn{
+               mongoDBFn: mongoDBFn{
+                       URI:        uri,
+                       Database:   database,
+                       Collection: collection,
+               },
+               BatchSize: option.BatchSize,
+               Ordered:   option.Ordered,
+       }
+}
+
+func (fn *writeFn) ProcessElement(
+       ctx context.Context,
+       key beam.X,
+       value beam.Y,
+       emit func(beam.X),
+) error {
+       model := mongo.NewReplaceOneModel().
+               SetFilter(bson.M{"_id": key}).
+               SetUpsert(true).
+               SetReplacement(value)
+
+       fn.models = append(fn.models, model)
+
+       if len(fn.models) >= int(fn.BatchSize) {
+               if err := fn.flush(ctx); err != nil {
+                       return err
+               }
+       }
+
+       emit(key)

Review Comment:
   Regarding exposing 3rd-party libraries, the concern is typically about the 
user surface. So internal to a composite (i.e. all the DoFns + PCollections 
being put together in the pipeline when users call `Read` or `Write`), that's 
fine. The only concern is ensuring the type is encodable (as all elements in 
PCollections must be).
   
   Un-encodable types can slip through on direct runners, or runners with 
aggressive fusion, but that shouldn't be taken as a given for execution.
   
   In particular, we don't want DoFns to aggressively serialize things 
themselves to pass between other DoFns, since that's inefficient; avoiding 
that inefficiency is why fusion exists as an optimization, and why Beam is very 
Coder Aware.
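   
   As a sketch of the shape to aim for (names here are invented, not from this 
PR): intermediate PCollections carry concrete element types the SDK can infer 
coders for, and nothing is hand-marshaled between the DoFns.
   
```go
package mongodbio

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// idRange is a concrete element type with exported fields, so the Go SDK can
// infer a coder for it if fusion breaks; no manual JSON/BSON between DoFns.
type idRange struct {
	MinID string
	MaxID string
}

type splitFn struct{}

func (fn *splitFn) ProcessElement(_ []byte, emit func(idRange)) {
	// Illustrative split; the real transform derives these from Mongo.
	emit(idRange{MinID: "", MaxID: "m"})
	emit(idRange{MinID: "m", MaxID: ""})
}

type describeFn struct{}

func (fn *describeFn) ProcessElement(r idRange, emit func(string)) {
	emit(r.MinID + ".." + r.MaxID)
}

func init() {
	register.DoFn2x0[[]byte, func(idRange)](&splitFn{})
	register.DoFn2x0[idRange, func(string)](&describeFn{})
	register.Emitter1[idRange]()
	register.Emitter1[string]()
}

// expand shows the composite shape: third-party client types stay inside the
// DoFns, while the PCollections carry only encodable element types.
func expand(s beam.Scope, imp beam.PCollection) beam.PCollection {
	ranges := beam.ParDo(s, &splitFn{}, imp)    // PCollection<idRange>
	return beam.ParDo(s, &describeFn{}, ranges) // PCollection<string>
}
```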



##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/structx"
+       "go.mongodb.org/mongo-driver/bson"
+       "go.mongodb.org/mongo-driver/mongo"
+       "go.mongodb.org/mongo-driver/mongo/options"
+       "go.mongodb.org/mongo-driver/mongo/readpref"
+)
+
+const (
+       defaultReadBundleSize = 64 * 1024 * 1024
+
+       minSplitVectorChunkSize = 1024 * 1024
+       maxSplitVectorChunkSize = 1024 * 1024 * 1024
+
+       maxBucketCount = math.MaxInt32
+)
+
+func init() {
+       register.DoFn3x1[context.Context, []byte, func(bson.M), error](&bucketAutoFn{})
+       register.DoFn3x1[context.Context, []byte, func(bson.M), error](&splitVectorFn{})
+       register.Emitter1[bson.M]()
+
+       register.DoFn3x1[context.Context, bson.M, func(beam.Y), error](&readFn{})
+       register.Emitter1[beam.Y]()
+}
+
+// Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a
+// struct with exported fields that should have a "bson" tag. By default, the transform uses the
+// MongoDB internal splitVector command to split the collection into bundles. The transform can be
+// configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas
+// where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn
+// WithReadBucketAuto(true).
+//
+// The Read transform has the required parameters:
+//     - s: the scope of the pipeline
+//     - uri: the MongoDB connection string
+//     - database: the MongoDB database to read from
+//     - collection: the MongoDB collection to read from
+//     - t: the type of the elements in the collection
+//
+// The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields:
+//     - BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles.
+//       Defaults to false
+//     - Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil,
+//       which means no filter is applied
+//     - BundleSize: the size in bytes to bundle the documents into when reading. Defaults to
+//       64 * 1024 * 1024 (64 MB)
+func Read(
+       s beam.Scope,
+       uri string,
+       database string,
+       collection string,
+       t reflect.Type,
+       opts ...ReadOptionFn,
+) beam.PCollection {
+       s = s.Scope("mongodbio.Read")
+
+       option := &ReadOption{
+               BundleSize: defaultReadBundleSize,
+       }
+
+       for _, opt := range opts {
+               if err := opt(option); err != nil {
+                       panic(fmt.Sprintf("mongodbio.Read: invalid option: %v", err))
+               }
+       }
+
+       imp := beam.Impulse(s)
+
+       var bundled beam.PCollection
+
+       if option.BucketAuto {
+               bundled = beam.ParDo(s, newBucketAutoFn(uri, database, collection, option), imp)
+       } else {
+               bundled = beam.ParDo(s, newSplitVectorFn(uri, database, collection, option), imp)

Review Comment:
   I assume these are dividing the Reads into separate chunks for smaller reads?
   
   You could improve scalability and possibly get dynamic scaling (on Dataflow) 
if these were made into the SDF format, generally by pairing the element (in 
this case, "the concept of a specific MongoDB database collection") with a 
restriction of some kind (as you've got it). The key bit is that restrictions 
technically don't need to be subdividable or numeric; it's just easier when 
they map cleanly onto something that is.
   
   Definitely not a blocker for this PR though. 
https://beam.apache.org/documentation/programming-guide/#splittable-dofns has 
all the information on them.  
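   
   To make the SDF shape concrete, here's a hypothetical sketch (not a 
suggestion to adopt in this PR) that pairs the collection with an offset-range 
restriction over bucket indices; `totalBuckets` and `readBucket` stand in for 
the Mongo-specific pieces:
   
```go
package mongodbio

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// sdfReadFn is a hypothetical splittable DoFn: the element is the collection
// to read, and the restriction is a range of bucket indices that a runner may
// split (and, on Dataflow, rebalance) dynamically.
type sdfReadFn struct{}

// CreateInitialRestriction covers every bucket of the collection.
func (fn *sdfReadFn) CreateInitialRestriction(collection string) offsetrange.Restriction {
	return offsetrange.Restriction{Start: 0, End: int64(totalBuckets(collection))}
}

// SplitRestriction seeds the initial parallelism, e.g. one bucket per restriction.
func (fn *sdfReadFn) SplitRestriction(collection string, rest offsetrange.Restriction) []offsetrange.Restriction {
	return rest.EvenSplits(rest.End - rest.Start)
}

// RestrictionSize lets the runner weigh restrictions for rebalancing.
func (fn *sdfReadFn) RestrictionSize(collection string, rest offsetrange.Restriction) float64 {
	return rest.Size()
}

// CreateTracker wraps the restriction in a thread-safe tracker.
func (fn *sdfReadFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

// ProcessElement claims bucket indices one at a time; unclaimed work can be
// handed back to the runner as a split.
func (fn *sdfReadFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, collection string, emit func(string)) error {
	for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i); i++ {
		// Read the documents that fall into bucket i and emit them.
		emit(readBucket(ctx, collection, i))
	}
	return nil
}

// totalBuckets and readBucket are placeholders for the Mongo-specific logic.
func totalBuckets(collection string) int                      { return 1 }
func readBucket(_ context.Context, c string, i int64) string  { return c }
```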



##########
sdks/go/test/integration/io/mongodbio/mongodbio_test.go:
##########
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mongodbio

Review Comment:
   Ack. Responding based on the latest information you provided. It's 
interesting that the coder handling with and without registration is so 
different, and causes such problems in the runners. That's very useful to know.
   
   WRT tests and their support, not all runners support all beam features, and 
the Go SDK is increasingly encroaching on features not supported by 
Flink/Spark/Samza/PythonPortableULR. As a rule, if possible, the test should be 
able to run successfully on Dataflow, and if that's not possible, it should run 
on Flink instead.
   
   At the moment, the only fully featured language-specific runner is the Java 
Direct runner, but we can't use that. I'm presently working on a replacement 
for the Go Direct runner that will be portable, and ideally have 
simpler-to-understand support for what the runner is doing under the hood for 
given features. See #24789 for some more info.
   I hope to get it into the main repo in the next month or two, after I've got 
the factoring worked out to make it easier to contribute to and maintain.
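   
   For anyone following along, the explicit registration being discussed looks 
roughly like this generic sketch (the element type and DoFn here are made up, 
not taken from this PR):
   
```go
package mongodbio

import (
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// document is a stand-in element type for this sketch.
type document struct {
	ID   string `bson:"_id"`
	Name string `bson:"name"`
}

type emitDocFn struct{}

func (fn *emitDocFn) ProcessElement(_ []byte, emit func(document)) {
	emit(document{ID: "1", Name: "example"})
}

func init() {
	// Registering the type and the DoFn up front gives every runner the same
	// coder and avoids the slower reflection-based fallback paths.
	beam.RegisterType(reflect.TypeOf((*document)(nil)).Elem())
	register.DoFn2x0[[]byte, func(document)](&emitDocFn{})
	register.Emitter1[document]()
}
```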



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
