Thanks for confirming Romain - also for the very fast reply! I'll continue with the workaround and reference BEAM-3409 inline as justification. I'm trying to wrap this up before travel next week, but if I get a chance I'll try and run this scenario (BEAM-3848) with your patch.
On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote: > Hi > > I have the same blocker and created > > https://github.com/apache/beam/pull/4790 and https://github.com/apache/ > beam/pull/4965 to solve part of it > > > > Le 1 avr. 2018 11:35, "Tim Robertson" <timrobertson...@gmail.com> a > écrit : > > Hi devs > > I'm working on SolrIO tests for failure scenarios (i.e. an exception will > come out of the pipeline execution). I see that the exception is surfaced > to the driver while "direct-runner-worker" threads are still running. > This causes issue because: > > 1. The Solr tests do thread leak detection, and a solrClient.close() is > what removes the object > 2. @Teardown is not necessarily called which is what would close the > solrClient > > I can unregister all the solrClients that have been spawned. However I > have seen race conditions where there are still threads running creating > and registering clients. I need to someone ensure that all workers related > to the pipeline execution are indeed finished so no new ones are created > after the first exception is passed up. > > Currently I have this (psuedo code) which works, but I suspect someone can > suggest a better approach: > > // store the state of clients registered for object leak check > Set<Object> existingClients = registeredSolrClients(); > try { > pipeline.run(); > > } catch (Pipeline.PipelineExecutionException e) { > > > // Hack: await all bundle workers completing > while (namedThreadStillExists("direct-runner-worker")) { > Thread.sleep(100); > } > > // remove all solrClients created in this execution only > // since the teardown may not have done so > for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) { > if (o instanceof SolrClient && !existingClients.contains(o)) { > ObjectReleaseTracker.release(o); > } > } > > // now we can do our assertions > expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.R > ETRY_ATTEMPT_LOG, 1)); > > > Please do point out the obvious if I am missing it - I am a newbie here... > > Thank you all very much, > Tim > (timrobertson...@gmail.com on the slack apache/beam channel) > > > >