Hi devs

I'm working on SolrIO tests for failure scenarios (i.e. cases where an
exception comes out of the pipeline execution).  I see that the exception
is surfaced to the driver while "direct-runner-worker" threads are still
running.  This causes issues because:

  1. The Solr tests do thread leak detection, and calling
solrClient.close() is what removes the object from tracking
  2. @Teardown, which is what would close the solrClient, is not
necessarily called

I can unregister all the solrClients that have been spawned.  However, I
have seen race conditions where threads are still running, creating and
registering clients.  I need to somehow ensure that all workers related to
the pipeline execution have indeed finished, so that no new clients are
created after the first exception is passed up.

Currently I have this (pseudocode), which works, but I suspect someone can
suggest a better approach:

// snapshot the clients already registered for the object leak check
Set<Object> existingClients = registeredSolrClients();
try {
  pipeline.run();
} catch (Pipeline.PipelineExecutionException e) {
  // Hack: wait for all bundle workers to complete
  while (namedThreadStillExists("direct-runner-worker")) {
    Thread.sleep(100);
  }

  // release only the solrClients created in this execution,
  // since @Teardown may not have done so
  for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
    if (o instanceof SolrClient && !existingClients.contains(o)) {
      ObjectReleaseTracker.release(o);
    }
  }

  // now we can do our assertions
  expectedLogs.verifyWarn(
      String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
}
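
For reference, a minimal sketch of what a helper like
namedThreadStillExists might look like, using only the JDK.  The class name
WorkerThreads and the awaitThreadsGone method are hypothetical, and the
sketch assumes worker thread names merely contain "direct-runner-worker".
The timeout is an addition, so a stuck worker cannot hang the test forever:

```java
// Hypothetical helper class; not part of Beam or Solr.
class WorkerThreads {

  // True if any live thread's name contains the given fragment.
  // Assumption: worker thread names contain the fragment (e.g. with a suffix).
  static boolean namedThreadStillExists(String nameFragment) {
    // getAllStackTraces() returns a snapshot keyed by the live threads.
    for (Thread t : Thread.getAllStackTraces().keySet()) {
      if (t.isAlive() && t.getName().contains(nameFragment)) {
        return true;
      }
    }
    return false;
  }

  // Polls until no matching thread remains or the timeout elapses.
  // Returns true if all matching threads are gone, false on timeout.
  static boolean awaitThreadsGone(String nameFragment, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (namedThreadStillExists(nameFragment)) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      Thread.sleep(100);
    }
    return true;
  }
}
```

In the catch block this would replace the bare while loop, e.g.
awaitThreadsGone("direct-runner-worker", 10000), failing the test if it
returns false.  It is still polling, so it narrows the race rather than
removing it.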


Please do point out the obvious if I am missing it - I am a newbie here...

Thank you all very much,
Tim
(timrobertson...@gmail.com on the slack apache/beam channel)
