[
https://issues.apache.org/jira/browse/STORM-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14017693#comment-14017693
]
ASF GitHub Bot commented on STORM-211:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/128#discussion_r13385682
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
---
@@ -0,0 +1,293 @@
+package org.apache.storm.hdfs.trident;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.format.SequenceFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsState implements State {
+
+ public static abstract class Options implements Serializable {
+
+ protected String fsUrl;
+ protected String configKey;
+ protected FileSystem fs;
+ private Path currentFile;
+ protected FileRotationPolicy rotationPolicy;
+ protected FileNameFormat fileNameFormat;
+ protected int rotation = 0;
+ protected Configuration hdfsConfig;
+ protected ArrayList<RotationAction> rotationActions = new
ArrayList<RotationAction>();
+
+ abstract void closeOutputFile() throws IOException;
+
+ abstract Path createOutputFile() throws IOException;
+
+ abstract void execute(List<TridentTuple> tuples) throws
IOException;
+
+ abstract void doPrepare(Map conf, int partitionIndex, int
numPartitions) throws IOException;
+
+ protected void rotateOutputFile() throws IOException {
+ LOG.info("Rotating output file...");
+ long start = System.currentTimeMillis();
+ closeOutputFile();
+ this.rotation++;
+
+ Path newFile = createOutputFile();
+ LOG.info("Performing {} file rotation actions.",
this.rotationActions.size());
+ for(RotationAction action : this.rotationActions){
+ action.execute(this.fs, this.currentFile);
+ }
+ this.currentFile = newFile;
+ long time = System.currentTimeMillis() - start;
+ LOG.info("File rotation took {} ms.", time);
+
+
+ }
+
+ void prepare(Map conf, int partitionIndex, int numPartitions){
+ if (this.rotationPolicy == null) throw new
IllegalStateException("RotationPolicy must be specified.");
+ if (this.fsUrl == null) {
+ throw new IllegalStateException("File system URL must be
specified.");
+ }
+ this.fileNameFormat.prepare(conf, partitionIndex,
numPartitions);
+ this.hdfsConfig = new Configuration();
+ Map<String, Object> map = (Map<String,
Object>)conf.get(this.configKey);
+ if(map != null){
+ for(String key : map.keySet()){
+ this.hdfsConfig.set(key, String.valueOf(map.get(key)));
+ }
+ }
+ try{
+ HdfsSecurityUtil.login(conf, hdfsConfig);
+ doPrepare(conf, partitionIndex, numPartitions);
+ this.currentFile = createOutputFile();
+
+ } catch (Exception e){
+ throw new RuntimeException("Error preparing HdfsState: " +
e.getMessage(), e);
+ }
+ }
+
+ }
+
+ public static class HdfsFileOptions extends Options {
+
+ private FSDataOutputStream out;
+ protected RecordFormat format;
+ private long offset = 0;
+
+ public HdfsFileOptions withFsUrl(String fsUrl){
+ this.fsUrl = fsUrl;
+ return this;
+ }
+
+ public HdfsFileOptions withConfigKey(String configKey){
+ this.configKey = configKey;
+ return this;
+ }
+
+ public HdfsFileOptions withFileNameFormat(FileNameFormat
fileNameFormat){
+ this.fileNameFormat = fileNameFormat;
+ return this;
+ }
+
+ public HdfsFileOptions withRecordFormat(RecordFormat format){
+ this.format = format;
+ return this;
+ }
+
+ public HdfsFileOptions withRotationPolicy(FileRotationPolicy
rotationPolicy){
+ this.rotationPolicy = rotationPolicy;
+ return this;
+ }
+
+ public HdfsFileOptions addRotationAction(RotationAction action){
+ this.rotationActions.add(action);
+ return this;
+ }
+
+ @Override
+ void doPrepare(Map conf, int partitionIndex, int numPartitions)
throws IOException {
+ LOG.info("Preparing HDFS Bolt...");
+ this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
+ }
+
+ @Override
+ void closeOutputFile() throws IOException {
+ this.out.close();
+ }
+
+ @Override
+ Path createOutputFile() throws IOException {
+ Path path = new Path(this.fileNameFormat.getPath(),
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+ this.out = this.fs.create(path);
+ return path;
+ }
+
+ @Override
+ public void execute(List<TridentTuple> tuples) throws IOException {
+ boolean rotated = false;
+ for(TridentTuple tuple : tuples){
+ byte[] bytes = this.format.format(tuple);
+ out.write(bytes);
+ this.offset += bytes.length;
+
+ if(this.rotationPolicy.mark(tuple, this.offset)){
+ rotateOutputFile();
+ this.offset = 0;
+ this.rotationPolicy.reset();
+ rotated = true;
+ }
+ }
+ if(!rotated){
+ if(this.out instanceof HdfsDataOutputStream){
+
((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ } else {
+ this.out.hsync();
+ }
+ }
+ }
+ }
+
+ public static class SequenceFileOptions extends Options {
+ private SequenceFormat format;
+ private SequenceFile.CompressionType compressionType =
SequenceFile.CompressionType.RECORD;
+ private SequenceFile.Writer writer;
--- End diff --
This field should be declared `transient` — `SequenceFile.Writer` is a runtime handle and is not Serializable, so serializing the Options instance would fail; re-create the writer in `prepare()` instead.
> Add module for HDFS integration
> -------------------------------
>
> Key: STORM-211
> URL: https://issues.apache.org/jira/browse/STORM-211
> Project: Apache Storm (Incubating)
> Issue Type: Sub-task
> Reporter: P. Taylor Goetz
>
> Add a module with generic components (storm, trident) for interacting with
> HDFS:
> - Write to regular and sequence files
> - Core bolts, and Trident state implementation.
> - Integrate with secure (kerberos-enabled) HDFS
--
This message was sent by Atlassian JIRA
(v6.2#6252)