Correct - teardown is currently run in the direct runner, but asynchronously. I believe Romain's pending PRs should solve this for your use case.
On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <[email protected]> wrote:

> Thanks for confirming Romain - also for the very fast reply!
>
> I'll continue with the workaround and reference BEAM-3409 inline as
> justification.
> I'm trying to wrap this up before travel next week, but if I get a chance
> I'll try and run this scenario (BEAM-3848) with your patch.
>
> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <[email protected]>
> wrote:
>
>> Hi
>>
>> I have the same blocker and created
>> https://github.com/apache/beam/pull/4790 and
>> https://github.com/apache/beam/pull/4965 to solve part of it
>>
>> On 1 Apr 2018 11:35, "Tim Robertson" <[email protected]> wrote:
>>
>> Hi devs
>>
>> I'm working on SolrIO tests for failure scenarios (i.e. an exception will
>> come out of the pipeline execution). I see that the exception is surfaced
>> to the driver while "direct-runner-worker" threads are still running.
>> This causes issues because:
>>
>> 1. The Solr tests do thread leak detection, and a solrClient.close()
>>    is what removes the object
>> 2. @Teardown is not necessarily called, which is what would close the
>>    solrClient
>>
>> I can unregister all the solrClients that have been spawned. However, I
>> have seen race conditions where there are still threads running, creating
>> and registering clients. I need to somehow ensure that all workers related
>> to the pipeline execution are indeed finished so no new ones are created
>> after the first exception is passed up.
>>
>> Currently I have this (pseudo code) which works, but I suspect someone
>> can suggest a better approach:
>>
>> // store the state of clients registered for object leak check
>> Set<Object> existingClients = registeredSolrClients();
>> try {
>>   pipeline.run();
>> } catch (Pipeline.PipelineExecutionException e) {
>>
>>   // Hack: await all bundle workers completing
>>   while (namedThreadStillExists("direct-runner-worker")) {
>>     Thread.sleep(100);
>>   }
>>
>>   // remove all solrClients created in this execution only
>>   // since the teardown may not have done so
>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>>       ObjectReleaseTracker.release(o);
>>     }
>>   }
>>
>>   // now we can do our assertions
>>   expectedLogs.verifyWarn(
>>       String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
>> }
>>
>> Please do point out the obvious if I am missing it - I am a newbie here...
>>
>> Thank you all very much,
>> Tim
>> ([email protected] on the slack apache/beam channel)
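
The namedThreadStillExists helper used in the quoted workaround is not shown in the thread. As a rough sketch of what such a helper could look like, the Java below scans the JVM's live threads with Thread.getAllStackTraces() and wraps the polling loop in a timeout so a test cannot hang indefinitely. The class name ThreadWaiter, the method awaitThreadsGone, the prefix match on thread names, and the 50 ms poll interval are all assumptions made for illustration; none of them come from Beam or from the original test.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; not part of Beam or Solr. */
final class ThreadWaiter {
  private ThreadWaiter() {}

  /** Returns true if any live thread has a name starting with the prefix. */
  static boolean namedThreadStillExists(String namePrefix) {
    // getAllStackTraces() returns a snapshot keyed by every live thread.
    for (Thread t : Thread.getAllStackTraces().keySet()) {
      if (t.isAlive() && t.getName().startsWith(namePrefix)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Polls until no live thread matches the prefix, or throws
   * TimeoutException once timeoutMillis has elapsed.
   */
  static void awaitThreadsGone(String namePrefix, long timeoutMillis)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (namedThreadStillExists(namePrefix)) {
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException(
            "Threads named '" + namePrefix + "*' still alive after "
                + timeoutMillis + " ms");
      }
      Thread.sleep(50); // assumed poll interval
    }
  }
}
```

With this in place, the while loop in the quoted catch block could become a single call such as ThreadWaiter.awaitThreadsGone("direct-runner-worker", 10_000). The check looks only at thread liveness, so if a runner keeps idle pooled threads alive after the work is done, the call would run until the timeout instead of returning early.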
