Romain is correct; you would need some global reference counting here to use the close() callback. The problem is that the input subscription is a pipeline-wide resource, not a per-reader one.
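A minimal sketch of that reference-counting idea, in plain Java. The class and method names below are hypothetical, and this only works within a single JVM; in a real distributed runner the count would have to live in external storage (or the runner itself), since readers can run on different workers:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: each reader registers itself on open and
// unregisters on close; the last reader to close triggers the
// pipeline-wide cleanup (e.g. deleting the implicit subscription)
// exactly once.
class SubscriptionRefCount {
    private final AtomicInteger openReaders = new AtomicInteger(0);
    private final Runnable cleanup;

    SubscriptionRefCount(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    void readerOpened() {
        openReaders.incrementAndGet();
    }

    void readerClosed() {
        // Only the final close() runs the shared cleanup.
        if (openReaders.decrementAndGet() == 0) {
            cleanup.run();
        }
    }
}
```

Each Reader would call readerOpened() when it starts and readerClosed() from its close() implementation, so the subscription is deleted only after all N parallel readers have shut down.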
On Thu, Aug 2, 2018 at 10:07 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> On Thu, Aug 2, 2018 at 18:32, Andrew Pilloud <apill...@google.com> wrote:
>
>> The subscriptions I want to clean up are the ones that are implicitly
>> created by the PubsubIO. These subscriptions are created and then leaked;
>> they aren't reused in future pipelines, so the data-loss issues are moot
>> here. I agree that we don't want to tear down user-supplied subscriptions.
>>
>> I've been doing some more digging, and it looks like the Source.Reader
>> interface has a close() callback
>> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
>> Is that a place where I might be able to do cleanup? (It appears this is
>> hooked up to the RichFunction.close() callback on Flink and called from
>> the Direct Runner, but possibly not called from other runners.)
>
> It is called after parallelization (you can have N > 1 readers in
> parallel), so if you have some global reference counting to clean up
> once, yes; otherwise it will be hard.
>
>> Andrew
>>
>> On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Actually I think SDF is the right way to fix this. The SDF can set a
>>> timer at infinity (which will only fire when the pipeline shuts down).
>>> I believe that SDF support is being added to the portability layer now,
>>> so eventually all portable runners will support it, and maybe we can
>>> live with the status quo until then.
>>>
>>> On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau
>>> <rmannibu...@gmail.com> wrote:
>>>
>>>> I agree, Reuven. But leaking in a source doesn't give any guarantee
>>>> regarding the execution, since it will depend on the runner, and the
>>>> current API will not provide you that feature. Using a
>>>> reference-counting state can work better, but it would require an SDF
>>>> migration (and will hit runner support issues :().
>>>>
>>>> On Thu, Aug 2, 2018 at 05:39, Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Hi Romain,
>>>>>
>>>>> Andrew's example actually wouldn't work for that. With Google Cloud
>>>>> Pub/Sub (the example source he referenced), if there is no
>>>>> subscription to a topic, all publishes to that topic are dropped on
>>>>> the floor; if you don't want to lose data, you are expected to keep
>>>>> the subscription around continuously. In this example, leaking a
>>>>> subscription is probably preferable to losing data (especially since
>>>>> Pub/Sub itself garbage collects subscriptions that have been inactive
>>>>> for a long time).
>>>>>
>>>>> The answer might be that Beam does not have a good lifecycle story
>>>>> here, and something needs to be built.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau
>>>>> <rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> IIRC, sources should clean up their resources per method, since
>>>>>> they don't have a better lifecycle. Readers can create longer-lived
>>>>>> resources and release them at close time.
>>>>>>
>>>>>> On Wed, Aug 1, 2018 at 00:31, Andrew Pilloud <apill...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Some of our IOs create external resources that need to be cleaned
>>>>>>> up when a pipeline is terminated. It looks like the
>>>>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on
>>>>>>> creation, but there is no call for cleanup. For example, PubsubIO
>>>>>>> creates a Pubsub subscription in createReader()/split(), and it
>>>>>>> should be deleted at shutdown. Does anyone have ideas on how I
>>>>>>> might make this happen?
>>>>>>>
>>>>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking
>>>>>>> the Pub/Sub-specific issue.)
>>>>>>>
>>>>>>> Andrew