[ https://issues.apache.org/jira/browse/STORM-2509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006598#comment-16006598 ]
Ryan Persaud commented on STORM-2509:
-------------------------------------
I used the code in HiveBolt as a starting point for retiring writers
(see below), and I'll create a PR with the same approach:
// Setting accessOrder to true to facilitate obtaining the LRU entry
writers = new LinkedHashMap<>(this.maxOpenFiles, 0.75f, true);

private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple)
        throws IOException {
    AbstractHDFSWriter writer = writers.get(writerKey);
    if (writer == null) {
        LOG.debug("Creating Writer for writerKey : " + writerKey);
        Path pathForNextFile = getBasePathForNextFile(tuple);
        writer = makeNewWriter(pathForNextFile, tuple);
        if (writers.size() > (this.maxOpenFiles - 1)) {
            LOG.info("cached writers size {} exceeded maxOpenFiles {} ",
                    writers.size(), this.maxOpenFiles);
            retireEldestWriter();
        }
        this.writers.put(writerKey, writer);
    }
    return writer;
}

/**
 * Locate writer that has not been used for longest time and retire it
 */
private void retireEldestWriter() throws IOException {
    LOG.info("Attempting to close eldest writer");
    Iterator<String> iterator = this.writers.keySet().iterator();
    if (iterator.hasNext()) {
        String eldestKey = iterator.next();
        doRotationAndRemoveWriter(eldestKey, this.writers.get(eldestKey));
    }
}

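For anyone who wants to try the idea outside of Storm, here is a minimal, self-contained sketch of the same pattern: an access-ordered LinkedHashMap where the least recently used writer is explicitly closed before a new one is cached. SketchWriter and LruWriterCacheSketch are hypothetical stand-ins, not storm-hdfs classes, and close() here only sets a flag in place of the real rotation logic.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AbstractHDFSWriter: it only records that it was closed.
class SketchWriter implements Closeable {
    final String key;
    boolean closed = false;

    SketchWriter(String key) {
        this.key = key;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical cache illustrating explicit LRU retirement; not the storm-hdfs code.
class LruWriterCacheSketch {
    private final int maxOpenFiles;
    private final Map<String, SketchWriter> writers;
    // Keys of retired writers, in retirement order (kept only for inspection).
    final List<String> retired = new ArrayList<>();

    LruWriterCacheSketch(int maxOpenFiles) {
        this.maxOpenFiles = maxOpenFiles;
        // accessOrder = true: iteration runs from least to most recently accessed.
        this.writers = new LinkedHashMap<>(maxOpenFiles, 0.75f, true);
    }

    SketchWriter getOrCreateWriter(String writerKey) {
        SketchWriter writer = writers.get(writerKey);
        if (writer == null) {
            // Make room first, so the number of open writers never exceeds the limit.
            if (writers.size() >= maxOpenFiles) {
                retireEldestWriter();
            }
            writer = new SketchWriter(writerKey);
            writers.put(writerKey, writer);
        }
        return writer;
    }

    // Close and remove the writer that has gone unused for the longest time.
    private void retireEldestWriter() {
        Iterator<Map.Entry<String, SketchWriter>> it = writers.entrySet().iterator();
        if (it.hasNext()) {
            Map.Entry<String, SketchWriter> eldest = it.next();
            String eldestKey = eldest.getKey();
            SketchWriter eldestWriter = eldest.getValue();
            it.remove();
            eldestWriter.close();
            retired.add(eldestKey);
        }
    }

    int size() {
        return writers.size();
    }
}
```

With a limit of 2, requesting "a", "b", "a" again, and then "c" should retire "b", since the second lookup of "a" moves it to the most recently used position. The sketch makes room before creating the new writer, which differs slightly from the snippet above, and it reads the key and value from the entry before calling remove() on the iterator.
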
> Writers in AbstractHDFSBolt are not closed/rotated when evicted from
> WritersMap
> -------------------------------------------------------------------------------
>
> Key: STORM-2509
> URL: https://issues.apache.org/jira/browse/STORM-2509
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-hdfs
> Affects Versions: 1.1.0
> Reporter: Ryan Persaud
> Priority: Minor
>
> When the eldest entry in the WritersMap in the AbstractHDFSBolt gets removed
> due to the number of writers exceeding maxWriters (see below), the writer is
> not closed and rotation actions are not executed (doRotationAndRemoveWriter
> is not called).
> static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
>     final long maxWriters;
>
>     public WritersMap(long maxWriters) {
>         super((int)maxWriters, 0.75f, true);
>         this.maxWriters = maxWriters;
>     }
>
>     @Override
>     protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
>         return this.size() > this.maxWriters;
>     }
> }
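
The silent eviction described in the issue can be reproduced in isolation. The sketch below reuses the same removeEldestEntry rule with a hypothetical TrackedWriter value type (it only tracks whether close() was called). EvictingMapSketch and EvictionDemo are illustrative names and are not part of storm-hdfs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical writer that only tracks whether close() was called.
class TrackedWriter {
    boolean closed = false;

    void close() {
        closed = true;
    }
}

// Same eviction rule as the quoted WritersMap, with a stand-in value type.
class EvictingMapSketch extends LinkedHashMap<String, TrackedWriter> {
    private final long maxWriters;

    EvictingMapSketch(long maxWriters) {
        super((int) maxWriters, 0.75f, true);
        this.maxWriters = maxWriters;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, TrackedWriter> eldest) {
        // Returning true makes LinkedHashMap drop the entry; nothing closes the value.
        return size() > maxWriters;
    }
}

class EvictionDemo {
    // Returns true when the evicted writer was dropped without being closed.
    static boolean evictedWithoutClose() {
        EvictingMapSketch map = new EvictingMapSketch(1);
        TrackedWriter first = new TrackedWriter();
        map.put("first", first);
        // Exceeds maxWriters, so "first" is evicted by removeEldestEntry.
        map.put("second", new TrackedWriter());
        return !map.containsKey("first") && !first.closed;
    }
}
```

After the second put, "first" is no longer in the map, yet nothing ever called close() on it, which is the leak the issue reports.
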