Added: storm/site/releases/0.9.6/storm-hbase.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hbase.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hbase.md (added)
+++ storm/site/releases/0.9.6/storm-hbase.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,241 @@
---
title: Storm HBase Integration
layout: documentation
documentation: true
---

Storm/Trident integration for [Apache HBase](https://hbase.apache.org)

## Usage
The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper`
interface:

```java
public interface HBaseMapper extends Serializable {
    byte[] rowKey(Tuple tuple);

    ColumnList columns(Tuple tuple);
}
```

The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the
row key.

The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
to add both standard HBase columns and HBase counter columns.

To add a standard column, use one of the `addColumn()` methods:

```java
ColumnList cols = new ColumnList();
cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
```

To add a counter column, use one of the `addCounter()` methods:

```java
ColumnList cols = new ColumnList();
cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
```

When the remote HBase cluster has security enabled, you must provide the storm-hbase connector with a Kerberos keytab
and the corresponding principal name. Specifically, the `Config` object passed to the topology should contain the entries
`"storm.keytab.file"` = `"$keytab"` and `"storm.kerberos.principal"` = `"$principal"`. Example:

```java
Config config = new Config();
...
config.put("storm.keytab.file", "$keytab");
config.put("storm.kerberos.principal", "$principal");
StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
```

## Working with Secure HBase Using Delegation Tokens
If your topology is going to interact with secure HBase, HBase must authenticate your bolts/states.
The approach described above requires every potential worker host to have the keytab file referenced by `storm.keytab.file`. If you have
multiple topologies on a cluster, each with a different HBase user, you will have to create multiple keytabs and distribute
them to all workers. Instead, you can use the following approach:

Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
Nimbus must be started with the following configuration:

- `nimbus.autocredential.plugins.classes: ["org.apache.storm.hbase.security.AutoHBase"]`
- `nimbus.credential.renewers.classes: ["org.apache.storm.hbase.security.AutoHBase"]`
- `hbase.keytab.file: "/path/to/keytab/on/nimbus"` (the keytab of the HBase superuser that can impersonate other users)
- `hbase.kerberos.principal: "[email protected]"`
- `nimbus.credential.renewers.freq.secs: 518400` (6 days. By default, HBase tokens expire every 7 days and cannot be renewed. If you have a custom setting for `hbase.auth.token.max.lifetime` in hbase-site.xml, make sure this value is at least 1 hour less than that.)

Your topology configuration should have `topology.auto-credentials: ["org.apache.storm.hbase.security.AutoHBase"]`.

If Nimbus does not already have the above configuration, add it and then restart Nimbus. Ensure the HBase configuration
files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are on Nimbus's classpath.
Nimbus will use the keytab and principal specified in the config to authenticate with HBase.
From then on, for every
topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of that
user. If the topology was started with `topology.auto-credentials` set to AutoHBase, Nimbus will push the
delegation tokens to all the workers for your topology, and the HBase bolt/state will authenticate with these tokens.

Because Nimbus impersonates the topology submitter user, the user specified in `hbase.kerberos.principal` must
have permission to acquire tokens on behalf of other users. To set this up, follow the configuration directions
at this link:

http://hbase.apache.org/book/security.html#security.rest.gateway

You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html.

### SimpleHBaseMapper
`storm-hbase` includes a general-purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
tuples to both regular HBase columns and counter columns.

To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns.

The following code creates a `SimpleHBaseMapper` instance that:

1. Uses the `word` tuple value as a row key.
2. Adds a standard HBase column for the tuple field `word`.
3. Adds an HBase counter column for the tuple field `count`.
4. Writes values to the `cf` column family.

```java
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
```

### HBaseBolt
To use the `HBaseBolt`, construct it with the name of the table to write to and an `HBaseMapper` implementation:

```java
HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
```

The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
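
You can model what the mapper configuration above produces for a single tuple without Storm or HBase on the classpath. The following is a minimal sketch, not storm-hbase code. A `Map` stands in for the Storm tuple, and `HBaseMappingSketch`, `MappedRow`, `Cell` and `map()` are hypothetical names. The sketch also assumes values are stringified and UTF-8 encoded, which may differ from how `SimpleHBaseMapper` serializes non-string values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model of the SimpleHBaseMapper configuration above.
// A Storm tuple is represented here as a Map of field name -> value.
class HBaseMappingSketch {

    // One cell of an HBase row: a standard column carries a value,
    // a counter column carries an increment instead.
    static final class Cell {
        final String family;
        final String qualifier;
        final byte[] value;     // non-null for standard columns
        final Long increment;   // non-null for counter columns

        Cell(String family, String qualifier, byte[] value, Long increment) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
            this.increment = increment;
        }
    }

    static final class MappedRow {
        final byte[] rowKey;
        final List<Cell> cells = new ArrayList<>();

        MappedRow(byte[] rowKey) {
            this.rowKey = rowKey;
        }
    }

    // Assumption: values are rendered with String.valueOf() and encoded as UTF-8.
    static byte[] utf8(Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Applies the four rules from the list above: row key field, standard column
    // fields, counter fields, and a single column family.
    static MappedRow map(Map<String, Object> tuple, String rowKeyField,
                         List<String> columnFields, List<String> counterFields,
                         String columnFamily) {
        MappedRow row = new MappedRow(utf8(tuple.get(rowKeyField)));
        for (String field : columnFields) {
            row.cells.add(new Cell(columnFamily, field, utf8(tuple.get(field)), null));
        }
        for (String field : counterFields) {
            long delta = ((Number) tuple.get(field)).longValue();
            row.cells.add(new Cell(columnFamily, field, null, delta));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "apple");
        tuple.put("count", 3L);
        MappedRow row = map(tuple, "word", Arrays.asList("word"), Arrays.asList("count"), "cf");
        System.out.println(new String(row.rowKey, StandardCharsets.UTF_8) + ": " + row.cells.size() + " cells");
    }
}
```

A counter cell carries an increment rather than a value. HBase adds each increment to the stored counter, so repeated writes for the same row key accumulate instead of overwriting each other.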

### HBaseValueMapper
This class allows you to transform the HBase lookup result into Storm `Values` that will be emitted by the `HBaseLookupBolt`.

```java
public interface HBaseValueMapper extends Serializable {
    public List<Values> toTuples(Result result) throws Exception;
    void declareOutputFields(OutputFieldsDeclarer declarer);
}
```

The `toTuples` method takes an HBase `Result` instance and is expected to return a list of `Values` instances.
The `HBaseLookupBolt` emits each of the values returned by this method.

Use the `declareOutputFields` method to declare the output fields of the `HBaseLookupBolt`.

There is an example implementation in the `src/test/java` directory.

### HBaseProjectionCriteria
This class allows you to specify the projection criteria for your HBase Get function. It is an optional parameter
for the `HBaseLookupBolt`. If you do not specify an instance, the `HBaseLookupBolt` will return all the columns.

```java
public class HBaseProjectionCriteria implements Serializable {
    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
}
```
`addColumnFamily` takes a column family name. Setting this parameter means all columns of this family will be included
in the projection.

`addColumn` takes a `ColumnMetaData` instance. Setting this parameter means only this column from the column family
will be part of your projection.

The following code creates a projection criteria that:

1. includes the `count` column from column family `cf`.
2. includes all columns from column family `cf2`.

```java
HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
        .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
        .addColumnFamily("cf2");
```

### HBaseLookupBolt
To use the `HBaseLookupBolt`, construct it with the name of the table to read from, an implementation of `HBaseMapper`
and an implementation of `HBaseValueMapper`. You can optionally specify an `HBaseProjectionCriteria`.

The `HBaseLookupBolt` will use the mapper to get the row key to look up. It will use the `HBaseProjectionCriteria` to
figure out which columns to include in the result, and it will use the `HBaseValueMapper` to get the
values to be emitted by the bolt.

You can look at an example topology, `LookupWordCount.java`, under `src/test/java`.

## Example: Persistent Word Count
A runnable example can be found in the `src/test/java` directory.

### Setup
The following steps assume you are running HBase locally, or that there is an `hbase-site.xml` on the
classpath pointing to your HBase cluster.

Use the `hbase shell` command to create the schema:

```
> create 'WordCount', 'cf'
```

### Execution
Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).

After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class
to view the counter values stored in HBase.
You should see something like the following:

```
Word: 'apple', Count: 6867
Word: 'orange', Count: 6645
Word: 'pineapple', Count: 6954
Word: 'banana', Count: 6787
Word: 'watermelon', Count: 6806
```

For reference, the sample topology is listed below:

```java
package org.apache.storm.hbase.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

// WordSpout and WordCounter are the sample components in the same package under src/test/java.
public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String HBASE_BOLT = "HBASE_BOLT";

    public static void main(String[] args) throws Exception {
        Config config = new Config();

        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();

        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withColumnFamily("cf");

        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);

        // wordSpout ==> countBolt ==> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));

        if (args.length == 0) {
            // No topology name given: run locally for 10 seconds, then shut down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            System.exit(0);
        } else {
            // Submit to a real cluster under the given topology name.
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}
```
Added: storm/site/releases/0.9.6/storm-hdfs.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hdfs.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hdfs.md (added)
+++ storm/site/releases/0.9.6/storm-hdfs.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,368 @@
---
title: Storm HDFS Integration
layout: documentation
documentation: true
---

Storm components for interacting with HDFS file systems


## Usage
The following example will write pipe ("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
reach 5 megabytes in size.

```java
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```

### Packaging a Topology
When packaging your topology, it's important that you use the [maven-shade-plugin](http://maven.apache.org/plugins/maven-shade-plugin/) as opposed to the
[maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/).

The shade plugin can merge JAR resources such as the `META-INF/services` files, which the Hadoop client uses for URL scheme
resolution.

If you experience errors such as the following:

```
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
```

it's an indication that your topology jar file isn't packaged properly.
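
One way to check a jar for this problem is to look at what Hadoop's scheme lookup can actually see. Hadoop 2.x discovers `FileSystem` implementations through `java.util.ServiceLoader`, which reads `META-INF/services/org.apache.hadoop.fs.FileSystem` files. If a packaging step overwrites those files instead of merging them, the entry for `org.apache.hadoop.hdfs.DistributedFileSystem` can be lost. The sketch below is a hypothetical diagnostic helper, not part of storm-hdfs. It lists the implementations registered that way for a class loader, so you can run it with your topology jar on the classpath. Note that an explicit `fs.hdfs.impl` setting in the Hadoop configuration can also supply the implementation, so an empty result is a strong hint rather than proof.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch: lists the Hadoop FileSystem implementations registered through
// META-INF/services files that a class loader can see.
class HadoopFsServiceCheck {
    static final String SERVICE_FILE = "META-INF/services/org.apache.hadoop.fs.FileSystem";
    static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

    // Parses service-file lines: '#' starts a comment, blank lines are ignored.
    static List<String> parseServiceLines(List<String> lines) {
        List<String> classNames = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!entry.isEmpty()) {
                classNames.add(entry);
            }
        }
        return classNames;
    }

    // Reads every copy of the service file visible to the loader (one per jar,
    // or a single merged copy inside a correctly shaded jar).
    static List<String> registeredFileSystems(ClassLoader loader) {
        List<String> classNames = new ArrayList<>();
        try {
            for (URL url : Collections.list(loader.getResources(SERVICE_FILE))) {
                List<String> lines = new ArrayList<>();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        lines.add(line);
                    }
                }
                classNames.addAll(parseServiceLines(lines));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return classNames;
    }

    public static void main(String[] args) {
        List<String> fileSystems = registeredFileSystems(HadoopFsServiceCheck.class.getClassLoader());
        System.out.println("Registered FileSystem implementations: " + fileSystems);
        if (!fileSystems.contains(HDFS_IMPL)) {
            System.out.println("No " + HDFS_IMPL + " entry; hdfs:// may not resolve unless fs.hdfs.impl is set.");
        }
    }
}
```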

If you are using Maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
create your topology jar:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```

### Specifying a Hadoop Version
By default, storm-hdfs uses the following Hadoop dependencies:

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
and add the dependencies for your preferred version in your pom.

Hadoop client version incompatibilities can manifest as errors like:

```
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
```

## Customization

### Record Formats
You can control the record format by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
interface:

```java
public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}
```

The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
tab-delimited files.


### File Naming
You can control file naming by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
interface:

```java
public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}
```

The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:

     {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

For example:

     MyBolt-5-7-1390579837830.txt

By default, the prefix is empty and the extension is ".txt".



### Sync Policies
Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:

```java
public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```
The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
to perform a sync/flush, after which it will call the `reset()` method.

The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
been processed.
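
The following dependency-free sketch illustrates the mark/reset contract described above. `SyncPolicyLike` is a hypothetical stand-in for `SyncPolicy` that takes an `Object` where the real interface takes a Storm `Tuple`. `CountingSyncPolicy` only imitates the behavior described for `CountSyncPolicy` and is not the storm-hdfs source. `countSyncs()` plays the role of the bolt.

```java
// Hypothetical, dependency-free model of the SyncPolicy mark/reset contract.
class CountSyncSketch {

    // Stand-in for org.apache.storm.hdfs.sync.SyncPolicy; the real mark() receives a Storm Tuple.
    interface SyncPolicyLike {
        boolean mark(Object tuple, long offset);
        void reset();
    }

    // Signals a sync once `count` tuples have been marked since the last reset.
    static final class CountingSyncPolicy implements SyncPolicyLike {
        private final int count;
        private int marked = 0;

        CountingSyncPolicy(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must be positive");
            }
            this.count = count;
        }

        @Override
        public boolean mark(Object tuple, long offset) {
            marked++;
            return marked >= count;
        }

        @Override
        public void reset() {
            marked = 0;
        }
    }

    // Plays the role of the bolt: mark every tuple, then "sync" and reset when mark() returns true.
    static int countSyncs(SyncPolicyLike policy, int tuples) {
        int syncs = 0;
        for (int i = 0; i < tuples; i++) {
            if (policy.mark(null, 0L)) { // this policy ignores both arguments
                syncs++;                 // stands in for the bolt's sync/flush
                policy.reset();
            }
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println(countSyncs(new CountingSyncPolicy(1000), 2500));
    }
}
```

With 2,500 tuples and a threshold of 1,000, this prints `2`. In this model, the last 500 tuples do not trigger a sync.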

### File Rotation Policies
Like sync policies, file rotation policies let you control when data files are rotated. To define one, provide an
implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:

```java
public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}
```

The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
data files reach a specific file size:

```java
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```

### File Rotation Actions
Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
A `RotationAction` provides a hook that lets you perform some action right after a file is rotated, for
example moving a file to a different location or renaming it.


```java
public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
```

Storm-HDFS includes a simple action that will move a file after rotation:

```java
public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        return;
    }
}
```

If you are using Trident and sequence files, you can do something like this:

```java
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```


## Support for HDFS Sequence Files

The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write Storm data to HDFS sequence files:

```java
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withExtension(".seq")
        .withPath("/data/");

// create sequence format instance.
DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

SequenceFileBolt bolt = new SequenceFileBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .withCompressionType(SequenceFile.CompressionType.RECORD)
        .withCompressionCodec("deflate");
```

The `SequenceFileBolt` requires that you provide an `org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to
key/value pairs:

```java
public interface SequenceFormat extends Serializable {
    Class keyClass();
    Class valueClass();

    Writable key(Tuple tuple);
    Writable value(Tuple tuple);
}
```

## Trident API
storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors
that of the bolts.

```java
Fields hdfsFields = new Fields("field1", "field2");

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/trident")
        .withPrefix("trident")
        .withExtension(".txt");

RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310");

StateFactory factory = new HdfsStateFactory().withOptions(options);

TridentState state = stream
        .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
```

To use the sequence file `State` implementation, use the `HdfsState.SequenceFileOptions`:

```java
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
```

## Working with Secure HDFS
If your topology is going to interact with secure HDFS, the NameNode must authenticate your bolts/states. We
currently have 2 options to support this:

### Using HDFS delegation tokens
Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
Nimbus must be started with the following configuration:

- `nimbus.autocredential.plugins.classes: ["org.apache.storm.hdfs.common.security.AutoHDFS"]`
- `nimbus.credential.renewers.classes: ["org.apache.storm.hdfs.common.security.AutoHDFS"]`
- `hdfs.keytab.file: "/path/to/keytab/on/nimbus"` (the keytab of the HDFS superuser that can impersonate other users)
- `hdfs.kerberos.principal: "[email protected]"`
- `nimbus.credential.renewers.freq.secs: 82800` (23 hours. HDFS tokens need to be renewed every 24 hours, so this value should be less than 24 hours.)
- `topology.hdfs.uri: "hdfs://host:port"` (optional. By default, the value of the `fs.defaultFS` property in Hadoop's core-site.xml is used.)

Your topology configuration should have `topology.auto-credentials: ["org.apache.storm.hdfs.common.security.AutoHDFS"]`.

If Nimbus does not already have the above configuration, add it and then restart Nimbus. Ensure the Hadoop configuration
files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are on Nimbus's classpath.
Nimbus will use the keytab and principal specified in the config to authenticate with the NameNode. From then on, for every
topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of that
user. If the topology was started with `topology.auto-credentials` set to AutoHDFS, Nimbus will push the
delegation tokens to all the workers for your topology, and the HDFS bolt/state will authenticate with the NameNode using
these tokens.

Because Nimbus impersonates the topology submitter user, the user specified in `hdfs.kerberos.principal` must
have permission to acquire tokens on behalf of other users. To set this up, follow the configuration directions
at this link:
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html

You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.

### Using keytabs on all worker hosts
If you have distributed the keytab files for the HDFS user to all potential worker hosts, then you can use this method.
Specify an
HDFS config key with `HdfsBolt/State.withConfigKey("somekey")`. The value map for this key should contain the following 2 properties:

- `hdfs.keytab.file: "/path/to/keytab/"`
- `hdfs.kerberos.principal: "[email protected]"`

On worker hosts, the bolt/Trident state code will use the keytab file and the principal provided in the config to authenticate with
the NameNode. This method is somewhat risky: every worker must have the keytab file at the same location, and you need
to remember to set this up as you bring up new hosts in the cluster.
Added: storm/site/releases/0.9.6/storm-kafka.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-kafka.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-kafka.md (added)
+++ storm/site/releases/0.9.6/storm-kafka.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,287 @@
---
title: Storm Kafka Integration
layout: documentation
documentation: true
---

Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.

## Spouts
We support both Trident and core Storm spouts. Both spout implementations use a `BrokerHosts` interface, which
tracks the Kafka broker host to partition mapping, and a `KafkaConfig`, which controls some Kafka-related parameters.

### BrokerHosts
To initialize your Kafka spout/emitter, you need to construct an instance of the marker interface `BrokerHosts`.
Currently, we support the following two implementations:

#### ZkHosts
Use `ZkHosts` if you want to dynamically track the Kafka broker to partition mapping. This class uses
Kafka's ZooKeeper entries to track the brokerHost -> partition mapping. You can instantiate an object with either of these constructors:
```java
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
```
Here, `brokerZkStr` is just ip:port (e.g. localhost:2181).
`brokerZkPath` is the root directory under which all the topic and
partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.

By default, the broker-partition mapping is refreshed from ZooKeeper every 60 seconds. To change this, set
the `refreshFreqSecs` field of your `ZkHosts` instance to your chosen value.

#### StaticHosts
This is an alternative implementation where the broker -> partition information is static. To construct an instance
of this class, you first need to construct an instance of `GlobalPartitionInformation`.

```java
Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092 specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
```

### KafkaConfig
The second thing needed for constructing a `KafkaSpout` is an instance of `KafkaConfig`.
```java
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
```

The `BrokerHosts` can be any implementation of the `BrokerHosts` interface described above. The topic is the name of the Kafka topic.
The optional `clientId` is used as part of the ZooKeeper path where the spout's current consumption offset is stored.

There are 2 extensions of `KafkaConfig` currently in use.

`SpoutConfig` is an extension of `KafkaConfig` that supports additional fields with ZooKeeper connection info and for controlling
behavior specific to `KafkaSpout`. The `zkRoot` will be used as the root under which your consumer's offset is stored. The `id` should uniquely
identify your spout.
```java
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
```
In addition to these parameters, `SpoutConfig` contains the following fields that control how `KafkaSpout` behaves:
```java
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

// Exponential back-off retry settings. These are used when retrying messages after a bolt
// calls OutputCollector.fail().
// Note: be sure to set backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS appropriately to prevent
// resubmitting the message while still retrying.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;

// if set to true, spout will set Kafka topic as the emitted Stream ID
public boolean topicAsStreamId = false;
```
Core `KafkaSpout` only accepts an instance of `SpoutConfig`.

`TridentKafkaConfig` is another extension of `KafkaConfig`.
`TridentKafkaEmitter` only accepts `TridentKafkaConfig`.

The `KafkaConfig` class also has a number of public variables that control your application's behavior.
Here are the defaults:
```java
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
```

Most of them are self-explanatory except `MultiScheme`.

### MultiScheme
`MultiScheme` is an interface that dictates how the `byte[]` consumed from Kafka gets transformed into a Storm tuple. It
also controls the naming of your output field.

```java
public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();
```

The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with the `byte[]` as is. The name of the
output field is "bytes". There are alternative implementations, such as `SchemeAsMultiScheme` and
`KeyValueSchemeAsMultiScheme`, which can convert the `byte[]` to a `String`.
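
Because `deserialize()` returns an `Iterable`, one Kafka message can produce zero, one or several tuples. The sketch below uses a hypothetical `MultiSchemeLike` interface so that it runs without Storm. The real `getOutputFields()` returns a Storm `Fields`, which the sketch replaces with a `List<String>`. It contrasts a pass-through scheme shaped like the described `RawMultiScheme` with an invented scheme that splits a message into lines. Neither class is part of storm-kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MultiSchemeSketch {
    // Hypothetical stand-in for MultiScheme; the real getOutputFields() returns a Storm Fields object.
    interface MultiSchemeLike {
        Iterable<List<Object>> deserialize(byte[] ser);
        List<String> getOutputFields();
    }

    // Shaped like the described RawMultiScheme: one tuple holding the untouched bytes, field "bytes".
    static final class RawLike implements MultiSchemeLike {
        @Override
        public Iterable<List<Object>> deserialize(byte[] ser) {
            List<Object> tuple = Collections.<Object>singletonList(ser);
            return Collections.singletonList(tuple);
        }

        @Override
        public List<String> getOutputFields() {
            return Collections.singletonList("bytes");
        }
    }

    // Hypothetical scheme: decodes UTF-8 and emits one tuple per non-empty line.
    static final class LinesLike implements MultiSchemeLike {
        @Override
        public Iterable<List<Object>> deserialize(byte[] ser) {
            List<List<Object>> tuples = new ArrayList<>();
            for (String line : new String(ser, StandardCharsets.UTF_8).split("\n")) {
                if (!line.isEmpty()) {
                    tuples.add(Collections.<Object>singletonList(line));
                }
            }
            return tuples;
        }

        @Override
        public List<String> getOutputFields() {
            return Collections.singletonList("line");
        }
    }

    public static void main(String[] args) {
        byte[] message = "first\nsecond\n".getBytes(StandardCharsets.UTF_8);
        for (MultiSchemeLike scheme : new MultiSchemeLike[] {new RawLike(), new LinesLike()}) {
            int count = 0;
            for (List<Object> tuple : scheme.deserialize(message)) {
                count++;
            }
            System.out.println(scheme.getOutputFields() + " -> " + count + " tuple(s)");
        }
    }
}
```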


### Examples

#### Core Spout

```java
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```

This example generates a random `id` on every run, so a redeployed topology will not resume from previously stored offsets. Use a fixed `id` if you need that.

#### Trident Spout
```java
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
```


### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the `KafkaConfig` properties above, you can control where in the Kafka topic the spout begins to read by
setting `KafkaConfig.startOffsetTime` as follows:

1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards)
2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
3. A Unix timestamp in milliseconds since the epoch (e.g. via `System.currentTimeMillis()`):
   see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ

As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information
under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`. In the case of failures, it recovers from the last
written offset in ZooKeeper.
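
The resume behavior can be modeled with an in-memory map standing in for ZooKeeper. In this sketch, `OffsetResumeSketch`, `commit()` and `startingOffset()` are hypothetical names. The parameter `offsetForStartTime` stands for whatever offset `startOffsetTime` resolves to. The `ignoreZkOffsets` flag follows the behavior this document attributes to `KafkaConfig.ignoreZkOffsets`. The real spout keeps more state than a single number per path, so only the decision logic is modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a Kafka spout chooses where to start reading.
// A HashMap stands in for ZooKeeper: state path -> committed offset.
class OffsetResumeSketch {
    private final Map<String, Long> store = new HashMap<>();

    // State path as described above: zkRoot + "/" + id.
    static String statePath(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    // Records an offset for this spout's state path.
    void commit(String zkRoot, String id, long offset) {
        store.put(statePath(zkRoot, id), offset);
    }

    // offsetForStartTime: the offset that KafkaConfig.startOffsetTime resolves to.
    long startingOffset(String zkRoot, String id, long offsetForStartTime, boolean ignoreZkOffsets) {
        Long committed = store.get(statePath(zkRoot, id));
        if (ignoreZkOffsets || committed == null) {
            return offsetForStartTime; // first run, changed zkRoot/id, or stored state ignored
        }
        return committed;              // resume from the stored state
    }

    public static void main(String[] args) {
        OffsetResumeSketch zk = new OffsetResumeSketch();
        System.out.println(zk.startingOffset("/kafka-spout", "word-spout", 0L, false)); // first run
        zk.commit("/kafka-spout", "word-spout", 42L);
        System.out.println(zk.startingOffset("/kafka-spout", "word-spout", 0L, false)); // resumes
        System.out.println(zk.startingOffset("/kafka-spout", "other-id", 0L, false));   // id changed
        System.out.println(zk.startingOffset("/kafka-spout", "word-spout", 0L, true));  // state ignored
    }
}
```

The third call shows why changing `SpoutConfig.id` between deployments loses the stored position. The model falls back to the start offset, exactly as it would on a first run.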

> **Important:** When re-deploying a topology, make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id`
> were not modified. Otherwise the spout will not be able to read its previous consumer state information (i.e. the
> offsets) from ZooKeeper, which may lead to unexpected behavior and/or data loss, depending on your use case.

This means that once a topology has run, the setting `KafkaConfig.startOffsetTime` will have no effect on
subsequent runs of the topology. The topology now relies on the consumer state information (offsets) in
ZooKeeper to determine where it should begin (more precisely: resume) reading.
To force the spout to ignore any consumer state information stored in ZooKeeper,
set the parameter `KafkaConfig.ignoreZkOffsets` to `true`. If `true`, the spout will always begin reading from the
offset defined by `KafkaConfig.startOffsetTime` as described above.


## Using storm-kafka with different versions of Scala

Storm-kafka's Kafka dependency is defined with `provided` scope in Maven, meaning it will not be pulled in
as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.

When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
+
+## Writing to Kafka as part of your topology
+You can create an instance of `storm.kafka.bolt.KafkaBolt` and attach it as a component to your topology, or, if you
+are using Trident, you can use `storm.kafka.trident.TridentKafkaState`, `storm.kafka.trident.TridentKafkaStateFactory` and
+`storm.kafka.trident.TridentKafkaUpdater`.
+
+You need to provide implementations of the following two interfaces:
+
+### TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces define two methods:
+
+```java
+    K getKeyFromTuple(Tuple/TridentTuple tuple);
+    V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as the key and one field as the value, you can use the provided `FieldNameBasedTupleToKafkaMapper`
+implementation. In the KafkaBolt, for backward compatibility reasons, this implementation always looks for fields named "key"
+and "message" if you use the default constructor. Alternatively, you can specify different key and message fields by using
+the non-default constructor. In the TridentKafkaState you must specify the field names for the key and the message, as there
+is no default constructor. These should be specified when constructing an instance of `FieldNameBasedTupleToKafkaMapper`.
+
+### KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method:
+```java
+public interface KafkaTopicSelector {
+    String getTopic(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, and the message will be ignored. If you have one static topic name, you can use
+`DefaultTopicSelector` and set the name of the topic in its constructor.
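The mapper and topic-selector contracts can be illustrated without Storm on the classpath. In this hypothetical sketch, a `Map<String, Object>` stands in for a `Tuple`. `FieldNameMapper` mimics the documented field-name behavior of `FieldNameBasedTupleToKafkaMapper`, defaulting to "key" and "message". `selectTopic` is an illustrative selector that routes on an assumed "type" field and returns null to drop a tuple. A real topology would implement the storm-kafka interfaces instead.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, dependency-free model of the mapper and topic-selector contracts.
 * A Map stands in for a Storm Tuple; this is not the storm-kafka API.
 */
public class KafkaRoutingSketch {

    /** Mimics the documented field-name lookup of FieldNameBasedTupleToKafkaMapper. */
    public static class FieldNameMapper {
        private final String keyField;
        private final String messageField;

        /** Default constructor: looks for fields named "key" and "message". */
        public FieldNameMapper() {
            this("key", "message");
        }

        /** Non-default constructor: caller chooses the key and message field names. */
        public FieldNameMapper(String keyField, String messageField) {
            this.keyField = keyField;
            this.messageField = messageField;
        }

        public Object getKeyFromTuple(Map<String, Object> tuple) {
            return tuple.get(keyField);
        }

        public Object getMessageFromTuple(Map<String, Object> tuple) {
            return tuple.get(messageField);
        }
    }

    /** Illustrative selector: picks a topic from an assumed "type" field; null means "skip this tuple". */
    public static String selectTopic(Map<String, Object> tuple) {
        Object type = tuple.get("type");
        if ("click".equals(type)) {
            return "clicks";
        }
        if ("view".equals(type)) {
            return "views";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("key", "user-1");
        tuple.put("message", "hello");
        tuple.put("type", "click");

        FieldNameMapper mapper = new FieldNameMapper();
        String topic = selectTopic(tuple);
        if (topic != null) {
            // prints "clicks <- user-1:hello"
            System.out.println(topic + " <- " + mapper.getKeyFromTuple(tuple) + ":" + mapper.getMessageFromTuple(tuple));
        }
    }
}
```

Returning null from the selector keeps the "drop this tuple" decision in one place. A single static topic reduces the selector to returning a constant, which is the role `DefaultTopicSelector` plays.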
+
+### Specifying Kafka producer properties
+You can provide all the producer properties (see the "Important configuration properties for the producer" section of
+http://kafka.apache.org/documentation.html#producerconfigs) in your Storm topology config by setting the properties
+map under the key `kafka.broker.properties`.
+
+### Putting it all together
+
+For the bolt:
+```java
+    TopologyBuilder builder = new TopologyBuilder();
+
+    Fields fields = new Fields("key", "message");
+    FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+    );
+    spout.setCycle(true);
+    builder.setSpout("spout", spout, 5);
+    KafkaBolt bolt = new KafkaBolt()
+            .withTopicSelector(new DefaultTopicSelector("test"))
+            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+    builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+    Config conf = new Config();
+    // Set producer properties.
+    Properties props = new Properties();
+    props.put("metadata.broker.list", "localhost:9092");
+    props.put("request.required.acks", "1");
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
+    StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+```
+
+For Trident:
+
+```java
+    Fields fields = new Fields("word", "count");
+    FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+            new Values("storm", "1"),
+            new Values("trident", "1"),
+            new Values("needs", "1"),
+            new Values("javadoc", "1")
+    );
+    spout.setCycle(true);
+
+    TridentTopology topology = new TridentTopology();
+    Stream stream = topology.newStream("spout1", spout);
+
+    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+    stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+    Config conf = new Config();
+    // Set producer properties.
+    Properties props = new Properties();
+    props.put("metadata.broker.list", "localhost:9092");
+    props.put("request.required.acks", "1");
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+    StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+```
