Hello Again!

So, the `beam.External` call is how the "built in" implementation in a runner 
gets triggered.

That ends up adding a Transform to the raw Pipeline proto [0] that has the 
PubSub Write URN [1], which is how the runner (Dataflow) knows that it should 
handle this transform itself.

As I mentioned in my other email, the best way to solve this so *all* runners 
can take advantage of it is to have a portable implementation that gets 
substituted into the proto here in the `dataflow.Execute` function [2]. We 
don't have full convenience code for this in the Go SDK at present, but most of 
the other parts are there.

In this case, instead of the FunctionSpec with the "pubsub Write" URN [1], we'd 
replace that URN with the ParDo URN, and a ParDo Payload referring to the DoFn 
being implemented [3][4], assuming it's a single DoFn. The 
coders/inputs/outputs would at least stay the same.
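
To make the swap concrete, here is a rough sketch of the substitution. It uses 
simplified stand-in structs rather than the generated proto types, and the two 
URN strings are my assumption of what [1] and the ParDo URN resolve to, so 
check them against the proto. `substitutePubSubWrite` is a hypothetical helper, 
not something in the SDK today, and the payload is placeholder bytes.

```go
package main

import "fmt"

// Stand-in types that mirror the rough shape of the Pipeline proto messages.
// They are simplified for illustration and are NOT the generated proto types.
type FunctionSpec struct {
	Urn     string
	Payload []byte
}

type PTransform struct {
	UniqueName string
	Spec       *FunctionSpec
	Inputs     map[string]string // local name -> PCollection ID
	Outputs    map[string]string // local name -> PCollection ID
}

const (
	urnPubSubWrite = "beam:transform:pubsub_write:v1" // assumed value of the URN at [1]
	urnParDo       = "beam:transform:pardo:v1"        // assumed value of the ParDo URN
)

// substitutePubSubWrite (hypothetical helper) swaps the spec of every PubSub
// Write transform for a ParDo spec carrying parDoPayload. Inputs and outputs
// are left untouched. It returns the number of transforms rewritten.
func substitutePubSubWrite(transforms map[string]*PTransform, parDoPayload []byte) int {
	n := 0
	for _, t := range transforms {
		if t.Spec == nil || t.Spec.Urn != urnPubSubWrite {
			continue
		}
		t.Spec = &FunctionSpec{Urn: urnParDo, Payload: parDoPayload}
		n++
	}
	return n
}

func main() {
	transforms := map[string]*PTransform{
		"e1": {UniqueName: "process", Spec: &FunctionSpec{Urn: urnParDo}},
		"e2": {
			UniqueName: "pubsubio.Write",
			Spec:       &FunctionSpec{Urn: urnPubSubWrite},
			Inputs:     map[string]string{"i0": "n1"},
		},
	}
	// Placeholder bytes; the real payload would be the encoded ParDo Payload
	// for the publishing DoFn.
	n := substitutePubSubWrite(transforms, []byte("encoded-dofn"))
	fmt.Println(n, transforms["e2"].Spec.Urn)
}
```

Only the spec changes; the transform keeps its ID, inputs and outputs, so the 
rest of the graph doesn't need to be rewired.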

The trick is getting the right encoded form of the DoFn to put into the 
Payload.

If it's a subgraph of several transforms, it's going to be a little more 
involved. But I don't have time *right now* to write that out.

For Dataflow, this only needs to happen when streaming = false, and only for 
the Write transform, since if there's a PubSub Read in there, it's vacuously a 
streaming pipeline.
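
As a rough sketch of that check (again with assumed URN strings, and 
`needsPortableWrite` being a hypothetical name rather than an existing SDK 
function), the decision could look something like this:

```go
package main

import "fmt"

const (
	urnPubSubRead  = "beam:transform:pubsub_read:v1"  // assumed PubSub Read URN
	urnPubSubWrite = "beam:transform:pubsub_write:v1" // assumed PubSub Write URN
)

// needsPortableWrite (hypothetical helper) reports whether the PubSub Write
// should be swapped for a portable DoFn: only when the job is not streaming,
// a PubSub Write is present, and there is no PubSub Read in the pipeline.
func needsPortableWrite(streaming bool, urns []string) bool {
	if streaming {
		return false
	}
	hasWrite := false
	for _, u := range urns {
		switch u {
		case urnPubSubRead:
			// A PubSub Read means the pipeline is a streaming pipeline.
			return false
		case urnPubSubWrite:
			hasWrite = true
		}
	}
	return hasWrite
}

func main() {
	batch := []string{"beam:transform:pardo:v1", urnPubSubWrite}
	fmt.Println(needsPortableWrite(false, batch))
	fmt.Println(needsPortableWrite(true, batch))
}
```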

Robert Burke
Beam Go Busybody

[0] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L80
[1] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L367
[2] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/dataflow/dataflow.go#L220

[3] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L581
[4] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L458

Bonus: PubsubIO link
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/pubsubio/pubsubio.go#L133

On 2023/02/17 10:15:15 Shivam Singhal wrote:
> Hi folks,
> 
> Finding it a little hard to figure out the starting point.
> If someone from the community can pair with me on this for 30-45 mins, that
> would be great.
> 
> I have read the code in the pubsubio package and drilled down into the
> `beam.External` function as well but couldn't make much sense.
> 
> Thanks,
> Shivam Singhal
> 
> On Mon, 6 Feb 2023 at 22:45, Robert Burke <[email protected]> wrote:
> 
> > One solution would be to make the substitution only at Pipeline Submission
> > time, when we know if the job is being submitted to Batch Dataflow or not,
> > and either 1. Fail with a clear error message, or 2. Substitute the
> > external transform with the batch publish DoFn.
> >
> > This is closer to how Java and Python implement their substitutions for
> > portable implementations.
> >
> > This should be authored so the other runner adapters can reuse it when
> > they don't support a "native" PubsubIO. This leads to predictable behavior
> > between runners and for the implementation.
> >
> > This is the best approach since the runner based IO is preferable to
> > trying to switch out portable DoFns. Arbitrary DoFns are a pain to single
> > out and detect, while the external approach has a known URN to key off of.
> >
> > ------
> >
> > Another approach would be to change the existing pubsubio.Sink call so
> > that it checks if the input PCollection is Unbounded or not.
> >
> > If it's Unbounded, the current approach should be used.
> >
> > If it's Bounded, then we're likely in a situation where the "runner
> > provided PubsubIO Sink" isn't going to be available, so we should use a
> > backup dofn to publish in batch situations.
> >
> > This isn't a great solution because it leads to less predictable behavior.
> > Also it runs the risk of using the batch implementation instead of a tuned
> > runner implementation in a streaming pipeline, which will likely have
> > performance consequences.
> >
> > ----
> >
> > Personally I feel Dataflow should support the sink properly in batch
> > rather than differentiating between that and streaming but I'm aware their
> > implementation precludes that at the moment, as they also don't want to
> > have multiple implementations to maintain.
> >
> > Robert Burke
> > Beam Go Busybody
> >
> > On Mon, Feb 6, 2023, 8:44 AM Ritesh Ghorse via dev <[email protected]>
> > wrote:
> >
> >> I've added a comment to the StackOverflow question
> >> <https://stackoverflow.com/questions/69651665/go-apache-beam-gcp-dataflow-could-not-find-the-sink-for-pubsub-check-that-th>.
> >> I guess the suggested approach there is the workaround for now,
> >> unfortunately. You have to create a new client in the setup function there
> >> and add a publish call
> >> <https://pkg.go.dev/cloud.google.com/go/pubsub#Topic.Publish> in the
> >> `publishBatch()`
> >>
> >> Also @shivam if you can pick that issue it would be great. Feel free to
> >> post questions/ask for help here
> >>
> >> On Mon, Feb 6, 2023 at 7:13 AM Shivam Singhal <
> >> [email protected]> wrote:
> >>
> >>> I will be picking the issue up once the maintainers have triaged the
> >>> issue.
> >>>
> >>> On Mon, 6 Feb 2023 at 17:43, Shivam Singhal <[email protected]>
> >>> wrote:
> >>>
> >>>> Not sure if there is any solution other than fixing the Go pubsubio
> >>>> package.
> >>>>
> >>>> On Mon, 6 Feb 2023 at 17:41, Ashok KS <[email protected]> wrote:
> >>>>
> >>>>> Yes, that is where I am getting stuck. I wrote the complete pipeline in
> >>>>> Python, which reads from the BQ table and publishes it as a PubSub
> >>>>> message.
> >>>>> I'm able to force it to run as a streaming application by passing
> >>>>> --streaming=True.
> >>>>> But for my project, they want it in Go, so I had to rewrite the
> >>>>> complete logic in Go.
> >>>>> I did the same, but I'm stuck at the last point of publishing it to PubSub.
> >>>>>
> >>>>> On Mon, Feb 6, 2023 at 11:07 PM Shivam Singhal <
> >>>>> [email protected]> wrote:
> >>>>>
> >>>>>> It depends on the input source: it will decide if your pipeline is a
> >>>>>> streaming or a batch pipeline.
> >>>>>>
> >>>>>> Since you are querying over a BQ table, the input is finite and, as
> >>>>>> a result, your pipeline is a batch pipeline.
> >>>>>> I am not sure there is a straightforward way where you can convert
> >>>>>> this pipeline into a streaming pipeline.
> >>>>>>
> >>>>>>
> >>>>>> On Mon, 6 Feb 2023 at 17:32, Ashok KS <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi Shivam,
> >>>>>>>
> >>>>>>> Thanks for that. How can I run the pipeline as a streaming pipeline?
> >>>>>>> In Python I could just run the pipeline by passing --streaming=True on
> >>>>>>> the command line, but I couldn’t find anything similar in Go.
> >>>>>>>
> >>>>>>> Any pointers would be appreciated.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Ashok
> >>>>>>>
> >>>>>>> On Mon, 6 Feb 2023 at 10:59 pm, Shivam Singhal <
> >>>>>>> [email protected]> wrote:
> >>>>>>>
> >>>>>>>> The issue is not yet verified by the maintainers but I think the
> >>>>>>>> pubsubio connector's Write method doesn't work in Batch pipelines.
> >>>>>>>>
> >>>>>>>> But I am pretty sure that pubsubio Write doesn't work for Batch
> >>>>>>>> Pipelines because it's mentioned in the code comments. Check the
> >>>>>>>> issue below for the details:
> >>>>>>>> https://github.com/apache/beam/issues/25326
> >>>>>>>>
> >>>>>>>> On Mon, 6 Feb 2023 at 17:26, Ashok KS <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Shivam,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your response. Yes, it is a batch pipeline. My
> >>>>>>>>> task is to read a BigQuery table, process the data and publish the
> >>>>>>>>> rows as a PubSub message.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Ashok
> >>>>>>>>>
> >>>>>>>>> On Mon, 6 Feb 2023 at 10:52 pm, Shivam Singhal <
> >>>>>>>>> [email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey Ashok KS,
> >>>>>>>>>>
> >>>>>>>>>> Is this a batch pipeline?
> >>>>>>>>>>
> >>>>>>>>>> On Mon, 6 Feb 2023 at 09:27, Ashok KS <[email protected]>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi All,
> >>>>>>>>>>>
> >>>>>>>>>>> Just sending a reminder in case anyone could help. I haven't
> >>>>>>>>>>> received any response to my issue.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Ashok
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Feb 3, 2023 at 12:23 AM Ashok KS <[email protected]>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm new to using Apache Beam with Go.
> >>>>>>>>>>>>
> >>>>>>>>>>>> pubsubio.Write(scope, "project", "topic", ppMessages)
> >>>>>>>>>>>> When I try to publish a message to a topic I get the error
> >>>>>>>>>>>> message
> >>>>>>>>>>>> "Could not find the sink for pubsub, Check that the sink
> >>>>>>>>>>>> library specifies alwayslink = 1"
> >>>>>>>>>>>>
> >>>>>>>>>>>> I found a StackOverFlow post for the same issue but it doesn't
> >>>>>>>>>>>> solve the problem.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Stackoverflow Link
> >>>>>>>>>>>> <https://stackoverflow.com/questions/69651665/go-apache-beam-gcp-dataflow-could-not-find-the-sink-for-pubsub-check-that-th>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Can someone please help?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Ashok
> >>>>>>>>>>>>
> >>>>>>>>>>>
> 
