Thanks for confirming Romain - also for the very fast reply!

I'll continue with the workaround and reference BEAM-3409 inline as
justification.
I'm trying to wrap this up before travel next week, but if I get a chance
I'll try and run this scenario (BEAM-3848) with your patch.
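
For the archive, here is a minimal sketch of the thread-wait part of the
workaround. It assumes the worker threads carry "direct-runner-worker" in
their names, as observed in my original message below. The class and method
names (WorkerThreads, awaitNoThreadsNamed) are hypothetical test helpers, not
part of Beam or Solr, and the timeout is added only so a stuck worker cannot
hang the test forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper; not part of Beam or Solr. */
final class WorkerThreads {
  private WorkerThreads() {}

  /** Returns true if any live thread has a name containing the given fragment. */
  static boolean namedThreadStillExists(String nameFragment) {
    // Thread.getAllStackTraces() returns a snapshot keyed by the live threads in this JVM.
    for (Thread t : Thread.getAllStackTraces().keySet()) {
      if (t.isAlive() && t.getName().contains(nameFragment)) {
        return true;
      }
    }
    return false;
  }

  /** Polls until no matching thread remains, or throws once the timeout elapses. */
  static void awaitNoThreadsNamed(String nameFragment, long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (namedThreadStillExists(nameFragment)) {
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Threads named '" + nameFragment + "' are still running");
      }
      Thread.sleep(100);
    }
  }
}
```

In the catch block this would replace the bare while loop, e.g.
WorkerThreads.awaitNoThreadsNamed("direct-runner-worker", 30, TimeUnit.SECONDS).
It is still polling, so it remains a hack rather than a real fix.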



On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Hi
>
> I have the same blocker and created
> https://github.com/apache/beam/pull/4790 and
> https://github.com/apache/beam/pull/4965 to solve part of it.
>
>
>
> On Apr 1, 2018 at 11:35, "Tim Robertson" <timrobertson...@gmail.com>
> wrote:
>
> Hi devs
>
> I'm working on SolrIO tests for failure scenarios (i.e. an exception will
> come out of the pipeline execution).  I see that the exception is surfaced
> to the driver while "direct-runner-worker" threads are still running.
> This causes issues because:
>
>   1. The Solr tests do thread leak detection, and a solrClient.close() is
> what removes the object
>   2. @Teardown is not necessarily called which is what would close the
> solrClient
>
> I can unregister all the solrClients that have been spawned. However, I
> have seen race conditions where threads are still running, creating and
> registering clients. I need to somehow ensure that all workers related
> to the pipeline execution have indeed finished, so that no new clients are
> created after the first exception is passed up.
>
> Currently I have this (pseudocode) which works, but I suspect someone can
> suggest a better approach:
>
> // store the state of clients registered for object leak check
> Set<Object> existingClients = registeredSolrClients();
> try {
>   pipeline.run();
> } catch (Pipeline.PipelineExecutionException e) {
>   // Hack: await all bundle workers completing
>   while (namedThreadStillExists("direct-runner-worker")) {
>     Thread.sleep(100);
>   }
>
>   // remove all solrClients created in this execution only
>   // since the teardown may not have done so
>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>       ObjectReleaseTracker.release(o);
>     }
>   }
>
>   // now we can do our assertions
>   expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
> }
>
>
> Please do point out the obvious if I am missing it - I am a newbie here...
>
> Thank you all very much,
> Tim
> (timrobertson...@gmail.com on the slack apache/beam channel)
>
>
>
>
