Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-solr.md
URL: http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-solr.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-solr.md (added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-solr.md Fri Mar 18 17:50:56 2016
@@ -0,0 +1,184 @@
---
title: Storm Solr Integration
layout: documentation
documentation: true
---

Storm and Trident integration for Apache Solr. This package includes a bolt and a Trident state that enable a Storm topology
to stream the contents of Storm tuples to index Solr collections.

# Index Storm tuples into a Solr collection
The bolt and Trident state provided use one of the supplied mappers to build a `SolrRequest` object that is
responsible for making the update calls to Solr, thus updating the index of the specified collection.

# Usage Examples
This section provides simple code snippets showing how to build Storm and Trident topologies that index Solr. Subsequent sections
describe in detail the two key components of the Storm Solr integration: the `SolrUpdateBolt` and the mappers, `SolrFieldsMapper` and `SolrJsonMapper`.

## Storm Bolt With JSON Mapper and Count Based Commit Strategy

```java
    // zkHostString for the Solr 'gettingstarted' example
    SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");

    // JSON mapper used to generate 'SolrRequest' requests that update the "gettingstarted" Solr collection with the JSON content declared in the tuple field named "JSON"
    SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", "JSON").build();

    // Commits and acks every fifth tuple. Passing null instead acks every tuple
    SolrCommitStrategy solrCommitStgy = new CountBasedCommit(5);

    new SolrUpdateBolt(solrConfig, solrMapper, solrCommitStgy);
```
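The snippet above only constructs the bolt. As a minimal sketch of how those pieces might be wired into a runnable topology: the toy spout, the class and package names, and the import paths for the storm-solr classes (`org.apache.storm.solr.*`) are assumptions made here for illustration; only the `SolrConfig`, `SolrJsonMapper.Builder`, `CountBasedCommit`, and `SolrUpdateBolt` calls come from the snippet above.

```java
package org.apache.storm.solr.examples; // hypothetical package

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.solr.bolt.SolrUpdateBolt;
import org.apache.storm.solr.config.CountBasedCommit;
import org.apache.storm.solr.config.SolrCommitStrategy;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrJsonMapper;
import org.apache.storm.solr.mapper.SolrMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SolrJsonIndexingTopology {

    /** Toy spout that emits one JSON document per second in a tuple field named "JSON". */
    public static class JsonSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private int id = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            collector.emit(new Values("{\"id\": \"json_test_val_" + id++ + "\"}"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("JSON"));
        }
    }

    public static void main(String[] args) throws Exception {
        // Same building blocks as the snippet above
        SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");
        SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", "JSON").build();
        SolrCommitStrategy commitStgy = new CountBasedCommit(5);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("json-spout", new JsonSpout());
        builder.setBolt("solr-update-bolt", new SolrUpdateBolt(solrConfig, solrMapper, commitStgy))
               .shuffleGrouping("json-spout");

        // Run locally for a quick test; use StormSubmitter to submit to a real cluster instead
        new LocalCluster().submitTopology("solr-json-topology", new Config(), builder.createTopology());
    }
}
```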
## Trident Topology With Fields Mapper

```java
    // zkHostString for the Solr 'gettingstarted' example
    SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");

    // Builds the Schema object from the JSON representation of the schema as returned by the URL http://localhost:8983/solr/gettingstarted/schema/
    SchemaBuilder schemaBuilder = new RestJsonSchemaBuilder("localhost", "8983", "gettingstarted");

    /* Solr fields mapper used to generate 'SolrRequest' requests that update the "gettingstarted" Solr collection. The Solr index is updated using the values of the tuple fields that match static or dynamic fields declared in the schema object built using schemaBuilder */
    SolrMapper solrMapper = new SolrFieldsMapper.Builder(schemaBuilder, "gettingstarted").build();

    new SolrStateFactory(solrConfig, solrMapper);
```
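As a minimal sketch of how the state factory above might be attached to a Trident stream with `partitionPersist`: the spout parameter, the tuple field names (`id`, `date`), the `SolrUpdater` state-updater class, and the storm-solr import paths are assumptions made for illustration; only the `SolrConfig`, `RestJsonSchemaBuilder`, `SolrFieldsMapper`, and `SolrStateFactory` calls come from the snippet above.

```java
package org.apache.storm.solr.examples; // hypothetical package

import org.apache.storm.generated.StormTopology;
import org.apache.storm.solr.config.SolrConfig;
import org.apache.storm.solr.mapper.SolrFieldsMapper;
import org.apache.storm.solr.mapper.SolrMapper;
import org.apache.storm.solr.schema.builder.RestJsonSchemaBuilder;
import org.apache.storm.solr.trident.SolrStateFactory;
import org.apache.storm.solr.trident.SolrUpdater;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class SolrFieldsTridentExample {

    /**
     * Builds a Trident topology that persists the "id" and "date" fields of each
     * batch into the "gettingstarted" collection through the Solr state.
     */
    public static StormTopology build(IBatchSpout spout) throws Exception {
        SolrConfig solrConfig = new SolrConfig("127.0.0.1:9983");
        SolrMapper solrMapper = new SolrFieldsMapper.Builder(
                new RestJsonSchemaBuilder("localhost", "8983", "gettingstarted"),
                "gettingstarted").build();

        TridentTopology topology = new TridentTopology();
        topology.newStream("solr-fields-stream", spout)
                // partitionPersist hands each batch to the Solr state through a state updater
                .partitionPersist(new SolrStateFactory(solrConfig, solrMapper),
                        new Fields("id", "date"), new SolrUpdater());
        return topology.build();
    }
}
```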
## SolrUpdateBolt
`SolrUpdateBolt` streams tuples directly into Apache Solr. The Solr index is updated using `SolrRequest` requests.
The `SolrUpdateBolt` is configurable using implementations of `SolrConfig`, `SolrMapper`, and optionally `SolrCommitStrategy`.

The data to stream to Solr is extracted from the tuples using the strategy defined in the `SolrMapper` implementation.

The `SolrRequest` can be sent for every tuple, or according to a strategy defined by `SolrCommitStrategy` implementations.
If a `SolrCommitStrategy` is in place and one of the tuples in the batch fails, the batch is not committed, and all the tuples in that
batch are marked as failed and retried. On the other hand, if all tuples succeed, the `SolrRequest` is committed and all tuples are successfully acked.

`SolrConfig` is the class containing the Solr configuration to be made available to Storm Solr bolts. Any configuration needed in the bolts should be put in this class.


## SolrMapper
`SolrMapper` implementations define the strategy to extract information from the tuples. The public method
`toSolrRequest` receives a tuple or a list of tuples and returns a `SolrRequest` object that is used to update the Solr index.


### SolrJsonMapper
The `SolrJsonMapper` creates a Solr update request that is sent to the URL endpoint defined by Solr as the resource
destination for requests in JSON format.

To create a `SolrJsonMapper` the client must specify the name of the collection to update as well as the
tuple field that contains the JSON object used to update the Solr index. If the tuple does not contain the specified field,
a `SolrMapperException` is thrown when the method `toSolrRequest` is called. If the field exists, its value can either
be a String with the contents in JSON format, or a Java object that will be serialized to JSON.

Code snippet illustrating how to create a `SolrJsonMapper` object to update the `gettingstarted` Solr collection with JSON content
declared in the tuple field with name "JSON":

``` java
    SolrMapper solrMapper = new SolrJsonMapper.Builder("gettingstarted", "JSON").build();
```


### SolrFieldsMapper
The `SolrFieldsMapper` creates a Solr update request that is sent to the Solr URL endpoint that handles the updates of `SolrInputDocument` objects.

To create a `SolrFieldsMapper` the client must specify the name of the collection to update as well as the `SolrSchemaBuilder`.
The Solr `Schema` is used to extract information about the Solr schema fields and their corresponding types. This metadata is used
to get the information from the tuples. Only tuple fields that match static or dynamic Solr fields are added to the document. Tuple fields
that do not match the schema are not added to the `SolrInputDocument` being prepared for indexing. A debug log message is printed for the
tuple fields that do not match the schema and hence are not indexed.

The `SolrFieldsMapper` supports multivalue fields. A multivalue tuple field must be tokenized. The default token is `|`. Any
arbitrary token can be specified by calling the method `org.apache.storm.solr.mapper.SolrFieldsMapper.Builder.setMultiValueFieldToken`
that is part of the `SolrFieldsMapper.Builder` builder class.

Code snippet illustrating how to create a `SolrFieldsMapper` object to update the `gettingstarted` Solr collection. The multivalue
field separates each value with the token `%` instead of the default `|`. To use the default token you can omit the call to the method
`setMultiValueFieldToken`.

``` java
    new SolrFieldsMapper.Builder(
            new RestJsonSchemaBuilder("localhost", "8983", "gettingstarted"), "gettingstarted")
        .setMultiValueFieldToken("%").build();
```

# Build And Run Bundled Examples
To be able to run the examples you must first build the Java code in the package `storm-solr`,
and then generate an uber jar with all the dependencies.


## Build the Storm Apache Solr Integration Code

`mvn clean install -f REPO_HOME/storm/external/storm-solr/pom.xml`

## Use the Maven Shade Plugin to Build the Uber Jar

Add the following to `REPO_HOME/storm/external/storm-solr/pom.xml`:

```
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.apache.storm.solr.topology.SolrJsonTopology</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```

Create the uber jar by running the command:

`mvn package -f REPO_HOME/storm/external/storm-solr/pom.xml`

This will create the uber jar file with the name and location matching the following pattern:

`REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT.jar`

## Run Examples
Copy the file `REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT.jar` to `STORM_HOME/extlib`.

**The code examples provided require that you first run the [Solr gettingstarted](http://lucene.apache.org/solr/quickstart.html) example**

### Run Storm Topology

STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar org.apache.storm.solr.topology.SolrFieldsTopology

STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar org.apache.storm.solr.topology.SolrJsonTopology

### Run Trident Topology

STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar org.apache.storm.solr.trident.SolrFieldsTridentTopology

STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm-solr/target/storm-solr-0.11.0-SNAPSHOT-tests.jar org.apache.storm.solr.trident.SolrJsonTridentTopology


### Verify Results

The aforementioned Storm and Trident topologies index the Solr `gettingstarted` collection with objects that have the following `id` patterns:

\*id_fields_test_val\* for `SolrFieldsTopology` and `SolrFieldsTridentTopology`

\*json_test_val\* for `SolrJsonTopology` and `SolrJsonTridentTopology`

Querying Solr for these patterns, you will see the values that have been indexed by the Storm Apache Solr integration:

curl -X GET -H "Content-type: application/json" -H "Accept: application/json" "http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*&wt=json&indent=true"

curl -X GET -H "Content-type: application/json" -H "Accept: application/json" "http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*json_test_val*&wt=json&indent=true"

You can also see the results by opening the Apache Solr UI and pasting the `id` pattern in the `q` textbox in the queries page:

http://localhost:8983/solr/#/gettingstarted_shard1_replica2/query
Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql-internal.md
URL: http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql-internal.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql-internal.md (added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql-internal.md Fri Mar 18 17:50:56 2016
@@ -0,0 +1,55 @@
---
title: The Internals of Storm SQL
layout: documentation
documentation: true
---

This page describes the design and the implementation of the Storm SQL integration.

## Overview

SQL is a well-adopted yet complicated standard. Several projects, including Drill, Hive, Phoenix and Spark, have invested significantly in their SQL layers. One of the main design goals of StormSQL is to leverage the existing investments of these projects. StormSQL leverages [Apache Calcite](http://calcite.apache.org) to implement the SQL standard. StormSQL focuses on compiling the SQL statements to Storm / Trident topologies so that they can be executed in Storm clusters.

Figure 1 describes the workflow of executing a SQL query in StormSQL. First, users provide a sequence of SQL statements. StormSQL parses the SQL statements and translates them to a Calcite logical plan. A logical plan consists of a sequence of SQL logical operators that describe how the query should be executed irrespective of the underlying execution engine. Some examples of logical operators include `TableScan`, `Filter`, `Projection` and `GroupBy`.

<div align="center">
<img title="Workflow of StormSQL" src="images/storm-sql-internal-workflow.png" style="max-width: 80rem"/>

<p>Figure 1: Workflow of StormSQL.</p>
</div>

The next step is to compile the logical execution plan down to a physical execution plan. A physical plan consists of physical operators that describe how to execute the SQL query in *StormSQL*. Physical operators such as `Filter`, `Projection`, and `GroupBy` are directly mapped to operations in Trident topologies. StormSQL also compiles expressions in the SQL statements into Java bytecode and plugs them into the Trident topologies.

Finally, StormSQL packages both the Java bytecode and the topology into a JAR and submits it to the Storm cluster. Storm schedules and executes the JAR in the same way as it executes other Storm topologies.

The following code block shows an example query that filters and projects results from a Kafka stream.

```
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' ...

CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' ...

INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
```

The first two SQL statements define the inputs and outputs of external data. Figure 2 describes the process by which StormSQL takes the last `SELECT` query and compiles it down to a Trident topology.

<div align="center">
<img title="Compiling the example query to Trident topology" src="images/storm-sql-internal-example.png" style="max-width: 80rem"/>

<p>Figure 2: Compiling the example query to Trident topology.</p>
</div>


## Constraints of querying streaming tables

There are several constraints when querying tables that represent a real-time data stream:

* The `ORDER BY` clause cannot be applied to a stream.
* The `GROUP BY` clause must include at least one monotonic field so that StormSQL can bound the size of the buffer (see the example below).

For more information please refer to http://calcite.apache.org/docs/stream.html.
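For illustration, here is a hypothetical streaming aggregation over the `ORDERS` table above that satisfies the monotonicity constraint, written in Calcite's streaming SQL style. The `ROWTIME` column and the `TUMBLE` / `TUMBLE_END` functions are Calcite streaming-SQL conventions assumed for this sketch, and aggregations are not yet implemented in StormSQL (see the limitations in the storm-sql document below).

```
-- ROWTIME is a monotonically increasing timestamp column, so grouping on a
-- tumbling window over it lets the planner bound the aggregation buffer.
SELECT STREAM TUMBLE_END(ROWTIME, INTERVAL '1' MINUTE) AS WINDOW_END,
       SUM(UNIT_PRICE * QUANTITY) AS TOTAL
FROM ORDERS
GROUP BY TUMBLE(ROWTIME, INTERVAL '1' MINUTE)
```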
## Dependency

StormSQL does not ship the dependencies of external data sources in the packaged JAR. Users have to provide the dependencies in the `extlib` directory of the worker nodes.

Added: storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql.md
URL: http://svn.apache.org/viewvc/storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql.md?rev=1735652&view=auto
==============================================================================
--- storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql.md (added)
+++ storm/branches/bobby-versioned-site/releases/1.0.0-SNAPSHOT/storm-sql.md Fri Mar 18 17:50:56 2016
@@ -0,0 +1,97 @@
---
title: Storm SQL integration
layout: documentation
documentation: true
---

The Storm SQL integration allows users to run SQL queries over streaming data in Storm. The SQL interface not only allows faster development cycles on streaming analytics, but also opens up opportunities to unify batch data processing, as in [Apache Hive](http://hive.apache.org), with real-time streaming data analytics.

At a very high level StormSQL compiles the SQL queries to [Trident](Trident-API-Overview.html) topologies and executes them in Storm clusters. This document describes how to use StormSQL as an end user. For more details on the design and the implementation of StormSQL please refer to [this](storm-sql-internal.html) page.

## Usage

Run the ``storm sql`` command to compile SQL statements into a Trident topology and submit it to the Storm cluster:

```
$ bin/storm sql <sql-file> <topo-name>
```

Here `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology.


## Supported Features

The following features are supported in the current repository:

* Streaming from and to external data sources
* Filtering tuples
* Projections

## Specifying External Data Sources

In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE` statement. The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in the [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL):

```
CREATE EXTERNAL TABLE table_name field_list
    [ STORED AS
      INPUTFORMAT input_format_classname
      OUTPUTFORMAT output_format_classname
    ]
    LOCATION location
    [ TBLPROPERTIES tbl_properties ]
    [ AS select_stmt ]
```

You can find detailed explanations of the properties in the [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
For example, the following statement specifies a Kafka spout and sink:

```
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
```

## Plugging in External Data Sources

Users plug in external data sources by implementing the `ISqlTridentDataSource` interface and registering the implementations through Java's service loader mechanism. The external data source is chosen based on the scheme of the URI of the tables. Please refer to the implementation of `storm-sql-kafka` for more details. A sketch of the registration layout is shown below.
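For orientation only, a minimal sketch of what the service-loader registration for a custom data source might look like. The file name, the interface package (`org.apache.storm.sql.runtime`), and the provider class `com.example.sql.MyStreamDataSource` are assumptions for illustration; consult the `storm-sql-kafka` module for the exact interface to register against.

```
# Hypothetical project layout: register the implementation in a
# provider-configuration file named after the service interface.
src/main/resources/META-INF/services/org.apache.storm.sql.runtime.ISqlTridentDataSource

# Contents of that file: one fully qualified implementation class per line
com.example.sql.MyStreamDataSource
```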
## Example: Filtering Kafka Stream

Let's say there is a Kafka stream that represents transactions of orders. Each message in the stream contains the id of the order, the unit price of the product and the quantity of the order. The goal is to filter orders whose transactions are significant and to insert these orders into another Kafka stream for further analysis.

The user can specify the following SQL statements in the SQL file:

```
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'

CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'

INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
```

The first statement defines the table `ORDERS`, which represents the input stream. The `LOCATION` clause specifies the ZooKeeper host (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`). The `TBLPROPERTIES` clause specifies the configuration of the [KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs).
The current implementation of `storm-sql-kafka` requires specifying both the `LOCATION` and `TBLPROPERTIES` clauses even when the table is read-only or write-only.

Similarly, the second statement specifies the table `LARGE_ORDERS`, which represents the output stream. The third statement is a `SELECT` statement which defines the topology: it instructs StormSQL to filter all orders in the external table `ORDERS`, calculate the total price and insert matching records into the Kafka stream specified by `LARGE_ORDERS`.

To run this example, users need to include the data source (`storm-sql-kafka` in this case) and its dependencies in the class path. One approach is to put the required jars into the `extlib` directory:

```
$ cp curator-client-2.5.0.jar curator-framework-2.5.0.jar zookeeper-3.4.6.jar extlib/
$ cp scala-library-2.10.4.jar kafka-clients-0.8.2.1.jar kafka_2.10-0.8.2.1.jar metrics-core-2.2.0.jar extlib/
$ cp json-simple-1.1.1.jar extlib/
$ cp jackson-annotations-2.6.0.jar extlib/
$ cp storm-kafka-*.jar storm-sql-kafka-*.jar storm-sql-runtime-*.jar extlib/
```

The next step is to submit the SQL statements to StormSQL:

```
$ bin/storm sql order_filtering.sql order_filtering
```

By now you should be able to see the `order_filtering` topology in the Storm UI.

## Current Limitations

Aggregation, windowing and joining tables are yet to be implemented. Specifying parallelism hints in the topology is not yet supported; all processors have a parallelism hint of 1.

Users also need to provide the dependencies of the external data sources in the `extlib` directory; otherwise the topology will fail to run because of a `ClassNotFoundException`.

The current implementation of the Kafka connector in StormSQL assumes both the input and the output are in JSON format. The connector does not yet recognize the `INPUTFORMAT` and `OUTPUTFORMAT` clauses.
