tgw-thirdfort opened a new issue, #35295:
URL: https://github.com/apache/beam/issues/35295

   ### What happened?
   
   When streaming data from PubSub, there are two broad options:
   
   1. Create a new subscription on a topic
   2. Use an existing subscription for a topic
   
   These are mutually exclusive.
   
   However, the `pubsubio` package does not enforce this exclusion. As can be seen [here](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/pubsubio/pubsubio.go#L74), if `ReadOptions` contains a non-empty `Subscription`, it is set on the existing [payload](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L3659), which already has `Topic` set. This violates the guarantee stated in the comments on `pipepb.PubSubReadPayload`: "Exactly one of topic or subscription should be set."
   
   In practice, when the pipeline runs as a Dataflow job, I observe that a new subscription is created. It seems that because `Topic` is specified, option 1 (above) wins out.
   
   If I make the following change to the `pubsubio` code, I observe the behaviour I expect when running in Dataflow (no subscription is created, except for the tracker subscription if custom timestamps are used):
   
   ```go
   if opts != nil {
   	payload.IdAttribute = opts.IDAttribute
   	payload.TimestampAttribute = opts.TimestampAttribute
   	if opts.Subscription != "" {
   		// Clear Topic so that exactly one of Topic or Subscription is set.
   		payload.Topic = ""
   		payload.Subscription = pubsubx.MakeQualifiedSubscriptionName(project, opts.Subscription)
   	}
   	payload.WithAttributes = opts.WithAttributes
   }
   ```
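
For illustration, the "exactly one of topic or subscription" rule can also be sketched in isolation. This is only a minimal sketch, not the actual `pubsubio` code: `readPayload`, `readOptions` and `buildPayload` are hypothetical stand-ins for `pipepb.PubSubReadPayload`, `pubsubio.ReadOptions` and the payload construction, and the qualified names are built by hand in the standard Pub/Sub resource name format rather than through the `pubsubx` helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// readPayload is a hypothetical stand-in for pipepb.PubSubReadPayload,
// reduced to the two fields relevant to this issue.
type readPayload struct {
	Topic        string
	Subscription string
}

// readOptions is a hypothetical stand-in for pubsubio.ReadOptions.
type readOptions struct {
	Subscription string
}

// buildPayload sets exactly one of Topic or Subscription.
// A non-empty opts.Subscription takes precedence, and Topic is left empty.
func buildPayload(project, topic string, opts *readOptions) (readPayload, error) {
	if opts != nil && opts.Subscription != "" {
		// Existing subscription requested: do not set Topic at all.
		sub := fmt.Sprintf("projects/%s/subscriptions/%s", project, opts.Subscription)
		return readPayload{Subscription: sub}, nil
	}
	if topic == "" {
		return readPayload{}, errors.New("either a topic or a subscription is required")
	}
	// No subscription given: read from the topic.
	return readPayload{Topic: fmt.Sprintf("projects/%s/topics/%s", project, topic)}, nil
}
```

Calling `buildPayload("proj", "events", &readOptions{Subscription: "sub"})` in this sketch yields a payload with only `Subscription` set, which matches the behaviour I would expect from the change above.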
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [x] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner


