One solution would be to make the substitution only at Pipeline Submission
time, when we know whether the job is being submitted to Batch Dataflow or
not, and either 1. fail with a clear error message, or 2. substitute the
external transform with a batch publish DoFn.

This is closer to how Java and Python implement their substitutions for
portable implementations.

This should be authored so that other runner adapters can reuse it when
they don't support a "native" PubsubIO, leading to predictable behavior
across runners and a single implementation to maintain.

This is the best approach, since the runner-based IO is preferable to
trying to switch out portable DoFns. Arbitrary DoFns are a pain to single
out and detect, while the external transform approach has a known URN to
key off of.

------

Another approach would be to change the existing pubsubio.Sink call so
that it checks whether the input PCollection is Unbounded.

If it's Unbounded, the current approach should be used.

If it's Bounded, then we're likely in a situation where the "runner
provided PubsubIO Sink" isn't going to be available, so we should fall
back to a DoFn that publishes in batch situations.

This isn't a great solution because it leads to less predictable behavior.
It also runs the risk of using the batch implementation instead of a tuned
runner implementation in a streaming pipeline, which would likely have
performance consequences.

----

Personally, I feel Dataflow should support the sink properly in batch
rather than differentiating between batch and streaming, but I'm aware
their implementation precludes that at the moment, and they also don't
want multiple implementations to maintain.

Robert Burke
Beam Go Busybody

On Mon, Feb 6, 2023, 8:44 AM Ritesh Ghorse via dev <[email protected]>
wrote:

> I've added a comment to the StackOverflow question
> <https://stackoverflow.com/questions/69651665/go-apache-beam-gcp-dataflow-could-not-find-the-sink-for-pubsub-check-that-th>.
> I guess the suggested approach there is the workaround for now,
> unfortunately. You have to create a new client in the setup function there
> and add a publish call
> <https://pkg.go.dev/cloud.google.com/go/pubsub#Topic.Publish> in
> `publishBatch()`.
>
> Also @shivam if you can pick that issue it would be great. Feel free to
> post questions/ask for help here
>
> On Mon, Feb 6, 2023 at 7:13 AM Shivam Singhal <[email protected]>
> wrote:
>
>> I will be picking the issue up once the maintainers have triaged it.
>>
>> On Mon, 6 Feb 2023 at 17:43, Shivam Singhal <[email protected]>
>> wrote:
>>
>>> Not sure if there is any solution other than fixing the Go pubsubio
>>> package.
>>>
>>> On Mon, 6 Feb 2023 at 17:41, Ashok KS <[email protected]> wrote:
>>>
>>>> Yes, that is where I am getting stuck. I wrote the complete pipeline in
>>>> Python, which reads from the BQ table and publishes the rows as PubSub
>>>> messages. I'm able to force it to run as a streaming application by
>>>> passing --streaming=True. But for my project they want it in Go, so I had
>>>> to rewrite the complete logic in Go.
>>>> I did the same, but got stuck at the last step: publishing to PubSub.
>>>>
>>>> On Mon, Feb 6, 2023 at 11:07 PM Shivam Singhal <
>>>> [email protected]> wrote:
>>>>
>>>>> It depends on the input source: that decides whether your pipeline is a
>>>>> streaming or a batch pipeline.
>>>>>
>>>>> Since you are querying over a BQ table, the input is finite and, as a
>>>>> result, your pipeline is a batch pipeline.
>>>>> I am not sure there is a straightforward way to convert this pipeline
>>>>> into a streaming pipeline.
>>>>>
>>>>>
>>>>> On Mon, 6 Feb 2023 at 17:32, Ashok KS <[email protected]> wrote:
>>>>>
>>>>>> Hi Shivam,
>>>>>>
>>>>>> Thanks for that. How can I run the pipeline as a streaming pipeline?
>>>>>> In Python I could just run the pipeline by passing --streaming=True on
>>>>>> the command line, but I couldn't find anything similar in Go.
>>>>>>
>>>>>> Any pointers would be appreciated.
>>>>>>
>>>>>> Regards,
>>>>>> Ashok
>>>>>>
>>>>>> On Mon, 6 Feb 2023 at 10:59 pm, Shivam Singhal <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> The issue is not yet verified by the maintainers, but I am pretty sure
>>>>>>> the pubsubio connector's Write method doesn't work in Batch pipelines,
>>>>>>> because it's mentioned in the code comments. Check the issue below for
>>>>>>> the details:
>>>>>>> https://github.com/apache/beam/issues/25326
>>>>>>>
>>>>>>> On Mon, 6 Feb 2023 at 17:26, Ashok KS <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Shivam,
>>>>>>>>
>>>>>>>> Thanks a lot for your response. Yes, it is a batch pipeline. My task
>>>>>>>> is to read a BigQuery table, process the data, and publish the rows as
>>>>>>>> PubSub messages.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Ashok
>>>>>>>>
>>>>>>>> On Mon, 6 Feb 2023 at 10:52 pm, Shivam Singhal <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey Ashok KS,
>>>>>>>>>
>>>>>>>>> Is this a batch pipeline?
>>>>>>>>>
>>>>>>>>> On Mon, 6 Feb 2023 at 09:27, Ashok KS <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> Just sending a reminder in case anyone could help. I haven't
>>>>>>>>>> received any response to my issue.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Ashok
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 3, 2023 at 12:23 AM Ashok KS <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I'm new to using Apache Beam with Go.
>>>>>>>>>>>
>>>>>>>>>>> When I try to publish a message to a topic with
>>>>>>>>>>>
>>>>>>>>>>> pubsubio.Write(scope, "project", "topic", ppMessages)
>>>>>>>>>>>
>>>>>>>>>>> I get the error message
>>>>>>>>>>> "Could not find the sink for pubsub, Check that the sink library
>>>>>>>>>>> specifies alwayslink = 1"
>>>>>>>>>>>
>>>>>>>>>>> I found a StackOverflow post for the same issue but it doesn't
>>>>>>>>>>> solve the problem.
>>>>>>>>>>>
>>>>>>>>>>> Stackoverflow Link
>>>>>>>>>>> <https://stackoverflow.com/questions/69651665/go-apache-beam-gcp-dataflow-could-not-find-the-sink-for-pubsub-check-that-th>
>>>>>>>>>>>
>>>>>>>>>>> Can someone please help?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Ashok
>>>>>>>>>>>
>>>>>>>>>>
