Indeed. It's exactly what Romain's PR is about. Regards JB
Le 1 avr. 2018 à 19:33, à 19:33, Reuven Lax <[email protected]> a écrit: >Correct - teardown is currently run in the direct runner, but >asynchronously. I believe Romain's pending PRs should solve this for >your >use case. > >On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson ><[email protected]> >wrote: > >> Thanks for confirming Romain - also for the very fast reply! >> >> I'll continue with the workaround and reference BEAM-3409 inline as >> justification. >> I'm trying to wrap this up before travel next week, but if I get a >chance >> I'll try and run this scenario (BEAM-3848) with your patch. >> >> >> >> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau ><[email protected] >> > wrote: >> >>> Hi >>> >>> I have the same blocker and created >>> >>> https://github.com/apache/beam/pull/4790 and >>> https://github.com/apache/beam/pull/4965 to solve part of it >>> >>> >>> >>> Le 1 avr. 2018 11:35, "Tim Robertson" <[email protected]> a >>> écrit : >>> >>> Hi devs >>> >>> I'm working on SolrIO tests for failure scenarios (i.e. an exception >will >>> come out of the pipeline execution). I see that the exception is >surfaced >>> to the driver while "direct-runner-worker" threads are still >running. >>> This causes issue because: >>> >>> 1. The Solr tests do thread leak detection, and a >solrClient.close() >>> is what removes the object >>> 2. @Teardown is not necessarily called which is what would close >the >>> solrClient >>> >>> I can unregister all the solrClients that have been spawned. >However I >>> have seen race conditions where there are still threads running >creating >>> and registering clients. I need to someone ensure that all workers >related >>> to the pipeline execution are indeed finished so no new ones are >created >>> after the first exception is passed up. >>> >>> Currently I have this (psuedo code) which works, but I suspect >someone >>> can suggest a better approach: >>> >>> // store the state of clients registered for object leak check >>> Set<Object> existingClients = registeredSolrClients(); >>> try { >>> pipeline.run(); >>> >>> } catch (Pipeline.PipelineExecutionException e) { >>> >>> >>> // Hack: await all bundle workers completing >>> while (namedThreadStillExists("direct-runner-worker")) { >>> Thread.sleep(100); >>> } >>> >>> // remove all solrClients created in this execution only >>> // since the teardown may not have done so >>> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) { >>> if (o instanceof SolrClient && !existingClients.contains(o)) { >>> ObjectReleaseTracker.release(o); >>> } >>> } >>> >>> // now we can do our assertions >>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn. >>> RETRY_ATTEMPT_LOG, 1)); >>> >>> >>> Please do point out the obvious if I am missing it - I am a newbie >here... >>> >>> Thank you all very much, >>> Tim >>> ([email protected] on the slack apache/beam channel) >>> >>> >>> >>> >>
