[ 
https://issues.apache.org/jira/browse/STORM-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14017693#comment-14017693
 ] 

ASF GitHub Bot commented on STORM-211:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/128#discussion_r13385682
  
    --- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java 
---
    @@ -0,0 +1,293 @@
    +package org.apache.storm.hdfs.trident;
    +
    +import backtype.storm.task.IMetricsContext;
    +import backtype.storm.topology.FailedException;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    +import org.apache.hadoop.io.SequenceFile;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.storm.hdfs.common.rotation.RotationAction;
    +import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
    +import org.apache.storm.hdfs.trident.format.FileNameFormat;
    +import org.apache.storm.hdfs.trident.format.RecordFormat;
    +import org.apache.storm.hdfs.trident.format.SequenceFormat;
    +import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import storm.trident.operation.TridentCollector;
    +import storm.trident.state.State;
    +import storm.trident.tuple.TridentTuple;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class HdfsState implements State {
    +
    +    public static abstract class Options implements Serializable {
    +
    +        protected String fsUrl;
    +        protected String configKey;
    +        protected FileSystem fs;
    +        private Path currentFile;
    +        protected FileRotationPolicy rotationPolicy;
    +        protected FileNameFormat fileNameFormat;
    +        protected int rotation = 0;
    +        protected Configuration hdfsConfig;
    +        protected ArrayList<RotationAction> rotationActions = new 
ArrayList<RotationAction>();
    +
    +        abstract void closeOutputFile() throws IOException;
    +
    +        abstract Path createOutputFile() throws IOException;
    +
    +        abstract void execute(List<TridentTuple> tuples) throws 
IOException;
    +
    +        abstract void doPrepare(Map conf, int partitionIndex, int 
numPartitions) throws IOException;
    +
    +        protected void rotateOutputFile() throws IOException {
    +            LOG.info("Rotating output file...");
    +            long start = System.currentTimeMillis();
    +            closeOutputFile();
    +            this.rotation++;
    +
    +            Path newFile = createOutputFile();
    +            LOG.info("Performing {} file rotation actions.", 
this.rotationActions.size());
    +            for(RotationAction action : this.rotationActions){
    +                action.execute(this.fs, this.currentFile);
    +            }
    +            this.currentFile = newFile;
    +            long time = System.currentTimeMillis() - start;
    +            LOG.info("File rotation took {} ms.", time);
    +
    +
    +        }
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions){
    +            if (this.rotationPolicy == null) throw new 
IllegalStateException("RotationPolicy must be specified.");
    +            if (this.fsUrl == null) {
    +                throw new IllegalStateException("File system URL must be 
specified.");
    +            }
    +            this.fileNameFormat.prepare(conf, partitionIndex, 
numPartitions);
    +            this.hdfsConfig = new Configuration();
    +            Map<String, Object> map = (Map<String, 
Object>)conf.get(this.configKey);
    +            if(map != null){
    +                for(String key : map.keySet()){
    +                    this.hdfsConfig.set(key, String.valueOf(map.get(key)));
    +                }
    +            }
    +            try{
    +                HdfsSecurityUtil.login(conf, hdfsConfig);
    +                doPrepare(conf, partitionIndex, numPartitions);
    +                this.currentFile = createOutputFile();
    +
    +            } catch (Exception e){
    +                throw new RuntimeException("Error preparing HdfsState: " + 
e.getMessage(), e);
    +            }
    +        }
    +
    +    }
    +
    +    public static class HdfsFileOptions extends Options {
    +
    +        private FSDataOutputStream out;
    +        protected RecordFormat format;
    +        private long offset = 0;
    +
    +        public HdfsFileOptions withFsUrl(String fsUrl){
    +            this.fsUrl = fsUrl;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withConfigKey(String configKey){
    +            this.configKey = configKey;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withFileNameFormat(FileNameFormat 
fileNameFormat){
    +            this.fileNameFormat = fileNameFormat;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withRecordFormat(RecordFormat format){
    +            this.format = format;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions withRotationPolicy(FileRotationPolicy 
rotationPolicy){
    +            this.rotationPolicy = rotationPolicy;
    +            return this;
    +        }
    +
    +        public HdfsFileOptions addRotationAction(RotationAction action){
    +            this.rotationActions.add(action);
    +            return this;
    +        }
    +
    +        @Override
    +        void doPrepare(Map conf, int partitionIndex, int numPartitions) 
throws IOException {
    +            LOG.info("Preparing HDFS Bolt...");
    +            this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    +        }
    +
    +        @Override
    +        void closeOutputFile() throws IOException {
    +            this.out.close();
    +        }
    +
    +        @Override
    +        Path createOutputFile() throws IOException {
    +            Path path = new Path(this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    +            this.out = this.fs.create(path);
    +            return path;
    +        }
    +
    +        @Override
    +        public void execute(List<TridentTuple> tuples) throws IOException {
    +            boolean rotated = false;
    +            for(TridentTuple tuple : tuples){
    +                byte[] bytes = this.format.format(tuple);
    +                out.write(bytes);
    +                this.offset += bytes.length;
    +
    +                if(this.rotationPolicy.mark(tuple, this.offset)){
    +                    rotateOutputFile();
    +                    this.offset = 0;
    +                    this.rotationPolicy.reset();
    +                    rotated = true;
    +                }
    +            }
    +            if(!rotated){
    +                if(this.out instanceof HdfsDataOutputStream){
    +                    
((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
    +                } else {
    +                    this.out.hsync();
    +                }
    +            }
    +        }
    +    }
    +
    +    public static class SequenceFileOptions extends Options {
    +        private SequenceFormat format;
    +        private SequenceFile.CompressionType compressionType = 
SequenceFile.CompressionType.RECORD;
    +        private SequenceFile.Writer writer;
    --- End diff --
    
    The `writer` field should be marked `transient`.


> Add module for HDFS integration
> -------------------------------
>
>                 Key: STORM-211
>                 URL: https://issues.apache.org/jira/browse/STORM-211
>             Project: Apache Storm (Incubating)
>          Issue Type: Sub-task
>            Reporter: P. Taylor Goetz
>
> Add a module with generic components (storm, trident) for interacting with 
> HDFS:
> - Write to regular and sequence files
> - Core bolts, and Trident state implementation.
> - Integrate with secure (kerberos-enabled) HDFS



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to