Romain is correct: you would need some global reference counting here to
use the close() callback. The problem is that the input subscription is a
pipeline-wide resource, not a per-reader resource.
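To make that concrete, here is a minimal sketch of what such pipeline-wide
reference counting could look like. All names here (SubscriptionRefCount,
readerOpened, readerClosed, deleteSubscription) are hypothetical illustrations,
not Beam API, and the actual Pub/Sub delete call is stubbed out:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a count shared by all readers of one source, so that
// only the close() of the last open reader tears down the shared subscription.
class SubscriptionRefCount {
    private static final AtomicInteger OPEN_READERS = new AtomicInteger(0);
    private static volatile boolean cleanedUp = false;

    // Called when a reader is created (e.g. from createReader()).
    static void readerOpened() {
        OPEN_READERS.incrementAndGet();
    }

    // Called from each reader's close(); only the last close triggers cleanup.
    static void readerClosed() {
        if (OPEN_READERS.decrementAndGet() == 0) {
            deleteSubscription();
        }
    }

    // Stand-in for the real Pub/Sub subscription-delete RPC.
    private static void deleteSubscription() {
        cleanedUp = true;
    }

    static boolean isCleanedUp() {
        return cleanedUp;
    }
}
```

Even with this, the count only sees readers on a single JVM; coordinating it
across workers is exactly the part the current API gives no hook for.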

On Thu, Aug 2, 2018 at 10:07 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

>
>
> Le jeu. 2 août 2018 18:32, Andrew Pilloud <apill...@google.com> a écrit :
>
>> The subscriptions I want to clean up are ones that are implicitly created
>> by the PubsubIO. These subscriptions are created and then leaked; they aren't
>> reused in future pipelines, so the data-loss issues are moot here. I agree
>> that we don't want to tear down user supplied subscriptions.
>>
>> I've been doing some more digging, it looks like the Source.Reader
>> interface has a close() callback
>> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
>> Is that a place I might be able to do cleanup? (It appears this is hooked
>> up to the RichFunction.close() callback on Flink and is called from the Direct
>> Runner, but possibly not called from other runners.)
>>
>
>
> It is after the parallelization (you can have N>1 readers in parallel), so
> if you have some global reference counting to clean up only once, yes;
> otherwise it will be hard.
>
>
>> Andrew
>>
>> On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Actually I think SDF is the right way to fix this. The SDF can set a
>>> timer at infinity (which will only fire when the pipeline shuts down). I
>>> believe that SDF support is being added to the portability layer now, so
>>> eventually all portable runners will support it, and maybe we can live with
>>> the status quo until then.
>>>
>>> On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> wrote:
>>>
>>>> I agree, Reuven. But leaking in a source doesn't give any guarantee
>>>> about execution, since that will depend on the runner, and the current API
>>>> will not provide that feature. Using reference-counting state could work
>>>> better but would require an SDF migration (and will hit runner support
>>>> issues :().
>>>>
>>>>
>>>> Le jeu. 2 août 2018 05:39, Reuven Lax <re...@google.com> a écrit :
>>>>
>>>>> Hi Romain,
>>>>>
>>>>> Andrew's example actually wouldn't work for that. With Google Cloud
>>>>> Pub/Sub (the example source he referenced), if there is no subscription to
>>>>> a topic, all publishes to that topic are dropped on the floor; if you don't
>>>>> want to lose data, you are expected to keep the subscription around
>>>>> continuously. In this example, leaking a subscription is probably
>>>>> preferable to losing data (especially since Pub/Sub itself garbage collects
>>>>> subscriptions that have been inactive for a long time).
>>>>>
>>>>> The answer might be that Beam does not have a good lifecycle story
>>>>> here, and something needs to be built.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> IIRC, sources should clean up their resources per method, since they
>>>>>> don't have a better lifecycle. Readers can create anything longer-lived
>>>>>> and release it at close() time.
>>>>>>
>>>>>>
>>>>>> Le mer. 1 août 2018 00:31, Andrew Pilloud <apill...@google.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> Some of our IOs create external resources that need to be cleaned up
>>>>>>> when a pipeline is terminated. It looks like the
>>>>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on creation, but
>>>>>>> there is no call for cleanup. For example, PubsubIO creates a
>>>>>>> Pub/Sub subscription in createReader()/split(), and it should be deleted at
>>>>>>> shutdown. Does anyone have ideas on how I might make this happen?
>>>>>>>
>>>>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking
>>>>>>> the PubSub specific issue.)
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>
