[ https://issues.apache.org/jira/browse/STORM-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14017694#comment-14017694 ]

ASF GitHub Bot commented on STORM-211:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385705
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    +        private Path currentFile;
    +        protected FileRotationPolicy rotationPolicy;
    +        protected FileNameFormat fileNameFormat;
    +        protected int rotation = 0;
    +        protected Configuration hdfsConfig;
    +        protected ArrayList<RotationAction> rotationActions = new 
ArrayList<RotationAction>();
    +
    +        abstract void closeOutputFile() throws IOException;
    +
    +        abstract Path createOutputFile() throws IOException;
    +
    +        abstract void execute(List<TridentTuple> tuples) throws 
IOException;
    +
    +        abstract void doPrepare(Map conf, int partitionIndex, int 
numPartitions) throws IOException;
    +
    +        protected void rotateOutputFile() throws IOException {
    +            LOG.info("Rotating output file...");
    +            long start = System.currentTimeMillis();
    +            closeOutputFile();
    +            this.rotation++;
    +
    +            Path newFile = createOutputFile();
    +            LOG.info("Performing {} file rotation actions.", 
this.rotationActions.size());
    +            for(RotationAction action : this.rotationActions){
    +                action.execute(this.fs, this.currentFile);
    +            }
    +            this.currentFile = newFile;
    +            long time = System.currentTimeMillis() - start;
    +            LOG.info("File rotation took {} ms.", time);
    +
    +
    +        }
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions){
    +            if (this.rotationPolicy == null) throw new 
IllegalStateException("RotationPolicy must be specified.");
    +            if (this.fsUrl == null) {
    +                throw new IllegalStateException("File system URL must be 
specified.");
    +            }
    +            this.fileNameFormat.prepare(conf, partitionIndex, 
numPartitions);
    +            this.hdfsConfig = new Configuration();
    +            Map<String, Object> map = (Map<String, 
Object>)conf.get(this.configKey);
    +            if(map != null){
    +                for(String key : map.keySet()){
    +                    this.hdfsConfig.set(key, String.valueOf(map.get(key)));
    +                }
    +            }
    +            try{
    +                HdfsSecurityUtil.login(conf, hdfsConfig);
    +                doPrepare(conf, partitionIndex, numPartitions);
    +                this.currentFile = createOutputFile();
    +
    +            } catch (Exception e){
    +                throw new RuntimeException("Error preparing HdfsState: " + 
e.getMessage(), e);
    +            }
    +        }
    +
    +    }
    +
    +    public static class HdfsFileOptions extends Options {
    +
    +        private FSDataOutputStream out;
    --- End diff --
    
    This field (`out`) should be declared `transient`, since `Options` implements `Serializable`.


> Add module for HDFS integration
> -------------------------------
>
>                 Key: STORM-211
>                 URL: https://issues.apache.org/jira/browse/STORM-211
>             Project: Apache Storm (Incubating)
>          Issue Type: Sub-task
>            Reporter: P. Taylor Goetz
>
> Add a module with generic components (Storm, Trident) for interacting with
> HDFS:
> - Write to regular and sequence files
> - Core bolts and Trident state implementation
> - Integrate with secure (Kerberos-enabled) HDFS



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to