Note: I don’t think my pseudo code is actually the best way to resolve this
On September 4, 2017 at 21:37:29, Otto Fowler ([email protected]) wrote: Laurens, If you have exceptions, you may have seen the effects of this, if from a different cause. On September 4, 2017 at 13:25:42, Laurens Vets ([email protected]) wrote: Hi Otto, Might this be related to the issues I was seeing? If/when indexing topology got broken, I couldn't recover until I cleared all queues. On 2017-09-04 08:23, Otto Fowler wrote: > It looks like if the SourceHandler has a problem with it’s output > stream, > it will never recover. > The handler will be in the map and continue to be used, but it will > continue to throw exceptions. > > Is there a reason why we don’t try to recover and recreate the > SourceHandler, > such as: > > synchronized SourceHandler getSourceHandler(String sourceType, String > stellarResult, WriterConfiguration config) throws IOException { > SourceHandlerKey key = new SourceHandlerKey(sourceType, > stellarResult); > SourceHandler ret = sourceHandlerMap.get(key); > > // BEGIN FAKE CODE PART > if(ret != null && ret.isUsableOrIsOpenOrSomething()) { > try { > ret.close(); > } catch(Exception e){ > // NO LOGGER IN THIS CLASS > } > sourceHandlerMap.remove(key); > ret = null; > } > // END FAKE CODE PART > > if(ret == null) { > if(sourceHandlerMap.size() >= maxOpenFiles) { > throw new IllegalStateException("Too many HDFS files open!"); > } > ret = new SourceHandler(rotationActions, > rotationPolicy, > syncPolicyCreator.create(sourceType, > config), > new > PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat), > new SourceHandlerCallback(sourceHandlerMap, > key)); > sourceHandlerMap.put(key, ret); > } > return ret; > } > > > > It seems strange for something that is supposed to be a long long > running > process, writing to a distributed network store.
