http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Running-topologies-on-a-production-cluster.md ---------------------------------------------------------------------- diff --git a/docs/Running-topologies-on-a-production-cluster.md b/docs/Running-topologies-on-a-production-cluster.md index 248c929..76c43dd 100644 --- a/docs/Running-topologies-on-a-production-cluster.md +++ b/docs/Running-topologies-on-a-production-cluster.md @@ -1,5 +1,7 @@ --- +title: Running Topologies on a Production Cluster layout: documentation +documentation: true --- Running topologies on a production cluster is similar to running in [Local mode](Local-mode.html). Here are the steps: @@ -48,7 +50,7 @@ You can find out how to configure your `storm` client to talk to a Storm cluster There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found [here](javadocs/backtype/storm/Config.html). The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology: 1. **Config.TOPOLOGY_WORKERS**: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If you had a combined 150 parallelism across all components in the topology, each worker process will have 6 tasks running within it as threads. -2. **Config.TOPOLOGY_ACKERS**: This sets the number of tasks that will track tuple trees and detect when a spout tuple has been fully processed. Ackers are an integral part of Storm's reliability model and you can read more about them on [Guaranteeing message processing](Guaranteeing-message-processing.html). +2. **Config.TOPOLOGY_ACKER_EXECUTORS**: This sets the number of executors that will track tuple trees and detect when a spout tuple has been fully processed. 
Ackers are an integral part of Storm's reliability model and you can read more about them on [Guaranteeing message processing](Guaranteeing-message-processing.html). By not setting this variable or setting it as null, Storm will set the number of acker executors to be equal to the number of workers configured for this topology. If this variable is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability. 3. **Config.TOPOLOGY_MAX_SPOUT_PENDING**: This sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended you set this config to prevent queue explosion. 4. **Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS**: This is the maximum amount of time a spout tuple has to be fully completed before it is considered failed. This value defaults to 30 seconds, which is sufficient for most topologies. See [Guaranteeing message processing](Guaranteeing-message-processing.html) for more information on how Storm's reliability model works. 5. **Config.TOPOLOGY_SERIALIZATIONS**: You can register more serializers to Storm using this config so that you can use custom types within tuples.
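As a rough illustration of the arithmetic in items 1 and 2 above, the following Python sketch computes the tasks-per-worker figure and the acker executor count implied by the described defaults. The function names are hypothetical helpers written for this note, not part of Storm's API, and the even spread of tasks across workers is an assumption taken from the example in the text.

```python
from typing import Optional


def tasks_per_worker(total_parallelism: int, workers: int) -> float:
    """Average number of tasks each worker process runs as threads.

    Assumes tasks are spread evenly across workers, as in the text's example.
    """
    if workers <= 0:
        raise ValueError("workers must be positive")
    return total_parallelism / workers


def effective_acker_executors(workers: int, acker_executors: Optional[int]) -> int:
    """Acker executor count under the rule described in the text.

    None (unset or null) means one acker executor per worker;
    0 means tuples are acked immediately, i.e. reliability is disabled.
    """
    if acker_executors is None:
        return workers
    return acker_executors


if __name__ == "__main__":
    print(tasks_per_worker(150, 25))            # 150 parallelism over 25 workers
    print(effective_acker_executors(25, None))  # unset: falls back to the worker count
    print(effective_acker_executors(25, 0))     # 0: acking disabled
```

In a real topology these values would be set through the `Config` keys listed above; the sketch only mirrors the rules as the text states them.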
http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/SECURITY.md ---------------------------------------------------------------------- diff --git a/docs/SECURITY.md b/docs/SECURITY.md index 495061a..353cb86 100644 --- a/docs/SECURITY.md +++ b/docs/SECURITY.md @@ -3,19 +3,23 @@ title: Running Apache Storm Securely layout: documentation documentation: true --- + # Running Apache Storm Securely -The current release of Apache Storm offers no authentication or authorization. -It does not encrypt any data being sent across the network, and does not -attempt to restrict access to data stored on the local file system or in -Apache Zookeeper. As such there are a number of different precautions you may -want to enact outside of storm itself to be sure storm is running securely. +Apache Storm offers a range of configuration options when trying to secure +your cluster. By default all authentication and authorization is disabled but +can be turned on as needed. + +## Firewall/OS level Security + +You can still have a secure storm cluster without turning on formal +Authentication and Authorization. But to do so usually requires +configuring your Operating System to restrict the operations that can be done. +This is generally a good idea even if you plan on running your cluster with Auth. The exact detail of how to setup these precautions varies a lot and is beyond the scope of this document. -## Network Security - It is generally a good idea to enable a firewall and restrict incoming network connections to only those originating from the cluster itself and from trusted hosts and services, a complete list of ports storm uses are below. @@ -33,47 +37,442 @@ IPsec to encrypt all traffic being sent between the hosts in the cluster. 
| 8000 | `logviewer.port` | Client Web Browsers | Logviewer | | 3772 | `drpc.port` | External DRPC Clients | DRPC | | 3773 | `drpc.invocations.port` | Worker Processes | DRPC | +| 3774 | `drpc.http.port` | External HTTP DRPC Clients | DRPC | | 670{0,1,2,3} | `supervisor.slots.ports` | Worker Processes | Worker Processes | + ### UI/Logviewer The UI and logviewer processes provide a way to not only see what a cluster is doing, but also manipulate running topologies. In general these processes should -not be exposed except to users of the cluster. It is often simplest to restrict -these ports to only accept connections from local hosts, and then front them with another web server, -like Apache httpd, that can authenticate/authorize incoming connections and +not be exposed except to users of the cluster. + +Some form of authentication is typically required, either by using Java servlet filters + +```yaml +ui.filter: "filter.class" +ui.filter.params: "param1":"value1" +``` +or by restricting the UI/logviewer ports to only accept connections from local +hosts, and then fronting them with another web server, like Apache httpd, that can +authenticate/authorize incoming connections and proxy the connection to the storm process. To make this work the ui process must have logviewer.port set to the port of the proxy in its storm.yaml, while the logviewers must have it set to the actual port that they are going to bind to. -### Nimbus +Servlet filters are preferred because they allow individual topologies to +specify who is and who is not allowed to access the pages associated with +them. + +Storm UI can be configured to use AuthenticationFilter from hadoop-auth. 
+```yaml +ui.filter: "org.apache.hadoop.security.authentication.server.AuthenticationFilter" +ui.filter.params: + "type": "kerberos" + "kerberos.principal": "HTTP/nimbus.witzend.com" + "kerberos.keytab": "/vagrant/keytabs/http.keytab" + "kerberos.name.rules": "RULE:[2:$1@$0]([jt]t@.*EXAMPLE.COM)s/.*/$MAPRED_USER/ RULE:[2:$1@$0]([nd]n@.*EXAMPLE.COM)s/.*/$HDFS_USER/DEFAULT" +``` +Make sure to create a principal 'HTTP/{hostname}' (here hostname should be the host where the UI daemon runs). + +Once configured, users need to run kinit before accessing the UI. +Ex: +curl -i --negotiate -u:anyUser -b ~/cookiejar.txt -c ~/cookiejar.txt http://storm-ui-hostname:8080/api/v1/cluster/summary + +1. Firefox: Go to about:config, search for network.negotiate-auth.trusted-uris, and double-click to add the value "http://storm-ui-hostname:8080" +2. Google-chrome: start from the command line with: google-chrome --auth-server-whitelist="*storm-ui-hostname" --auth-negotiate-delegate-whitelist="*storm-ui-hostname" +3. IE: Configure trusted websites to include "storm-ui-hostname" and allow negotiation for that website + +**Caution**: In an AD MIT Kerberos setup the key size is bigger than the default UI jetty server request header size. Make sure you set ui.header.buffer.bytes to 65536 in storm.yaml. More details are on [STORM-633](https://issues.apache.org/jira/browse/STORM-633) + + +## UI / DRPC SSL + +Both the UI and DRPC allow users to configure SSL. + +### UI + +For the UI, users need to set the following configs in storm.yaml. Generating keystores with proper keys and certs should be taken care of by the user before this step. + +1. ui.https.port +2. ui.https.keystore.type (example "jks") +3. ui.https.keystore.path (example "/etc/ssl/storm_keystore.jks") +4. ui.https.keystore.password (keystore password) +5. ui.https.key.password (private key password) + +optional config +6. ui.https.truststore.path (example "/etc/ssl/storm_truststore.jks") +7. ui.https.truststore.password (truststore password) +8. 
ui.https.truststore.type (example "jks") + +If users want to set up 2-way auth +9. ui.https.want.client.auth (If this is set to true, the server requests client certificate authentication, but keeps the connection if no authentication is provided) +10. ui.https.need.client.auth (If this is set to true, the server requires the client to provide authentication) + + -Nimbus's Thrift port should be locked down as it can be used to control the entire -cluster including running arbitrary user code on different nodes in the cluster. -Ideally access to it is restricted to nodes within the cluster and possibly some gateway -nodes that allow authorized users to log into them and run storm client commands. ### DRPC +Similarly to the UI, users need to configure the following for DRPC + +1. drpc.https.port +2. drpc.https.keystore.type (example "jks") +3. drpc.https.keystore.path (example "/etc/ssl/storm_keystore.jks") +4. drpc.https.keystore.password (keystore password) +5. drpc.https.key.password (private key password) + +optional config +6. drpc.https.truststore.path (example "/etc/ssl/storm_truststore.jks") +7. drpc.https.truststore.password (truststore password) +8. drpc.https.truststore.type (example "jks") + +If users want to set up 2-way auth +9. drpc.https.want.client.auth (If this is set to true, the server requests client certificate authentication, but keeps the connection if no authentication is provided) +10. drpc.https.need.client.auth (If this is set to true, the server requires the client to provide authentication) + + + + + +## Authentication (Kerberos) + +Storm offers pluggable authentication support through thrift and SASL. This +example covers only Kerberos as it is a common setup for most big data +projects. + +Setting up a KDC and configuring kerberos on each node is beyond the scope of +this document and it is assumed that you have done that already. 
+ +### Create Headless Principals and keytabs + +Each Zookeeper Server, Nimbus, and DRPC server will need a service principal, which, by convention, includes the FQDN of the host it will run on. Be aware that the zookeeper user *MUST* be zookeeper. +The supervisors and UI also need a principal to run as, but because they are outgoing connections they do not need to be service principals. +The following is an example of how to set up kerberos principals, but the +details may vary depending on your KDC and OS. + + +```bash +# Zookeeper (Will need one of these for each box in the ZK ensemble) +sudo kadmin.local -q 'addprinc zookeeper/[email protected]' +sudo kadmin.local -q "ktadd -k /tmp/zk.keytab zookeeper/[email protected]" +# Nimbus and DRPC +sudo kadmin.local -q 'addprinc storm/[email protected]' +sudo kadmin.local -q "ktadd -k /tmp/storm.keytab storm/[email protected]" +# All UI logviewer and Supervisors +sudo kadmin.local -q 'addprinc [email protected]' +sudo kadmin.local -q "ktadd -k /tmp/storm.keytab [email protected]" +``` + +Be sure to distribute the keytab(s) to the appropriate boxes and set the FS permissions so that only the headless user running ZK or storm has access to them. + +#### Storm Kerberos Configuration + +Both storm and Zookeeper use jaas configuration files to log the user in. +Each jaas file may have multiple sections for different interfaces being used. + +To enable Kerberos authentication in storm you need to set the following storm.yaml configs +```yaml +storm.thrift.transport: "backtype.storm.security.auth.kerberos.KerberosSaslTransportPlugin" +java.security.auth.login.config: "/path/to/jaas.conf" +``` + +Nimbus and the supervisor processes will also connect to ZooKeeper (ZK) and we want to configure them to use Kerberos for authentication with ZK. To do this append +``` +-Djava.security.auth.login.config=/path/to/jaas.conf +``` + +to the childopts of nimbus, ui, and supervisor. 
Here is an example given the default childopts settings at the time of writing: + +```yaml +nimbus.childopts: "-Xmx1024m -Djava.security.auth.login.config=/path/to/jaas.conf" +ui.childopts: "-Xmx768m -Djava.security.auth.login.config=/path/to/jaas.conf" +supervisor.childopts: "-Xmx256m -Djava.security.auth.login.config=/path/to/jaas.conf" +``` + +The jaas.conf file should look something like the following for the storm nodes. +The StormServer section is used by nimbus and the DRPC Nodes. It does not need to be included on supervisor nodes. +The StormClient section is used by all storm clients that want to talk to nimbus, including the ui, logviewer, and supervisor. We will use this section on the gateways as well but the structure of that will be a bit different. +The Client section is used by processes wanting to talk to zookeeper and really only needs to be included with nimbus and the supervisors. +The Server section is used by the zookeeper servers. +Having unused sections in the jaas is not a problem. 
+ +``` +StormServer { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="$keytab" + storeKey=true + useTicketCache=false + principal="$principal"; +}; +StormClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="$keytab" + storeKey=true + useTicketCache=false + serviceName="$nimbus_user" + principal="$principal"; +}; +Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="$keytab" + storeKey=true + useTicketCache=false + serviceName="zookeeper" + principal="$principal"; +}; +Server { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="$keytab" + storeKey=true + useTicketCache=false + principal="$principal"; +}; +``` + +The following is an example based off of the keytabs generated +``` +StormServer { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="/keytabs/storm.keytab" + storeKey=true + useTicketCache=false + principal="storm/[email protected]"; +}; +StormClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="/keytabs/storm.keytab" + storeKey=true + useTicketCache=false + serviceName="storm" + principal="[email protected]"; +}; +Client { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="/keytabs/storm.keytab" + storeKey=true + useTicketCache=false + serviceName="zookeeper" + principal="[email protected]"; +}; +Server { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="/keytabs/zk.keytab" + storeKey=true + useTicketCache=false + serviceName="zookeeper" + principal="zookeeper/[email protected]"; +}; +``` + +Nimbus also will translate the principal into a local user name, so that other services can use this name. To configure this for Kerberos authentication set + +``` +storm.principal.tolocal: "backtype.storm.security.auth.KerberosPrincipalToLocal" +``` -Each DRPC server has two different ports. 
The invocations port is accessed by worker -processes within the cluster. The other port is accessed by external clients that -want to query the topology. The external port should be restricted to hosts that you -want to be able to do queries. +This only needs to be done on nimbus, but it will not hurt on any node. +We also need to inform the topology who the supervisor daemon and the nimbus daemon are running as from a ZooKeeper perspective. -### Supervisors +``` +storm.zookeeper.superACL: "sasl:${nimbus-user}" +``` -Supervisors are only clients they are not servers, and as such don't need special restrictions. +Here *nimbus-user* is the Kerberos user that nimbus uses to authenticate with ZooKeeper. If ZooKeeper is stripping host and realm then this needs to have host and realm stripped too. -### Workers +#### ZooKeeper Ensemble -Worker processes receive data from each other. There is the option to encrypt this data using -Blowfish by setting `topology.tuple.serializer` to `backtype.storm.security.serialization.BlowfishTupleSerializer` -and setting `topology.tuple.serializer.blowfish.key` to a secret key you want your topology to use. +Complete details of how to set up a secure ZK are beyond the scope of this document. But in general you want to enable SASL authentication on each server, and optionally strip off host and realm -### Zookeeper +``` +authProvider.1 = org.apache.zookeeper.server.auth.SASLAuthenticationProvider +kerberos.removeHostFromPrincipal = true +kerberos.removeRealmFromPrincipal = true +``` + +And you want to include the jaas.conf on the command line when launching the server so it can find the keytab. +``` +-Djava.security.auth.login.config=/jaas/zk_jaas.conf +``` + +#### Gateways + +Ideally the end user will only need to run kinit before interacting with storm. 
To make this happen seamlessly we need the default jaas.conf on the gateways to be something like + +``` +StormClient { + com.sun.security.auth.module.Krb5LoginModule required + doNotPrompt=false + useTicketCache=true + serviceName="$nimbus_user"; +}; +``` + +The end user can override this if they have a headless user that has a keytab. + +### Authorization Setup + +*Authentication* does the job of verifying who the user is, but we also need *authorization* to do the job of enforcing what each user can do. + +The preferred authorization plug-in for nimbus is The *SimpleACLAuthorizer*. To use the *SimpleACLAuthorizer*, set the following: + +```yaml +nimbus.authorizer: "backtype.storm.security.auth.authorizer.SimpleACLAuthorizer" +``` + +DRPC has a separate authorizer configuration for it. Do not use SimpleACLAuthorizer for DRPC. + +The *SimpleACLAuthorizer* plug-in needs to know who the supervisor users are, and it needs to know about all of the administrator users, including the user running the ui daemon. + +These are set through *nimbus.supervisor.users* and *nimbus.admins* respectively. Each can either be a full Kerberos principal name, or the name of the user with host and realm stripped off. + +The Log servers have their own authorization configurations. These are set through *logs.users* and *logs.groups*. These should be set to the admin users or groups for all of the nodes in the cluster. + +When a topology is submitted, the submitting user can specify users in this list as well. The users and groups specified-in addition to the users in the cluster-wide setting-will be granted access to the submitted topology's worker logs in the logviewers. + +### Supervisors headless User and group Setup + +To ensure isolation of users in multi-tenancy, there is need to run supervisors and headless user and group unique to execution on the supervisor nodes. To enable this follow below steps. +1. Add headlessuser to all supervisor hosts. +2. 
Create unique group and make it the primary group for the headless user on the supervisor nodes. +3. The set following properties on storm for these supervisor nodes. + +### Multi-tenant Scheduler + +To support multi-tenancy better we have written a new scheduler. To enable this scheduler set. +```yaml +storm.scheduler: "backtype.storm.scheduler.multitenant.MultitenantScheduler" +``` +Be aware that many of the features of this scheduler rely on storm authentication. Without them the scheduler will not know what the user is and will not isolate topologies properly. + +The goal of the multi-tenant scheduler is to provide a way to isolate topologies from one another, but to also limit the resources that an individual user can have in the cluster. + +The scheduler currently has one config that can be set either through =storm.yaml= or through a separate config file called =multitenant-scheduler.yaml= that should be placed in the same directory as =storm.yaml=. It is preferable to use =multitenant-scheduler.yaml= because it can be updated without needing to restart nimbus. + +There is currently only one config in =multitenant-scheduler.yaml=, =multitenant.scheduler.user.pools= is a map from the user name, to the maximum number of nodes that user is guaranteed to be able to use for their topologies. + +For example: + +```yaml +multitenant.scheduler.user.pools: + "evans": 10 + "derek": 10 +``` + +### Run worker processes as user who submitted the topology +By default storm runs workers as the user that is running the supervisor. This is not ideal for security. To make storm run the topologies as the user that launched them set. + +```yaml +supervisor.run.worker.as.user: true +``` + +There are several files that go along with this that are needed to be configured properly to make storm secure. + +The worker-launcher executable is a special program that allows the supervisor to launch workers as different users. 
For this to work it needs to be owned by root, but with the group set to be a group that only the supervisor headless user is a part of. +It also needs to have 6550 permissions. +There is also a worker-launcher.cfg file, usually located under /etc/, that should look something like the following + +``` +storm.worker-launcher.group=$(worker_launcher_group) +min.user.id=$(min_user_id) +``` +where worker_launcher_group is the same group the supervisor is a part of, and min.user.id is set to the first real user id on the system. +This config file also needs to be owned by root and not have world or group write permissions. + +### Impersonating a user +A storm client may submit requests on behalf of another user. For example, if `userX` submits an oozie workflow and, as part of workflow execution, user `oozie` wants to submit a topology on behalf of `userX`, +it can do so by leveraging the impersonation feature. To submit a topology as some other user, you can use the `StormSubmitter.submitTopologyAs` API. Alternatively you can use `NimbusClient.getConfiguredClientAs` +to get a nimbus client as some other user and perform any nimbus action (i.e. kill/rebalance/activate/deactivate) using this client. + +To ensure only authorized users can perform impersonation you should start nimbus with `nimbus.impersonation.authorizer` set to `backtype.storm.security.auth.authorizer.ImpersonationAuthorizer`. +The `ImpersonationAuthorizer` uses `nimbus.impersonation.acl` as the acl to authorize users. 
Following is a sample nimbus config for supporting impersonation: + +```yaml +nimbus.impersonation.authorizer: backtype.storm.security.auth.authorizer.ImpersonationAuthorizer +nimbus.impersonation.acl: + impersonating_user1: + hosts: + [comma separated list of hosts from which impersonating_user1 is allowed to impersonate other users] + groups: + [comma separated list of groups whose users impersonating_user1 is allowed to impersonate] + impersonating_user2: + hosts: + [comma separated list of hosts from which impersonating_user2 is allowed to impersonate other users] + groups: + [comma separated list of groups whose users impersonating_user2 is allowed to impersonate] +``` + +To support the oozie use case following config can be supplied: +```yaml +nimbus.impersonation.acl: + oozie: + hosts: + [oozie-host1, oozie-host2, 127.0.0.1] + groups: + [some-group-that-userX-is-part-of] +``` + +### Automatic Credentials Push and Renewal +Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services. Exposing this to all of the users can be a pain for them. +To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed. +These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement IAutoCredentials interface, that populate the credentials on gateway +and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to backtype.storm.security.auth.kerberos.AutoTGT. +nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user. + +nimbus.credential.renewers.freq.secs controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine. 
+ +In addition Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configures using nimbus.autocredential.plugins.classes which is a list +of fully qualified class names ,all of which must implement INimbusCredentialPlugin. Nimbus will invoke the populateCredentials method of all the configured implementation as part of topology +submission. You should use this config with topology.auto-credentials and nimbus.credential.renewers.classes so the credentials can be populated on worker side and nimbus can automatically renew +them. Currently there are 2 examples of using this config, AutoHDFS and AutoHBase which auto populates hdfs and hbase delegation tokens for topology submitter so they don't have to distribute keytabs +on all possible worker hosts. + +### Limits +By default storm allows any sized topology to be submitted. But ZK and others have limitations on how big a topology can actually be. The following configs allow you to limit the maximum size a topology can be. + +| YAML Setting | Description | +|------------|----------------------| +| nimbus.slots.perTopology | The maximum number of slots/workers a topology can use. | +| nimbus.executors.perTopology | The maximum number of executors/threads a topology can use. | + +### Log Cleanup +The Logviewer daemon now is also responsible for cleaning up old log files for dead topologies. + +| YAML Setting | Description | +|--------------|-------------------------------------| +| logviewer.cleanup.age.mins | How old (by last modification time) must a worker's log be before that log is considered for clean-up. (Living workers' logs are never cleaned up by the logviewer: Their logs are rolled via logback.) | +| logviewer.cleanup.interval.secs | Interval of time in seconds that the logviewer cleans up worker logs. 
| + + +### Allowing specific users or groups to access storm + + With SimpleACLAuthorizer any user with valid kerberos ticket can deploy a topology or do further operations such as activate, deactivate , access cluster information. + One can restrict this access by specifying nimbus.users or nimbus.groups. If nimbus.users configured only the users in the list can deploy a topology or access cluster. + Similarly nimbus.groups restrict storm cluster access to users who belong to those groups. + + To configure specify the following config in storm.yaml + +```yaml +nimbus.users: + - "testuser" +``` + +or + +```yaml +nimbus.groups: + - "storm" +``` + + +### DRPC +Hopefully more on this soon -Zookeeper uses other ports for communications within the ensemble the details of which -are beyond the scope of this document. You should look at restricting Zookeeper access -as well, because storm does not set up any ACLs for the data it write to Zookeeper. - http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/STORM-UI-REST-API.md ---------------------------------------------------------------------- diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md index 2109ab2..26dbf07 100644 --- a/docs/STORM-UI-REST-API.md +++ b/docs/STORM-UI-REST-API.md @@ -1,5 +1,5 @@ --- -title: Storm UI REST API +title: Storm REST API layout: documentation documentation: true --- http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Serialization.md ---------------------------------------------------------------------- diff --git a/docs/Serialization.md b/docs/Serialization.md index 4c271b4..ac8efe1 100644 --- a/docs/Serialization.md +++ b/docs/Serialization.md @@ -1,11 +1,13 @@ --- +title: Serialization layout: documentation +documentation: true --- This page is about how the serialization system in Storm works for versions 0.6.0 and onwards. 
Storm used a different serialization system prior to 0.6.0 which is documented on [Serialization (prior to 0.6.0)](Serialization-\(prior-to-0.6.0\).html). Tuples can be comprised of objects of any types. Since Storm is a distributed system, it needs to know how to serialize and deserialize objects when they're passed between tasks. -Storm uses [Kryo](http://code.google.com/p/kryo/) for serialization. Kryo is a flexible and fast serialization library that produces small serializations. +Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations. By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, HashSet, and the Clojure collection types. If you want to use another type in your tuples, you'll need to register a custom serializer. @@ -21,12 +23,12 @@ Finally, another reason for using dynamic typing is so Storm can be used in a st ### Custom serialization -As mentioned, Storm uses Kryo for serialization. To implement custom serializers, you need to register new serializers with Kryo. It's highly recommended that you read over [Kryo's home page](http://code.google.com/p/kryo/) to understand how it handles custom serialization. +As mentioned, Storm uses Kryo for serialization. To implement custom serializers, you need to register new serializers with Kryo. It's highly recommended that you read over [Kryo's home page](https://github.com/EsotericSoftware/kryo) to understand how it handles custom serialization. Adding custom serializers is done through the "topology.kryo.register" property in your topology config. It takes a list of registrations, where each registration can take one of two forms: 1. The name of a class to register. In this case, Storm will use Kryo's `FieldsSerializer` to serialize the class. This may or may not be optimal for the class -- see the Kryo docs for more details. -2. 
A map from the name of a class to register to an implementation of [com.esotericsoftware.kryo.Serializer](http://code.google.com/p/kryo/source/browse/trunk/src/com/esotericsoftware/kryo/Serializer.java). +2. A map from the name of a class to register to an implementation of [com.esotericsoftware.kryo.Serializer](https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java). Let's look at an example. http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Setting-up-a-Storm-cluster.md ---------------------------------------------------------------------- diff --git a/docs/Setting-up-a-Storm-cluster.md b/docs/Setting-up-a-Storm-cluster.md index e139523..2b58703 100644 --- a/docs/Setting-up-a-Storm-cluster.md +++ b/docs/Setting-up-a-Storm-cluster.md @@ -1,5 +1,7 @@ --- +title: Setting up a Storm Cluster layout: documentation +documentation: true --- This page outlines the steps for getting a Storm cluster up and running. If you're on AWS, you should check out the [storm-deploy](https://github.com/nathanmarz/storm-deploy/wiki) project. [storm-deploy](https://github.com/nathanmarz/storm-deploy/wiki) completely automates the provisioning, configuration, and installation of Storm clusters on EC2. It also sets up Ganglia for you so you can monitor CPU, disk, and network usage. @@ -34,11 +36,11 @@ These are the versions of the dependencies that have been tested with Storm. Sto ### Download and extract a Storm release to Nimbus and worker machines -Next, download a Storm release and extract the zip file somewhere on Nimbus and each of the worker machines. The Storm releases can be downloaded [from here](http://github.com/apache/incubator-storm/downloads). +Next, download a Storm release and extract the zip file somewhere on Nimbus and each of the worker machines. The Storm releases can be downloaded [from here](http://github.com/apache/storm/releases). 
### Fill in mandatory configurations into storm.yaml -The Storm release contains a file at `conf/storm.yaml` that configures the Storm daemons. You can see the default configuration values [here](https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml). storm.yaml overrides anything in defaults.yaml. There's a few configurations that are mandatory to get a working cluster: +The Storm release contains a file at `conf/storm.yaml` that configures the Storm daemons. You can see the default configuration values [here]({{page.git-blob-base}}/conf/defaults.yaml). storm.yaml overrides anything in defaults.yaml. There's a few configurations that are mandatory to get a working cluster: 1) **storm.zookeeper.servers**: This is a list of the hosts in the Zookeeper cluster for your Storm cluster. It should look something like: @@ -50,17 +52,25 @@ storm.zookeeper.servers: If the port that your Zookeeper cluster uses is different than the default, you should set **storm.zookeeper.port** as well. -2) **storm.local.dir**: The Nimbus and Supervisor daemons require a directory on the local disk to store small amounts of state (like jars, confs, and things like that). You should create that directory on each machine, give it proper permissions, and then fill in the directory location using this config. For example: +2) **storm.local.dir**: The Nimbus and Supervisor daemons require a directory on the local disk to store small amounts of state (like jars, confs, and things like that). + You should create that directory on each machine, give it proper permissions, and then fill in the directory location using this config. For example: ```yaml storm.local.dir: "/mnt/storm" ``` +If you run storm on windows,it could be: +```yaml +storm.local.dir: "C:\\storm-local" +``` +If you use a relative path,it will be relative to where you installed storm(STORM_HOME). 
+You can leave it empty with default value `$STORM_HOME/storm-local` -3) **nimbus.host**: The worker nodes need to know which machine is the master in order to download topology jars and confs. For example: +3) **nimbus.seeds**: The worker nodes need to know which machines are the candidate of master in order to download topology jars and confs. For example: ```yaml -nimbus.host: "111.222.333.44" +nimbus.seeds: ["111.222.333.44"] ``` +You're encouraged to fill out the value to list of **machine's FQDN**. If you want to set up Nimbus H/A, you have to address all machines' FQDN which run nimbus. You may want to leave it to default value when you just want to set up 'pseudo-distributed' cluster, but you're still encouraged to fill out FQDN. 4) **supervisor.slots.ports**: For each worker machine, you configure how many workers run on that machine with this config. Each worker uses a single port for receiving messages, and this setting defines which ports are open for use. If you define five ports here, then Storm will allocate up to five workers to run on this machine. If you define three ports, Storm will only run up to three. By default, this setting is configured to run 4 workers on the ports 6700, 6701, 6702, and 6703. For example: @@ -72,12 +82,36 @@ supervisor.slots.ports: - 6703 ``` +### Monitoring Health of Supervisors + +Storm provides a mechanism by which administrators can configure the supervisor to run administrator supplied scripts periodically to determine if a node is healthy or not. Administrators can have the supervisor determine if the node is in a healthy state by performing any checks of their choice in scripts located in storm.health.check.dir. If a script detects the node to be in an unhealthy state, it must print a line to standard output beginning with the string ERROR. The supervisor will periodically run the scripts in the health check dir and check the output. 
If the script's output contains the string ERROR, as described above, the supervisor will shut down any workers and exit. + +If the supervisor is running with supervision "/bin/storm node-health-check" can be called to determine if the supervisor should be launched or if the node is unhealthy. + +The health check directory location can be configured with: + +```yaml +storm.health.check.dir: "healthchecks" + +``` +The scripts must have execute permissions. +The time to allow any given healthcheck script to run before it is marked failed due to timeout can be configured with: + +```yaml +storm.health.check.timeout.ms: 5000 +``` + +### Configure external libraries and environmental variables (optional) + +If you need support from external libraries or custom plugins, you can place such jars into the extlib/ and extlib-daemon/ directories. Note that the extlib-daemon/ directory stores jars used only by daemons (Nimbus, Supervisor, DRPC, UI, Logviewer), e.g., HDFS and customized scheduling libraries. Accordingly, two environmental variables STORM_EXT_CLASSPATH and STORM_EXT_CLASSPATH_DAEMON can be configured by users for including the external classpath and daemon-only external classpath. + + ### Launch daemons under supervision using "storm" script and a supervisor of your choice The last step is to launch all the Storm daemons. It is critical that you run each of these daemons under supervision. Storm is a __fail-fast__ system which means the processes will halt whenever an unexpected error is encountered. Storm is designed so that it can safely halt at any point and recover correctly when the process is restarted. This is why Storm keeps no state in-process -- if Nimbus or the Supervisors restart, the running topologies are unaffected. Here's how to run the Storm daemons: 1. **Nimbus**: Run the command "bin/storm nimbus" under supervision on the master machine. 2. **Supervisor**: Run the command "bin/storm supervisor" under supervision on each worker machine. 
The supervisor daemon is responsible for starting and stopping worker processes on that machine. -3. **UI**: Run the Storm UI (a site you can access from the browser that gives diagnostics on the cluster and topologies) by running the command "bin/storm ui" under supervision. The UI can be accessed by navigating your web browser to http://{nimbus host}:8080. +3. **UI**: Run the Storm UI (a site you can access from the browser that gives diagnostics on the cluster and topologies) by running the command "bin/storm ui" under supervision. The UI can be accessed by navigating your web browser to http://{ui host}:8080. As you can see, running the daemons is very straightforward. The daemons will log to the logs/ directory in wherever you extracted the Storm release. http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Setting-up-development-environment.md ---------------------------------------------------------------------- diff --git a/docs/Setting-up-development-environment.md b/docs/Setting-up-development-environment.md index 07ba670..bfa98a2 100644 --- a/docs/Setting-up-development-environment.md +++ b/docs/Setting-up-development-environment.md @@ -1,9 +1,11 @@ --- +title: Setting Up a Development Environment layout: documentation +documentation: true --- This page outlines what you need to do to get a Storm development environment set up. In summary, the steps are: -1. Download a [Storm release](/releases.html) , unpack it, and put the unpacked `bin/` directory on your PATH +1. Download a [Storm release](..//downloads.html) , unpack it, and put the unpacked `bin/` directory on your PATH 2. To be able to start and stop topologies on a remote cluster, put the cluster information in `~/.storm/storm.yaml` More detail on each of these steps is below. 
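As a rough illustration of the cluster-side settings touched by the Setting-up-a-Storm-cluster.md changes above, a `conf/storm.yaml` might look like the sketch below. The hostnames are hypothetical placeholders, not values from the patch; the remaining values are the examples and defaults quoted in the documentation text.

```yaml
# Sketch of a cluster-side conf/storm.yaml.
# All hostnames below are hypothetical placeholders; substitute your own FQDNs.
storm.zookeeper.servers:
  - "zk1.example.com"
  - "zk2.example.com"

# Local state directory for the Nimbus and Supervisor daemons.
storm.local.dir: "/mnt/storm"

# Candidate Nimbus machines; list every Nimbus host when running Nimbus H/A.
nimbus.seeds: ["nimbus1.example.com"]

# One worker slot per port; these four ports are the documented default.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

# Optional supervisor health checks.
storm.health.check.dir: "healthchecks"
storm.health.check.timeout.ms: 5000
```

Anything left out of `storm.yaml` falls back to the value in `defaults.yaml`, so a real file only needs the entries that differ from the defaults.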
@@ -18,7 +20,7 @@ Let's quickly go over the relationship between your machine and a remote cluster ### Installing a Storm release locally -If you want to be able to submit topologies to a remote cluster from your machine, you should install a Storm release locally. Installing a Storm release will give you the `storm` client that you can use to interact with remote clusters. To install Storm locally, download a release [from here](/releases.html) and unzip it somewhere on your computer. Then add the unpacked `bin/` directory onto your `PATH` and make sure the `bin/storm` script is executable. +If you want to be able to submit topologies to a remote cluster from your machine, you should install a Storm release locally. Installing a Storm release will give you the `storm` client that you can use to interact with remote clusters. To install Storm locally, download a release [from here](https://github.com/apache/storm/releases) and unzip it somewhere on your computer. Then add the unpacked `bin/` directory onto your `PATH` and make sure the `bin/storm` script is executable. Installing a Storm release locally is only for interacting with remote clusters. For developing and testing topologies in local mode, it is recommended that you use Maven to include Storm as a dev dependency for your project. You can read more about using Maven for this purpose on [Maven](Maven.html). @@ -27,13 +29,5 @@ Installing a Storm release locally is only for interacting with remote clusters. The previous step installed the `storm` client on your machine which is used to communicate with remote Storm clusters. Now all you have to do is tell the client which Storm cluster to talk to. To do this, all you have to do is put the host address of the master in the `~/.storm/storm.yaml` file. 
It should look something like this: ``` -nimbus.host: "123.45.678.890" +nimbus.seeds: ["123.45.678.890"] ``` - -Alternatively, if you use the [storm-deploy](https://github.com/nathanmarz/storm-deploy) project to provision Storm clusters on AWS, it will automatically set up your ~/.storm/storm.yaml file. You can manually attach to a Storm cluster (or switch between multiple clusters) using the "attach" command, like so: - -``` -lein run :deploy --attach --name mystormcluster -``` - -More information is on the storm-deploy [wiki](https://github.com/nathanmarz/storm-deploy/wiki) http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Spout-implementations.md ---------------------------------------------------------------------- diff --git a/docs/Spout-implementations.md b/docs/Spout-implementations.md index 10ddd42..f52e662 100644 --- a/docs/Spout-implementations.md +++ b/docs/Spout-implementations.md @@ -1,5 +1,7 @@ --- +title: Spout Implementations layout: documentation +documentation: true --- * [storm-kestrel](https://github.com/nathanmarz/storm-kestrel): Adapter to use Kestrel as a spout * [storm-amqp-spout](https://github.com/rapportive-oss/storm-amqp-spout): Adapter to use AMQP source as a spout http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Structure-of-the-codebase.md ---------------------------------------------------------------------- diff --git a/docs/Structure-of-the-codebase.md b/docs/Structure-of-the-codebase.md index 8ac66f4..573a93c 100644 --- a/docs/Structure-of-the-codebase.md +++ b/docs/Structure-of-the-codebase.md @@ -1,5 +1,7 @@ --- +title: Structure of the Codebase layout: documentation +documentation: true --- There are three distinct layers to Storm's codebase. @@ -13,18 +15,18 @@ The following sections explain each of these layers in more detail. 
### storm.thrift -The first place to look to understand the structure of Storm's codebase is the [storm.thrift](https://github.com/apache/incubator-storm/blob/master/storm-core/src/storm.thrift) file. +The first place to look to understand the structure of Storm's codebase is the [storm.thrift]({{page.git-blob-base}}/storm-core/src/storm.thrift) file. Storm uses [this fork](https://github.com/nathanmarz/thrift/tree/storm) of Thrift (branch 'storm') to produce the generated code. This "fork" is actually Thrift 7 with all the Java packages renamed to be `org.apache.thrift7`. Otherwise, it's identical to Thrift 7. This fork was done because of the lack of backwards compatibility in Thrift and the need for many people to use other versions of Thrift in their Storm topologies. -Every spout or bolt in a topology is given a user-specified identifier called the "component id". The component id is used to specify subscriptions from a bolt to the output streams of other spouts or bolts. A [StormTopology](https://github.com/apache/incubator-storm/blob/master/storm-core/src/storm.thrift#L91) structure contains a map from component id to component for each type of component (spouts and bolts). +Every spout or bolt in a topology is given a user-specified identifier called the "component id". The component id is used to specify subscriptions from a bolt to the output streams of other spouts or bolts. A [StormTopology]({{page.git-blob-base}}/storm-core/src/storm.thrift#L91) structure contains a map from component id to component for each type of component (spouts and bolts). -Spouts and bolts have the same Thrift definition, so let's just take a look at the [Thrift definition for bolts](https://github.com/apache/incubator-storm/blob/master/storm-core/src/storm.thrift#L79). It contains a `ComponentObject` struct and a `ComponentCommon` struct. 
+Spouts and bolts have the same Thrift definition, so let's just take a look at the [Thrift definition for bolts]({{page.git-blob-base}}/storm-core/src/storm.thrift#L79). It contains a `ComponentObject` struct and a `ComponentCommon` struct. The `ComponentObject` defines the implementation for the bolt. It can be one of three types: -1. A serialized java object (that implements [IBolt](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/task/IBolt.java)) -2. A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component. +1. A serialized java object (that implements [IBolt]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/task/IBolt.java)) +2. A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component. 3. A `JavaObject` structure which tells Storm the classname and constructor arguments to use to instantiate that bolt. This is useful if you want to define a topology in a non-JVM language. This way, you can make use of JVM-based spouts and bolts without having to create and serialize a Java object yourself. `ComponentCommon` defines everything else for this component. This includes: @@ -32,9 +34,9 @@ The `ComponentObject` defines the implementation for the bolt. It can be one of 1. What streams this component emits and the metadata for each stream (whether it's a direct stream, the fields declaration) 2. 
What streams this component consumes (specified as a map from component_id:stream_id to the stream grouping to use) 3. The parallelism for this component -4. The component-specific [configuration](https://github.com/apache/incubator-storm/wiki/Configuration) for this component +4. The component-specific [configuration](Configuration.html) for this component -Note that the structure spouts also have a `ComponentCommon` field, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. Storm adds implicit streams and bolts to the topology to set up the [acking framework](https://github.com/apache/incubator-storm/wiki/Guaranteeing-message-processing), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/common.clj#L279). +Note that the structure spouts also have a `ComponentCommon` field, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. 
Storm adds implicit streams and bolts to the topology to set up the [acking framework](https://github.com/apache/storm/wiki/Guaranteeing-message-processing), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/common.clj#L279). ### Java interfaces @@ -49,92 +51,92 @@ The strategy for the majority of the interfaces is to: 1. Specify the interface using a Java interface 2. Provide a base class that provides default implementations when appropriate -You can see this strategy at work with the [BaseRichSpout](javadocs/backtype/storm/topology/base/BaseRichSpout.html) class. +You can see this strategy at work with the [BaseRichSpout](javadocs/backtype/storm/topology/base/BaseRichSpout.html) class. Spouts and bolts are serialized into the Thrift definition of the topology as described above. -One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java#L205) of the `TopologyBuilder` code. 
+One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java#L205) of the `TopologyBuilder` code. ### Implementation Specifying all the functionality via Java interfaces ensures that every feature of Storm is available via Java. Moreso, the focus on Java interfaces ensures that the user experience from Java-land is pleasant as well. -The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/incubator-storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/incubator-storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. 
The DRPC and transactional topologies implementations are in the [backtype.storm.coordination](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/coordination), [backtype.storm.drpc](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/drpc), and [backtype.storm.transactional](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/transactional) packages. +The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [backtype.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/coordination), [backtype.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/drpc), and [backtype.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/transactional) packages. Here's a summary of the purpose of the main Java packages and Clojure namespace: #### Java packages -[backtype.storm.coordination](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here. 
+[backtype.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here. -[backtype.storm.drpc](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/drpc): Implementation of the DRPC higher level abstraction +[backtype.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/drpc): Implementation of the DRPC higher level abstraction -[backtype.storm.generated](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions) +[backtype.storm.generated]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions) -[backtype.storm.grouping](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/grouping): Contains interface for making custom stream groupings +[backtype.storm.grouping]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/grouping): Contains interface for making custom stream groupings -[backtype.storm.hooks](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/incubator-storm/wiki/Hooks). 
+[backtype.storm.hooks]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks). -[backtype.storm.serialization](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/). +[backtype.storm.serialization]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/). -[backtype.storm.spout](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages. +[backtype.storm.spout]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages. -[backtype.storm.task](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime. +[backtype.storm.task]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). 
Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime. -[backtype.storm.testing](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests. +[backtype.storm.testing]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests. -[backtype.storm.topology](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts. +[backtype.storm.topology]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts. -[backtype.storm.transactional](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/transactional): Implementation of transactional topologies. +[backtype.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/transactional): Implementation of transactional topologies. 
-[backtype.storm.tuple](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/tuple): Implementation of Storm's tuple data model. +[backtype.storm.tuple]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/tuple): Implementation of Storm's tuple data model. -[backtype.storm.utils](https://github.com/apache/incubator-storm/tree/master/storm-core/src/jvm/backtype/storm/tuple): Data structures and miscellaneous utilities used throughout the codebase. +[backtype.storm.utils]({{page.git-tree-base}}/storm-core/src/jvm/backtype/storm/tuple): Data structures and miscellaneous utilities used throughout the codebase. #### Clojure namespaces -[backtype.storm.bootstrap](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/bootstrap.clj): Contains a helpful macro to import all the classes and namespaces that are used throughout the codebase. +[backtype.storm.bootstrap]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/bootstrap.clj): Contains a helpful macro to import all the classes and namespaces that are used throughout the codebase. -[backtype.storm.clojure](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/clojure.clj): Implementation of the Clojure DSL for Storm. +[backtype.storm.clojure]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/clojure.clj): Implementation of the Clojure DSL for Storm. -[backtype.storm.cluster](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/cluster.clj): All Zookeeper logic used in Storm daemons is encapsulated in this file. This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is mapped to the Zookeeper "filesystem" API. +[backtype.storm.cluster]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/cluster.clj): All Zookeeper logic used in Storm daemons is encapsulated in this file. 
This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is mapped to the Zookeeper "filesystem" API. -[backtype.storm.command.*](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/command): These namespaces implement various commands for the `storm` command line client. These implementations are very short. +[backtype.storm.command.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/command): These namespaces implement various commands for the `storm` command line client. These implementations are very short. -[backtype.storm.config](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/config.clj): Implementation of config reading/parsing code for Clojure. Also has utility functions for determining what local path nimbus/supervisor/daemons should be using for various things. e.g. the `master-inbox` function will return the local path that Nimbus should use when jars are uploaded to it. +[backtype.storm.config]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/config.clj): Implementation of config reading/parsing code for Clojure. Also has utility functions for determining what local path nimbus/supervisor/daemons should be using for various things. e.g. the `master-inbox` function will return the local path that Nimbus should use when jars are uploaded to it. -[backtype.storm.daemon.acker](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/acker.clj): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing. +[backtype.storm.daemon.acker]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/acker.clj): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing. 
-[backtype.storm.daemon.common](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/common.clj): Implementation of common functions used in Storm daemons, like getting the id for a topology based on the name, mapping a user's topology into the one that actually executes (with implicit acking streams and acker bolt added - see `system-topology!` function), and definitions for the various heartbeat and other structures persisted by Storm. +[backtype.storm.daemon.common]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/common.clj): Implementation of common functions used in Storm daemons, like getting the id for a topology based on the name, mapping a user's topology into the one that actually executes (with implicit acking streams and acker bolt added - see `system-topology!` function), and definitions for the various heartbeat and other structures persisted by Storm. -[backtype.storm.daemon.drpc](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/drpc.clj): Implementation of the DRPC server for use with DRPC topologies. +[backtype.storm.daemon.drpc]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/drpc.clj): Implementation of the DRPC server for use with DRPC topologies. -[backtype.storm.daemon.nimbus](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/nimbus.clj): Implementation of Nimbus. +[backtype.storm.daemon.nimbus]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/nimbus.clj): Implementation of Nimbus. -[backtype.storm.daemon.supervisor](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/supervisor.clj): Implementation of Supervisor. +[backtype.storm.daemon.supervisor]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/supervisor.clj): Implementation of Supervisor. 
-[backtype.storm.daemon.task](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/task.clj): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
+[backtype.storm.daemon.task]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/task.clj): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
-[backtype.storm.daemon.worker](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/daemon/worker.clj): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
+[backtype.storm.daemon.worker]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/daemon/worker.clj): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
-[backtype.storm.event](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/event.clj): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
+[backtype.storm.event]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/event.clj): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
-[backtype.storm.log](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/log.clj): Defines the functions used to log messages to log4j.
+[backtype.storm.log]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/log.clj): Defines the functions used to log messages to log4j.
-[backtype.storm.messaging.*](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses ZeroMQ. The generic interface is defined in protocol.clj.
+[backtype.storm.messaging.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses ZeroMQ. The generic interface is defined in protocol.clj.
-[backtype.storm.stats](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/stats.clj): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
+[backtype.storm.stats]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/stats.clj): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
-[backtype.storm.testing](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
+[backtype.storm.testing]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/testing.clj): Implementation of facilities used to test Storm topologies.
Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
-[backtype.storm.thrift](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/thrift.clj): Clojure wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
+[backtype.storm.thrift]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/thrift.clj): Clojure wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
-[backtype.storm.timer](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/timer.clj): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
+[backtype.storm.timer]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/timer.clj): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
-[backtype.storm.ui.*](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.
+[backtype.storm.ui.*]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.
-[backtype.storm.util](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/util.clj): Contains generic utility functions used throughout the code base.
+[backtype.storm.util]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/util.clj): Contains generic utility functions used throughout the code base.
-[backtype.storm.zookeeper](https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/zookeeper.clj): Clojure wrapper around the Zookeeper API and implements some "high-level" stuff like "mkdirs" and "delete-recursive".
+[backtype.storm.zookeeper]({{page.git-blob-base}}/storm-core/src/clj/backtype/storm/zookeeper.clj): Clojure wrapper around the Zookeeper API and implements some "high-level" stuff like "mkdirs" and "delete-recursive".

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Support-for-non-java-languages.md
----------------------------------------------------------------------
diff --git a/docs/Support-for-non-java-languages.md b/docs/Support-for-non-java-languages.md
index 724d106..d03dcad 100644
--- a/docs/Support-for-non-java-languages.md
+++ b/docs/Support-for-non-java-languages.md
@@ -1,5 +1,7 @@
 ---
+title: Support for Non-Java Languages
 layout: documentation
+documentation: true
 ---
 * [Scala DSL](https://github.com/velvia/ScalaStorm)
 * [JRuby DSL](https://github.com/colinsurprenant/storm-jruby)

http://git-wip-us.apache.org/repos/asf/storm/blob/97f0558e/docs/Transactional-topologies.md
----------------------------------------------------------------------
diff --git a/docs/Transactional-topologies.md b/docs/Transactional-topologies.md
index 1271a21..df2a0e3 100644
--- a/docs/Transactional-topologies.md
+++ b/docs/Transactional-topologies.md
@@ -1,5 +1,7 @@
 ---
+title: Transactional Topologies
 layout: documentation
+documentation: true
 ---
 **NOTE**: Transactional topologies have been deprecated -- use the [Trident](Trident-tutorial.html) framework instead.
@@ -75,11 +77,11 @@ When using transactional topologies, Storm does the following for you:
 3. *Fault detection:* Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
 4. *First class batch processing API*: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).
-Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka](https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka) in [storm-contrib](https://github.com/nathanmarz/storm-contrib) contains a transactional spout implementation for Kafka.
+Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka) contains a transactional spout implementation for Kafka.
 ## The basics through example
-You build transactional topologies by using [TransactionalTopologyBuilder](javadocs/backtype/storm/transactional/TransactionalTopologyBuilder.html).
Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from [TransactionalGlobalCount](https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/TransactionalGlobalCount.java) in storm-starter.
+You build transactional topologies by using [TransactionalTopologyBuilder](javadocs/backtype/storm/transactional/TransactionalTopologyBuilder.html). Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from [TransactionalGlobalCount]({{page.git-blob-base}}/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java) in storm-starter.
 ```java
 MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
@@ -128,7 +130,7 @@ public static class BatchCount extends BaseBatchBolt {
 }
 ```
-A new instance of this object is created for every batch that's being processed. The actual bolt this runs within is called [BatchBoltExecutor](https://github.com/apache/incubator-storm/blob/0.7.0/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java) and manages the creation and cleanup for these objects.
+A new instance of this object is created for every batch that's being processed. The actual bolt this runs within is called [BatchBoltExecutor](https://github.com/apache/storm/blob/0.7.0/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java) and manages the creation and cleanup for these objects.
 The `prepare` method parameterizes this batch bolt with the Storm config, the topology context, an output collector, and the id for this batch of tuples. In the case of transactional topologies, the id will be a [TransactionAttempt](javadocs/backtype/storm/transactional/TransactionAttempt.html) object. The batch bolt abstraction can be used in Distributed RPC as well which uses a different type of id for the batches.
`BatchBolt` can actually be parameterized with the type of the id, so if you only intend to use the batch bolt for transactional topologies, you can extend `BaseTransactionalBolt` which has this definition:
@@ -199,7 +201,7 @@ First, notice that this bolt implements the `ICommitter` interface. This tells S
 The code for `finishBatch` in `UpdateGlobalCount` gets the current value from the database and compares its transaction id to the transaction id for this batch. If they are the same, it does nothing. Otherwise, it increments the value in the database by the partial count for this batch.
-A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the [TransactionalWords](https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/TransactionalWords.java) class.
+A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the [TransactionalWords]({{page.git-blob-base}}/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java) class.
 ## Transactional Topology API
@@ -253,7 +255,7 @@ The details of implementing a `TransactionalSpout` are in [the Javadoc](javadocs
 #### Partitioned Transactional Spout
-A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [TransactionalKafkaSpout](https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/TransactionalKafkaSpout.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.
+A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers.
For example, this is how [TransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/storm/kafka/TransactionalKafkaSpout.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.
 ### Configuration
@@ -323,7 +325,7 @@ In this scenario, tuples 41-50 are skipped. By failing all subsequent transactio
 By failing all subsequent transactions on failure, no tuples are skipped. This also shows that a requirement of transactional spouts is that they always emit where the last transaction left off.
-A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [OpaqueTransactionalKafkaSpout](https://github.com/nathanmarz/storm-contrib/blob/kafka0.7/storm-kafka/src/jvm/storm/kafka/OpaqueTransactionalKafkaSpout.java) is an example. `OpaqueTransactionalKafkaSpout` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.
+A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [OpaqueTransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/storm/kafka/OpaqueTransactionalKafkaSpout.java) is an example.
`OpaqueTransactionalKafkaSpout` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.
 ## Implementation
