It seems there is still an issue with teardown not being called in failed tasks; I just created BEAM-4040 to track it.
On Thu, Apr 5, 2018 at 4:45 PM, Tim Robertson <timrobertson...@gmail.com> wrote:

Will do. I'll report the result on https://github.com/apache/beam/pull/4905

On Thu, Apr 5, 2018 at 11:45 AM, Ismaël Mejía <ieme...@gmail.com> wrote:

For info, Romain's PR was merged today. Can you confirm whether this fixes the issue, Tim?

On Sun, Apr 1, 2018 at 9:21 PM, Tim Robertson <timrobertson...@gmail.com> wrote:

Thanks all.

I went with the approach I outlined in my first message, which you can see in this test:

https://github.com/timrobertson100/beam/blob/BEAM-3848/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java#L285

That forms part of this PR: https://github.com/apache/beam/pull/4956

I'll monitor Romain's PR and back the workaround out when appropriate.

On Sun, Apr 1, 2018 at 8:20 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Indeed. That's exactly what Romain's PR is about.

Regards
JB

On 1 Apr 2018, at 19:33, Reuven Lax <re...@google.com> wrote:

Correct: teardown is currently run in the direct runner, but asynchronously. I believe Romain's pending PRs should solve this for your use case.

On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <timrobertson...@gmail.com> wrote:

Thanks for confirming, Romain, and for the very fast reply!

I'll continue with the workaround and reference BEAM-3409 inline as justification. I'm trying to wrap this up before I travel next week, but if I get a chance I'll try to run this scenario (BEAM-3848) with your patch.
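Reuven's point that teardown runs asynchronously can be illustrated with a plain-JDK analogue. This is a sketch, not DirectRunner code: the class and method names are hypothetical, a CompletableFuture stands in for the pipeline result, and an AtomicBoolean stands in for solrClient.close(). The caller observes the failure while the worker is still doing its cleanup; only waiting for the worker pool itself (not just the result) closes the race. The CountDownLatch exists only to make the demo deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncTeardownDemo {

  /**
   * Returns {closedWhenFailureSeen, closedAfterAwait}.
   * Hypothetical stand-ins: result ~ pipeline result, clientClosed ~ solrClient.close().
   */
  static boolean[] observe() throws InterruptedException {
    AtomicBoolean clientClosed = new AtomicBoolean(false);
    CompletableFuture<Void> result = new CompletableFuture<>();
    // Only makes the demo deterministic: cleanup waits until the caller has looked.
    CountDownLatch callerSawFailure = new CountDownLatch(1);
    ExecutorService workers = Executors.newSingleThreadExecutor();

    workers.submit(() -> {
      // The "bundle" fails and the failure is handed to the caller right away...
      result.completeExceptionally(new IllegalStateException("bundle failed"));
      // ...while the worker thread carries on with its cleanup (our @Teardown stand-in).
      try {
        callerSawFailure.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      clientClosed.set(true);
    });

    boolean closedWhenFailureSeen;
    try {
      result.get();
      throw new IllegalStateException("expected the run to fail");
    } catch (ExecutionException e) {
      // Like catching PipelineExecutionException: cleanup has not happened yet.
      closedWhenFailureSeen = clientClosed.get();
    }
    callerSawFailure.countDown();

    // Waiting for the worker pool itself, not just the result, closes the race.
    workers.shutdown();
    workers.awaitTermination(5, TimeUnit.SECONDS);
    boolean closedAfterAwait = clientClosed.get();
    return new boolean[] {closedWhenFailureSeen, closedAfterAwait};
  }

  public static void main(String[] args) throws InterruptedException {
    boolean[] seen = observe();
    System.out.println("closed when failure was seen: " + seen[0]);
    System.out.println("closed after awaitTermination: " + seen[1]);
  }
}
```

A test in Beam cannot reach the DirectRunner's internal executor this way, which is why the workaround in this thread falls back to watching worker threads by name.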
>> >>>> >> >>>> >> >>>> >> >>>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau >> >>>> <rmannibu...@gmail.com> wrote: >> >>>>> >> >>>>> Hi >> >>>>> >> >>>>> I have the same blocker and created >> >>>>> >> >>>>> https://github.com/apache/beam/pull/4790 and >> >>>>> https://github.com/apache/beam/pull/4965 to solve part of it >> >>>>> >> >>>>> >> >>>>> >> >>>>> Le 1 avr. 2018 11:35, "Tim Robertson" < timrobertson...@gmail.com> a >> >>>>> écrit : >> >>>>> >> >>>>> Hi devs >> >>>>> >> >>>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception >> >>>>> will come out of the pipeline execution). I see that the exception >> >>>>> is >> >>>>> surfaced to the driver while " direct-runner-worker" threads are >> >>>>> still >> >>>>> running. This causes issue because: >> >>>>> >> >>>>> 1. The Solr tests do thread leak detection, and a >> >>>>> solrClient.close() >> >>>>> is what removes the object >> >>>>> 2. @Teardown is not necessarily called which is what would close >> >>>>> the >> >>>>> solrClient >> >>>>> >> >>>>> I can unregister all the solrClients that have been spawned. >> >>>>> However I >> >>>>> have seen race conditions where there are still threads running >> >>>>> creating and >> >>>>> registering clients. I need to someone ensure that all workers >> >>>>> related to >> >>>>> the pipeline execution are indeed finished so no new ones are >> >>>>> created after >> >>>>> the first exception is passed up. 
>> >>>>> >> >>>>> Currently I have this (psuedo code) which works, but I suspect >> >>>>> someone >> >>>>> can suggest a better approach: >> >>>>> >> >>>>> // store the state of clients registered for object leak check >> >>>>> Set<Object> existingClients = registeredSolrClients(); >> >>>>> try { >> >>>>> pipeline.run(); >> >>>>> >> >>>>> } catch (Pipeline.PipelineExecutionException e) { >> >>>>> >> >>>>> >> >>>>> // Hack: await all bundle workers completing >> >>>>> while (namedThreadStillExists("direct-runner-worker")) { >> >>>>> Thread.sleep(100); >> >>>>> } >> >>>>> >> >>>>> // remove all solrClients created in this execution only >> >>>>> // since the teardown may not have done so >> >>>>> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) { >> >>>>> if (o instanceof SolrClient && !existingClients.contains(o)) { >> >>>>> ObjectReleaseTracker.release(o); >> >>>>> } >> >>>>> } >> >>>>> >> >>>>> // now we can do our assertions >> >>>>> >> >>>>> >> >>>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, >> >>>>> 1)); >> >>>>> >> >>>>> >> >>>>> Please do point out the obvious if I am missing it - I am a newbie >> >>>>> here... >> >>>>> >> >>>>> Thank you all very much, >> >>>>> Tim >> >>>>> ( timrobertson...@gmail.com on the slack apache/beam channel) >> >>>>> >> >>>>> >> >>>>> >> >>>> >> > > >