Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/128#discussion_r13205187
--- Diff: external/storm-hdfs/README.md ---
@@ -0,0 +1,342 @@
+# Storm HDFS
+
+Storm components for interacting with HDFS file systems
+
+
+## Usage
+The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
+1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
+reach 5 megabytes in size.
+
+```java
+// use "|" instead of "," for field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+ .withFieldDelimiter("|");
+
+// sync the filesystem after every 1k tuples
+SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/foo/");
+
+HdfsBolt bolt = new HdfsBolt()
+ .withFsUrl("hdfs://localhost:54310")
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy);
+```
+
+### Packaging a Topology
+When packaging your topology, it's important that you use the [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/) as opposed to the
+[maven-assembly-plugin](https://maven.apache.org/plugins/maven-assembly-plugin/).
+
+The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme
+resolution.
+
+If you experience errors such as the following:
+
+```
+java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
+```
+
+it's an indication that your topology jar file isn't packaged properly.
+
+If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
+create your topology jar:
+
+```xml
+<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+</plugin>
+
+```
+
+### Specifying a Hadoop Version
+By default, storm-hdfs uses the following Hadoop dependencies:
+
+```xml
+<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+</dependency>
+<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+</dependency>
+```
+
+If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
+and add the dependencies for your preferred version in your pom.
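+
+For example, a rough sketch of what that exclusion could look like in your pom. The storm-hdfs coordinates and the Hadoop version below are illustrative only; substitute the ones your build actually uses:
+
+```xml
+<!-- illustrative only: exclude the transitive Hadoop artifacts pulled in by storm-hdfs -->
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>storm-hdfs</artifactId>
+    <version>${storm.version}</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+<!-- then declare your preferred Hadoop version explicitly (2.4.0 is just an example) -->
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.4.0</version>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdfs</artifactId>
+    <version>2.4.0</version>
+</dependency>
+```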
+
+Hadoop client version incompatibilities can manifest as errors like:
+
+```
+com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
+```
+
+## Customization
+
+### Record Formats
+Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
+interface:
+
+```java
+public interface RecordFormat extends Serializable {
+ byte[] format(Tuple tuple);
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
+tab-delimited files.
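+
+For example, a minimal sketch of a tab-delimited format using the same `withFieldDelimiter()` builder shown in the usage example above:
+
+```java
+// use a tab instead of the default comma as the field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+        .withFieldDelimiter("\t");
+```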
+
+
+### File Naming
+File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
+interface:
+
+```java
+public interface FileNameFormat extends Serializable {
+ void prepare(Map conf, TopologyContext topologyContext);
+ String getName(long rotation, long timeStamp);
+ String getPath();
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:
+
+ {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
+
+For example:
+
+ MyBolt-5-7-1390579837830.txt
+
+By default, the prefix is empty and the extension is ".txt".
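+
+Both can be overridden. A short sketch follows; the `withPrefix()` builder is an assumption here, mirroring the `withExtension()` call used in the sequence file example later in this document:
+
+```java
+// hypothetical: add a prefix and use ".log" instead of the default ".txt" extension
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/")
+        .withPrefix("data-")
+        .withExtension(".log");
+```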
+
+
+
+### Sync Policies
+Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
+to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
+
+```java
+public interface SyncPolicy extends Serializable {
+ boolean mark(Tuple tuple, long offset);
+ void reset();
+}
+```
+The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
+to perform a sync/flush, after which it will call the `reset()` method.
+
+The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
+been processed.
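+
+As a sketch of what a custom policy might look like, the hypothetical implementation below syncs whenever a configurable time interval has elapsed, regardless of how many tuples have been written (the `backtype.storm.tuple.Tuple` import is an assumption based on Storm's current package layout):
+
+```java
+import backtype.storm.tuple.Tuple;
+
+// hypothetical example, not part of storm-hdfs: sync when at least
+// `intervalMs` milliseconds have passed since the last sync
+public class TimedSyncPolicy implements SyncPolicy {
+    private final long intervalMs;
+    private long lastSync = System.currentTimeMillis();
+
+    public TimedSyncPolicy(long intervalMs) {
+        this.intervalMs = intervalMs;
+    }
+
+    @Override
+    public boolean mark(Tuple tuple, long offset) {
+        // returning true tells the HdfsBolt to sync/flush now
+        return System.currentTimeMillis() - lastSync >= intervalMs;
+    }
+
+    @Override
+    public void reset() {
+        // called by the HdfsBolt right after it performs the sync
+        lastSync = System.currentTimeMillis();
+    }
+}
+```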
+
+### File Rotation Policies
+Similar to sync policies, file rotation policies allow you to control when data files are rotated by implementing the
+`org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
+
+```java
+public interface FileRotationPolicy extends Serializable {
+ boolean mark(Tuple tuple, long offset);
+ void reset();
+}
+```
+
+The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
+data files reach a specific file size:
+
+```java
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+### File Rotation Actions
+Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s.
+A `RotationAction` provides a hook for performing some action right after a file is rotated, for
+example moving a file to a different location or renaming it.
+
+
+```java
+public interface RotationAction extends Serializable {
+ void execute(FileSystem fileSystem, Path filePath) throws IOException;
+}
+```
+
+Storm-HDFS includes a simple action that will move a file after rotation:
+
+```java
+public class MoveFileAction implements RotationAction {
+    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
+
+ private String destination;
+
+ public MoveFileAction withDestination(String destDir){
+ destination = destDir;
+ return this;
+ }
+
+ @Override
+    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+ Path destPath = new Path(destination, filePath.getName());
+ LOG.info("Moving file {} to {}", filePath, destPath);
+ boolean success = fileSystem.rename(filePath, destPath);
+ return;
+ }
+}
+```
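+
+A rotation action is registered when the bolt is configured. The sketch below reuses the builders from the usage example at the top of this document and assumes `HdfsBolt` exposes the same `addRotationAction()` builder shown in the Trident example that follows; the destination path is only an illustration:
+
+```java
+HdfsBolt bolt = new HdfsBolt()
+        .withFsUrl("hdfs://localhost:54310")
+        .withFileNameFormat(fileNameFormat)
+        .withRecordFormat(format)
+        .withRotationPolicy(rotationPolicy)
+        .withSyncPolicy(syncPolicy)
+        // move each file out of the write directory once it has been rotated
+        .addRotationAction(new MoveFileAction().withDestination("/dest/"));
+```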
+
+If you are using Trident and sequence files you can do something like this:
+
+```java
+ HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+ .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+
+## Support for HDFS Sequence Files
+
+The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:
+
+```java
+ // sync the filesystem after every 1k tuples
+ SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+ // rotate files when they reach 5MB
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withExtension(".seq")
+ .withPath("/data/");
+
+ // create sequence format instance.
+        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+ SequenceFileBolt bolt = new SequenceFileBolt()
+ .withFsUrl("hdfs://localhost:54310")
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy)
+ .withCompressionType(SequenceFile.CompressionType.RECORD)
+ .withCompressionCodec("deflate");
--- End diff --
The deflate codec just reminded me of something else: now that JNI support is available
in storm, it might be nice to include instructions on how to package
libhadoop.so so that storm/Hadoop can pick it up. It can be a huge speed
improvement over the pure java compression/CRC implementations.
It might even be worth doing some of that on behalf of the user, but that
might require some coordination with Hadoop itself so that the native libraries
are packaged in a way that would allow us to pull them in.