It looks like if the SourceHandler has a problem with its output stream, it will never recover. The handler will stay in the map and continue to be used, but it will continue to throw exceptions.
Is there a reason why we don't try to recover and recreate the SourceHandler, such as:

    synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult, WriterConfiguration config) throws IOException {
      SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult);
      SourceHandler ret = sourceHandlerMap.get(key);
      // BEGIN FAKE CODE PART
      // If the cached handler is no longer usable, close it (best effort) and drop it
      // so that a fresh one is created below.
      if (ret != null && !ret.isUsableOrIsOpenOrSomething()) {
        try {
          ret.close();
        } catch (Exception e) {
          // NO LOGGER IN THIS CLASS
        }
        sourceHandlerMap.remove(key);
        ret = null;
      }
      // END FAKE CODE PART
      if (ret == null) {
        if (sourceHandlerMap.size() >= maxOpenFiles) {
          throw new IllegalStateException("Too many HDFS files open!");
        }
        ret = new SourceHandler(rotationActions,
                                rotationPolicy,
                                syncPolicyCreator.create(sourceType, config),
                                new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
                                new SourceHandlerCallback(sourceHandlerMap, key));
        sourceHandlerMap.put(key, ret);
      }
      return ret;
    }

Never recovering seems strange for something that is supposed to be a long-running process writing to a distributed network store.
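To make the idea testable outside the writer, here is a minimal, self-contained sketch of the same evict-and-recreate pattern. Everything in it is hypothetical and only for illustration: `RecoveringHandlerCache`, the `Handler` interface with its `isUsable()` method, and `FlakyHandler` are stand-ins I made up, not existing classes, and the real SourceHandler would need some equivalent health check added.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache that replaces unusable handlers instead of reusing them forever. */
final class RecoveringHandlerCache {

  /** Assumed minimal contract; isUsable() is an invented health check. */
  interface Handler extends Closeable {
    boolean isUsable();
  }

  /** Test double that can be switched into a broken state. */
  static final class FlakyHandler implements Handler {
    private boolean broken;
    private boolean closed;

    void markBroken() { broken = true; }
    boolean isClosed() { return closed; }

    @Override public boolean isUsable() { return !broken && !closed; }
    @Override public void close() throws IOException { closed = true; }
  }

  private final Map<String, Handler> handlers = new HashMap<>();
  private final int maxOpen;
  private final Function<String, Handler> factory;

  RecoveringHandlerCache(int maxOpen, Function<String, Handler> factory) {
    this.maxOpen = maxOpen;
    this.factory = factory;
  }

  synchronized Handler get(String key) {
    Handler handler = handlers.get(key);
    if (handler != null && !handler.isUsable()) {
      // Best-effort close: a failure here must not block recovery.
      try {
        handler.close();
      } catch (IOException e) {
        // Ignored on purpose; the handler is being discarded anyway.
      }
      handlers.remove(key);
      handler = null;
    }
    if (handler == null) {
      // The broken entry was removed above, so recovery does not trip the limit.
      if (handlers.size() >= maxOpen) {
        throw new IllegalStateException("Too many handlers open!");
      }
      handler = factory.apply(key);
      handlers.put(key, handler);
    }
    return handler;
  }

  synchronized int size() {
    return handlers.size();
  }
}
```

Because the broken entry is removed before the size check, a handler can be replaced even when the cache is already at its limit, which is probably the behavior we want for `maxOpenFiles` as well.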