[ https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192886#comment-15192886 ]

ASF GitHub Bot commented on STORM-1464:
---------------------------------------

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1044#discussion_r55965901
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
                     }
                 }
     
    -            if(this.rotationPolicy.mark(tuple, this.offset)) {
    -                try {
    -                    rotateOutputFile();
    -                    this.rotationPolicy.reset();
    -                    this.offset = 0;
    -                } catch (IOException e) {
    -                    this.collector.reportError(e);
    -                    LOG.warn("File could not be rotated");
    -                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
    -                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
    -                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
    -                }
    +            if (writer != null && writer.needsRotation()) {
    +                    doRotationAndRemoveWriter(writerKey, writer);
                 }
             }
         }
     
    +    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
    +        AbstractHDFSWriter writer;
    +
    +        writer = writers.get(writerKey);
    +        if (writer == null) {
    +            if (writers.size() >= maxOpenFiles)
    +            {
    +                String keyToOldest = getKeyToOldestWriter();
    +                AbstractHDFSWriter oldest = writers.get(keyToOldest);
    +                rotateOutputFile(oldest);
    +                writers.remove(keyToOldest);
    +            }
    +
    +            Path pathForNextFile = getBasePathForNextFile(tuple);
    +            writer = makeNewWriter(pathForNextFile, tuple);
    +            writers.put(writerKey, writer);
    +            this.rotation++;
    +        }
    +        return writer;
    +    }
    +
    +    /**
    +     * A tuple must be mapped to a writer based on two factors:
    +     *  - bolt-specific logic that must separate tuples into different files in the same directory (see the avro bolt
    +     *    for an example of this)
    +     *  - the directory the tuple will be partitioned into
    +     *
    +     * @param tuple
    +     * @return
    +     */
    +    private String getHashKeyForTuple(Tuple tuple) {
    +        final String boltKey = getWriterKey(tuple);
    --- End diff ---
    
    Why a separate key instead of the partition path itself as the key?
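
For context, the two-factor key described in the javadoc above could be built roughly as follows. This is a sketch only: hashKeyFor and getPartitionPath are hypothetical names, not code from this pull request; getWriterKey is the bolt-specific hook shown in the diff.

    // Sketch only -- not the pull request's implementation; it would live
    // alongside getWriterKey in the bolt class. The key combines a
    // bolt-specific component (e.g. an Avro schema fingerprint) with the
    // partition directory, so tuples that share a directory can still map
    // to different files.
    private String hashKeyFor(Tuple tuple) {
        final String boltKey = getWriterKey(tuple);           // bolt-specific part (shown in the diff)
        final String partitionDir = getPartitionPath(tuple);  // hypothetical directory helper
        return partitionDir + "/" + boltKey;
    }

With a key of this shape, two schemas that land in the same directory still get distinct writers, which appears to be the scenario the javadoc has in mind and the context for the question above.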

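The getOrCreateWriter() hunk above also caps the number of open files: once writers.size() reaches maxOpenFiles, the oldest writer is rotated and evicted. getKeyToOldestWriter() itself is not shown in this excerpt; a minimal version, assuming writers is an insertion-ordered java.util.LinkedHashMap (an assumption, not necessarily what the pull request does), could look like:

    // Assumes writers is a LinkedHashMap kept in insertion order, and that
    // entries are only added when a writer is created and removed when it
    // is rotated out, so insertion order equals creation order.
    private String getKeyToOldestWriter() {
        return writers.keySet().iterator().next();
    }

A per-writer creation timestamp would work just as well if the map has no defined iteration order.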

> storm-hdfs should support writing to multiple files
> ---------------------------------------------------
>
>                 Key: STORM-1464
>                 URL: https://issues.apache.org/jira/browse/STORM-1464
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>              Labels: avro
>
> Examples of when this is needed include:
> - One Avro bolt writing multiple schemas, each of which requires a different
> file. Schema evolution is a common use of Avro and the Avro bolt should
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.
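
As a rough illustration of the second bullet (the class and method names below are hypothetical, not the API this ticket introduces): a tuple-to-directory mapping keyed on the "USER" field might look like this, with the returned string appended to the bolt's configured base path.

    import org.apache.storm.tuple.Tuple;

    // Illustrative sketch only: derive a partition sub-directory such as
    // "USER=alice" from the tuple's "USER" field.
    public class UserFieldPartitioner {
        public String getPartitionPath(Tuple tuple) {
            return "USER=" + tuple.getStringByField("USER");
        }
    }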



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
