[ 
https://issues.apache.org/jira/browse/STORM-2509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006598#comment-16006598
 ] 

Ryan Persaud commented on STORM-2509:
-------------------------------------

I used code in the HiveBolt as a starting point for handling retiring writers 
(see below), and I'll create a PR with the same:

 // Setting accessOrder to true to facilitate obtaining the LRU  entry
 writers = new LinkedHashMap<>(this.maxOpenFiles, 0.75f, true);

private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) 
throws IOException {
    AbstractHDFSWriter writer = writers.get( writerKey );
    if (writer == null) {
        LOG.debug("Creating Writer for writerKey : " + writerKey);
        Path pathForNextFile = getBasePathForNextFile(tuple);
        writer = makeNewWriter(pathForNextFile, tuple);

        if (writers.size() > (this.maxOpenFiles - 1)) {
            LOG.info("cached writers size {} exceeded maxOpenFiles {} ", 
writers.size(), this.maxOpenFiles);
            retireEldestWriter();
        }
        this.writers.put(writerKey, writer);
    }
    return writer;
}

/**
 * Locate writer that has not been used for longest time and retire it
 */
private void retireEldestWriter() throws IOException {
    LOG.info("Attempting to close eldest writer");
    Iterator<String> iterator = this.writers.keySet().iterator();
    if (iterator.hasNext()) {
        String eldestKey = iterator.next();
        doRotationAndRemoveWriter(eldestKey, this.writers.get(eldestKey));
    }
}

> Writers in AbstractHDFSBolt are not closed/rotated when evicted from 
> WritersMap
> -------------------------------------------------------------------------------
>
>                 Key: STORM-2509
>                 URL: https://issues.apache.org/jira/browse/STORM-2509
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hdfs
>    Affects Versions: 1.1.0
>            Reporter: Ryan Persaud
>            Priority: Minor
>
> When the eldest entry in the WritersMap in the AbstractHDFSBolt gets removed 
> due to the number of writers exceeding maxWriters (see below), the writer is 
> not closed and rotation actions are not executed (doRotationAndRemoveWriter 
> is not called).
> static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
>     final long maxWriters;
>     public WritersMap(long maxWriters) {
>         super((int)maxWriters, 0.75f, true);
>         this.maxWriters = maxWriters;
>     }
>     @Override
>     protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> 
> eldest) {
>         return this.size() > this.maxWriters;
>     }
> }



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to