Hi I have the same blocker and created
https://github.com/apache/beam/pull/4790 and https://github.com/apache/beam/pull/4965 to solve part of it Le 1 avr. 2018 11:35, "Tim Robertson" <timrobertson...@gmail.com> a écrit : Hi devs I'm working on SolrIO tests for failure scenarios (i.e. an exception will come out of the pipeline execution). I see that the exception is surfaced to the driver while "direct-runner-worker" threads are still running. This causes issue because: 1. The Solr tests do thread leak detection, and a solrClient.close() is what removes the object 2. @Teardown is not necessarily called which is what would close the solrClient I can unregister all the solrClients that have been spawned. However I have seen race conditions where there are still threads running creating and registering clients. I need to someone ensure that all workers related to the pipeline execution are indeed finished so no new ones are created after the first exception is passed up. Currently I have this (psuedo code) which works, but I suspect someone can suggest a better approach: // store the state of clients registered for object leak check Set<Object> existingClients = registeredSolrClients(); try { pipeline.run(); } catch (Pipeline.PipelineExecutionException e) { // Hack: await all bundle workers completing while (namedThreadStillExists("direct-runner-worker")) { Thread.sleep(100); } // remove all solrClients created in this execution only // since the teardown may not have done so for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) { if (o instanceof SolrClient && !existingClients.contains(o)) { ObjectReleaseTracker.release(o); } } // now we can do our assertions expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1)); Please do point out the obvious if I am missing it - I am a newbie here... Thank you all very much, Tim (timrobertson...@gmail.com on the slack apache/beam channel)