Actually, I think SDF (Splittable DoFn) is the right way to fix this. The
SDF can set a timer at infinity (which will fire only when the pipeline
shuts down). I believe that SDF support is being added to the portability
layer now, so eventually all portable runners will support it, and maybe we
can live with the status quo until then.
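To make the timer-at-infinity idea concrete, here is a toy model in plain Java. It is not the Beam API, and all class and method names in it (InfinityTimerSketch, Subscription, setTimer, advanceWatermark) are hypothetical. It assumes a simplified runner in which an event-time timer fires once the watermark reaches the timer's timestamp, so a cleanup timer set at the end of time fires only when the watermark advances to infinity, which this model treats as pipeline shutdown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model (NOT the Beam API): an event-time timer fires once the watermark
// reaches its timestamp. A timer set at END_OF_TIME therefore fires only when
// the watermark advances to "infinity", which this model treats as shutdown.
public class InfinityTimerSketch {
  static final long END_OF_TIME = Long.MAX_VALUE;

  // Hypothetical stand-in for an external resource such as a Pub/Sub subscription.
  static final class Subscription {
    private boolean deleted = false;
    void delete() { deleted = true; }
    boolean isDeleted() { return deleted; }
  }

  private static final class PendingTimer {
    final long timestamp;
    final Runnable callback;
    PendingTimer(long timestamp, Runnable callback) {
      this.timestamp = timestamp;
      this.callback = callback;
    }
  }

  private final List<PendingTimer> timers = new ArrayList<>();
  private long watermark = Long.MIN_VALUE;

  // Register a callback to run once the watermark reaches `timestamp`.
  void setTimer(long timestamp, Runnable callback) {
    timers.add(new PendingTimer(timestamp, callback));
  }

  // Advance the watermark (it never moves backwards) and fire every timer
  // whose timestamp the watermark has now reached.
  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    Iterator<PendingTimer> it = timers.iterator();
    while (it.hasNext()) {
      PendingTimer t = it.next();
      if (t.timestamp <= watermark) {
        it.remove();
        t.callback.run();
      }
    }
  }

  public static void main(String[] args) {
    InfinityTimerSketch runner = new InfinityTimerSketch();
    Subscription sub = new Subscription();
    // Schedule cleanup of the external resource at the end of time.
    runner.setTimer(END_OF_TIME, sub::delete);

    runner.advanceWatermark(1_000_000L);   // normal progress: nothing fires
    System.out.println(sub.isDeleted());   // prints "false"
    runner.advanceWatermark(END_OF_TIME);  // watermark reaches infinity: cleanup fires
    System.out.println(sub.isDeleted());   // prints "true"
  }
}
```

In Beam itself the closest existing primitive is an event-time timer on a stateful DoFn; whether a real runner ever advances the watermark to infinity can depend on how the pipeline is stopped (for example, draining versus cancelling), which this toy model does not capture.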

On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> I agree, Reuven. But leaking in a source doesn't give any guarantee
> regarding the execution, since it depends on the runner, and the current
> API does not provide that feature. Using reference-counting state could
> work better, but it would require an SDF migration (and will hit runner
> support issues :( ).
>
>
> On Thu, Aug 2, 2018 at 05:39, Reuven Lax <re...@google.com> wrote:
>
>> Hi Romain,
>>
>> Andrew's example actually wouldn't work for that. With Google Cloud
>> Pub/Sub (the example source he referenced), if there is no subscription to
>> a topic, all messages published to that topic are dropped on the floor; if
>> you don't want to lose data, you are expected to keep the subscription
>> around continuously. In this example, leaking a subscription is probably
>> preferable to losing data (especially since Pub/Sub itself garbage collects
>> subscriptions that have been inactive for a long time).
>>
>> The answer might be that Beam does not have a good lifecycle story here,
>> and something needs to be built.
>>
>> Reuven
>>
>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> IIRC, sources should clean up their resources within each method call,
>>> since they don't have a better lifecycle. Readers can create longer-lived
>>> resources and release them at close time.
>>>
>>>
>>> On Wed, Aug 1, 2018 at 00:31, Andrew Pilloud <apill...@google.com>
>>> wrote:
>>>
>>>> Some of our IOs create external resources that need to be cleaned up
>>>> when a pipeline is terminated. It looks like the
>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on creation, but
>>>> there is no call for cleanup. For example, PubsubIO creates a Pubsub
>>>> subscription in createReader()/split(), and it should be deleted at shutdown.
>>>> Does anyone have ideas on how I might make this happen?
>>>>
>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking the
>>>> PubSub specific issue.)
>>>>
>>>> Andrew
>>>>
>>>
