This is an automated email from the ASF dual-hosted git repository. dmagda pushed a commit to branch IGNITE-7595 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-7595 by this push: new 74e1981 ported all streaming related docs from readme.io 74e1981 is described below commit 74e19810cc0a84bf6976b22f2206aad907c34e47 Author: Denis Magda <dma...@gridgain.com> AuthorDate: Fri Sep 25 16:17:44 2020 -0700 ported all streaming related docs from readme.io --- docs/_data/toc.yaml | 22 +++ .../streaming/camel-streamer.adoc | 139 ++++++++++++++ .../streaming/flink-streamer.adoc | 64 +++++++ .../streaming/flume-sink.adoc | 65 +++++++ .../streaming/jms-streamer.adoc | 109 +++++++++++ .../streaming/kafka-streamer.adoc | 207 +++++++++++++++++++++ .../streaming/mqtt-streamer.adoc | 62 ++++++ .../streaming/rocketmq-streamer.adoc | 71 +++++++ .../streaming/storm-streamer.adoc | 48 +++++ .../streaming/twitter-streamer.adoc | 51 +++++ .../streaming/zeromq-streamer.adoc | 53 ++++++ docs/_docs/images/integrations/camel-streamer.png | Bin 0 -> 120217 bytes 12 files changed, 891 insertions(+) diff --git a/docs/_data/toc.yaml b/docs/_data/toc.yaml index 356bbc8..2616397 100644 --- a/docs/_data/toc.yaml +++ b/docs/_data/toc.yaml @@ -399,6 +399,28 @@ url: extensions-and-integrations/hibernate-l2-cache - title: MyBatis L2 Cache url: extensions-and-integrations/mybatis-l2-cache + - title: Streaming + items: + - title: Kafka Streamer + url: extensions-and-integrations/streaming/kafka-streamer + - title: Camel Streamer + url: extensions-and-integrations/streaming/camel-streamer + - title: Flink Streamer + url: extensions-and-integrations/streaming/flink-streamer + - title: Flume Sink + url: extensions-and-integrations/streaming/flume-sink + - title: JMS Streamer + url: extensions-and-integrations/streaming/jms-streamer + - title: MQTT Streamer + url: extensions-and-integrations/streaming/mqtt-streamer + - title: RocketMQ Streamer + url: extensions-and-integrations/streaming/rocketmq-streamer + - title: Storm Streamer + url: extensions-and-integrations/streaming/storm-streamer + - title: ZeroMQ 
Streamer + url: extensions-and-integrations/streaming/zeromq-streamer + - title: Twitter Streamer + url: extensions-and-integrations/streaming/twitter-streamer - title: Plugins url: plugins - title: SQL Reference diff --git a/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc new file mode 100644 index 0000000..36614403 --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc @@ -0,0 +1,139 @@ += Apache Camel Streamer + +== Overview + +This documentation page focuses on the Apache Camel streamer, which can also be thought of as a universal streamer because it +allows you to consume from any technology or protocol supported by Camel into an Ignite Cache. + +image::images/integrations/camel-streamer.png[Camel Streamer] + +With this streamer, you can ingest entries straight into an Ignite cache based on: + +* Calls received on a Web Service (SOAP or REST), by extracting the body or headers. +* Messages received on a TCP or UDP channel. +* The content of files received via FTP or written to the local filesystem. +* Email messages received via POP3 or IMAP. +* A MongoDB tailable cursor. +* An AWS SQS queue. +* And many others. + +This streamer supports two modes of ingestion: **direct ingestion** and **mediated ingestion**. + +[NOTE] +==== +[discrete] +=== The Ignite Camel Component +There is also the https://camel.apache.org/components/latest/ignite-summary.html[camel-ignite, window=_blank] component, if what you are looking for is +to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route. 
+==== + +== Maven Dependency + +To make use of the `ignite-camel` streamer, you need to add the following dependency: + +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-camel</artifactId> + <version>${ignite.version}</version> +</dependency> +---- +-- + +It will also pull in `camel-core` as a transitive dependency. + +== Direct Ingestion + +You can consume from any Camel endpoint straight into Ignite with the help of a +Tuple Extractor. We call this **direct ingestion**. + +Here is a code sample: +[tabs] +-- +tab:Java[] +[source,java] +---- +// Start Apache Ignite. +Ignite ignite = Ignition.start(); + +// Create a streamer pipe which ingests into the 'mycache' cache. +IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache"); + +// Create a Camel streamer and connect it. +CamelStreamer<String, String> streamer = new CamelStreamer<>(); +streamer.setIgnite(ignite); +streamer.setStreamer(pipe); + +// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite. +streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST"); + +// This is the tuple extractor. We'll assume each message contains only one tuple. +// If your message contains multiple tuples, use a StreamMultipleTupleExtractor. +// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value. +streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() { + @Override public Map.Entry<String, String> extract(Exchange exchange) { + String stationId = exchange.getIn().getHeader("X-StationId", String.class); + String temperature = exchange.getIn().getBody(String.class); + return new GridMapEntry<>(stationId, temperature); + } +}); + +// Start the streamer. 
+streamer.start(); +---- +-- + +== Mediated Ingestion + +For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and **ingest only the result into the Ignite cache**. + +We call this **mediated ingestion**. + +[tabs] +-- +tab:Java[] +[source,java] +---- +// Create a CamelContext with a custom route that will: +// (1) consume from our Jetty endpoint. +// (2) transform incoming JSON into a Java object with Jackson. +// (3) uses JSR 303 Bean Validation to validate the object. +// (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from. +CamelContext context = new DefaultCamelContext(); +context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST") + .unmarshal().json(JsonLibrary.Jackson) + .to("bean-validator:validate") + .to("direct:ignite.ingest"); + } +}); + +// Remember our Streamer is now consuming from the Direct endpoint above. +streamer.setEndpointUri("direct:ignite.ingest"); +---- +-- + +== Setting a Response + +By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request. 
+If you want to customize the response, set a Camel `Processor` as a `responseProcessor`: + +[tabs] +-- +tab:Java[] +[source,java] +---- +streamer.setResponseProcessor(new Processor() { + @Override public void process(Exchange exchange) throws Exception { + exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200); + exchange.getOut().setBody("OK"); + } +}); +---- +-- diff --git a/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc new file mode 100644 index 0000000..381d85e --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc @@ -0,0 +1,64 @@ += Apache Flink Streamer + +Apache Ignite Flink Sink module is a streaming connector to inject Flink data into Ignite cache. The sink emits its input +data to Ignite cache. When creating a sink, an Ignite cache name and Ignite grid configuration file have to be provided. + +Starting data transfer to Ignite cache can be done with the following steps. + +. Import Ignite Flink Sink Module in Maven Project +If you are using Maven to manage dependencies of your project, you can add Flink module +dependency like this (replace `${ignite.version}` with actual Ignite version you are +interested in): ++ +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-flink</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> +---- +-- +. Create an Ignite configuration file and make sure it is accessible from the sink. +. Make sure your data input to the sink is specified and start the sink. 
++ +[tabs] +-- +tab:Java[] +[source,java] +---- +IgniteSink igniteSink = new IgniteSink("myCache", "ignite.xml"); + +igniteSink.setAllowOverwrite(true); +igniteSink.setAutoFlushFrequency(10); +igniteSink.start(); + +DataStream<Map> stream = ...; + +// Sink data into the grid. +stream.addSink(igniteSink); +try { + env.execute(); +} catch (Exception e){ + // Exception handling. +} +finally { + igniteSink.stop(); +} +---- +-- + +Refer to the Javadocs of the `ignite-flink` module for more info on the available options. diff --git a/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc b/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc new file mode 100644 index 0000000..9231824 --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc @@ -0,0 +1,65 @@ += Apache Flume Sink + +== Overview + +Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large +amounts of log data (https://github.com/apache/flume). + +`IgniteSink` is a Flume sink that extracts events from an associated Flume channel and injects them into an Ignite cache. + +`IgniteSink` and its dependencies have to be included in the agent's classpath, as described in the following subsection, +before starting the Flume agent. + +== Setting Up + +. Create a transformer by implementing the `EventTransformer` interface. +. Create an `ignite` directory inside the `plugins.d` directory, which is located in `$\{FLUME_HOME}`. If the `plugins.d` directory +is not there, create it. +. Build the transformer and copy the resulting jar to `$\{FLUME_HOME}/plugins.d/ignite/lib`. +. Copy other Ignite-related jar files from the Apache Ignite distribution to `$\{FLUME_HOME}/plugins.d/ignite/libext` to +have them as shown below. 
++ +---- +plugins.d/ +`-- ignite + |-- lib + | `-- ignite-flume-transformer-x.x.x.jar <-- your jar + `-- libext + |-- cache-api-1.0.0.jar + |-- ignite-core-x.x.x.jar + |-- ignite-flume-x.x.x.jar <-- IgniteSink + |-- ignite-spring-x.x.x.jar + |-- spring-aop-4.1.0.RELEASE.jar + |-- spring-beans-4.1.0.RELEASE.jar + |-- spring-context-4.1.0.RELEASE.jar + |-- spring-core-4.1.0.RELEASE.jar + `-- spring-expression-4.1.0.RELEASE.jar +---- + +. In Flume configuration file, specify Ignite configuration XML file's location with cache properties +(see `flume/src/test/resources/example-ignite.xml` for a basic example) with the cache name specified for cache creation. +Also specify the cache name (same as in Ignite configuration file), your `EventTransformer`'s implementation class, and, +optionally, batch size. All properties are shown in the table below (required properties are in bold). ++ +[cols="20%,45%,35%",opts="header"] +|=== +|Property Name |Description | Default Value +|channel| | - +|type| The component type name. Needs to be `org.apache.ignite.stream.flume.IgniteSink` | - +|igniteCfg| Ignite configuration XML file | - +|cacheName| Cache name. Same as in igniteCfg | - +|eventTransformer| Your implementation of `org.apache.ignite.stream.flume.EventTransformer` | - +|batchSize| Number of events to be written per transaction| 100 +|=== + +The sink configuration part of agent named `a1` can look like this: + +---- +a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink +a1.sinks.k1.igniteCfg = /some-path/ignite.xml +a1.sinks.k1.cacheName = testCache +a1.sinks.k1.eventTransformer = my.company.MyEventTransformer +a1.sinks.k1.batchSize = 100 +---- + +After specifying your source and channel (see Flume's docs), you are ready to run a Flume agent. 
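The sink settings above only cover `k1`. A complete agent file also has to name its components and wire a source and a channel to the sink. The fragment below is a sketch of that wiring using Flume's stock `netcat` source and `memory` channel. The component names `r1` and `c1`, the port, and the capacities are placeholder assumptions, not values taken from the Ignite module.

```properties
# Name the components of agent a1 (r1 and c1 are assumed names).
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# A netcat source that turns each received line of text into an event.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# An in-memory channel; transactionCapacity should not be smaller than the sink's batchSize.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the Ignite sink configured above to the channel.
a1.sinks.k1.channel = c1
```

Assuming the combined file is saved as `conf/ignite-agent.conf` (a hypothetical path), the agent can then be started with `bin/flume-ng agent --conf conf --conf-file conf/ignite-agent.conf --name a1`.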
diff --git a/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc new file mode 100644 index 0000000..df6578d --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc @@ -0,0 +1,109 @@ += JMS Streamer + +== Overview + +Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them into Ignite. + +This data streamer supports the following features: + +* Consumes from queues or topics. +* Supports durable subscriptions from topics. +* Concurrent consumers are supported via the `threads` parameter. + ** When consuming from queues, this component will start as many `Session` objects as configured threads, each with a separate `MessageListener` instance, therefore achieving _natural_ concurrency. + ** When consuming from topics, we cannot start multiple threads as that would lead us to consume duplicate messages. Therefore, we achieve concurrency in a _virtualized_ manner through an internal thread pool. +* Transacted sessions are supported through the `transacted` parameter. +* Batched consumption is possible via the `batched` parameter, which groups message reception within the scope of a local JMS transaction (XA is not supported). Depending on the broker, this technique can provide a higher throughput as it decreases the number of message acknowledgment round trips that are necessary, albeit at the expense of possible duplicate messages (especially if an incident occurs in the middle of a transaction). + ** Batches are committed when the `batchClosureMillis` time has elapsed, or when a Session has received at least `batchClosureSize` messages. + ** Time-based closure fires with the specified frequency and applies to all ``Session``s in parallel. 
+ ** Size-based closure applies to each individual `Session` (as transactions are `Session`-bound in JMS), so it will fire when that `Session` has processed that many messages. + ** Both options are compatible with each other. You can disable either, but not both if batching is enabled. +* Supports specifying the destination with implementation-specific `Destination` objects or with names. + +We have tested our implementation against http://activemq.apache.org[Apache ActiveMQ, window=_blank], but any JMS broker +is supported as long as its client library implements the http://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/[JMS 1.1 specification, window=_blank]. + +== Instantiating JMS Streamer + +When you instantiate the JMS Streamer, you will need to specify the following generic types: + +* `T extends Message` \=> the type of JMS `Message` this streamer will receive. If it can receive multiple message types, use the generic `Message` type. +* `K` \=> the type of the cache key. +* `V` \=> the type of the cache value. + +To configure the JMS streamer, you will need to provide the following compulsory properties: + +* `connectionFactory` \=> an instance of your `ConnectionFactory` duly configured as required by the broker. It can be a pooled `ConnectionFactory`. +* `destination` or (`destinationName` and `destinationType`) \=> a `Destination` object (normally a broker-specific implementation of the JMS `Queue` or `Topic` interfaces), or the combination of a destination name (queue or topic name) and the type as a `Class` reference to either `Queue` or `Topic`. In the latter case, the streamer will use either `Session.createQueue(String)` or `Session.createTopic(String)` to get a hold of the destination. +* `transformer` \=> an implementation of `MessageTransformer<T, K, V>` that digests a JMS message of type `T` and produces a `Map<K, V>` of cache entries to add. It can also return `null` or an empty `Map` to ignore the incoming message. 
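The transformer contract described above (one message in, a `Map<K, V>` of cache entries out, with an empty map meaning "ignore this message") can be sketched without any JMS or Ignite dependency. The class and method names below are hypothetical, and the payload format of one `key,value` pair per line is an assumption made for illustration. In a real `MessageTransformer<TextMessage, String, String>`, the same logic would run on the result of `message.getText()`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Ignite: turns a text payload with one
// "key,value" pair per line into cache entries.
final class KeyValuePayloadParser {
    private KeyValuePayloadParser() {
    }

    // Returns an empty map for a null or blank payload, which signals that the
    // message should be ignored. Lines without a separator or key are skipped.
    static Map<String, String> parse(String payload) {
        if (payload == null || payload.trim().isEmpty())
            return Collections.emptyMap();

        Map<String, String> entries = new LinkedHashMap<>();

        for (String line : payload.split("\n")) {
            // A limit of 2 keeps any further commas inside the value.
            String[] tokens = line.split(",", 2);

            if (tokens.length == 2 && !tokens[0].trim().isEmpty())
                entries.put(tokens[0].trim(), tokens[1].trim());
        }

        return entries;
    }
}
```

Skipping malformed lines instead of indexing into the split result directly avoids an `ArrayIndexOutOfBoundsException` when a line has no separator.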
+ +== Example + +The example in this section populates a cache with `String` keys and `String` values, consuming `TextMessages` with this format: + +---- +raulk,Raul Kripalani +dsetrakyan,Dmitriy Setrakyan +sv,Sergi Vladykin +gm,Gianfranco Murador +---- + +Here is the code: + +[tabs] +-- +tab:Java[] +[source,java] +---- +// create a data streamer +IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"); +dataStreamer.allowOverwrite(true); + +// create a JMS streamer and plug the data streamer into it +JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>(); +jmsStreamer.setIgnite(ignite); +jmsStreamer.setStreamer(dataStreamer); +jmsStreamer.setConnectionFactory(connectionFactory); +jmsStreamer.setDestination(destination); +jmsStreamer.setTransacted(true); +jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() { + @Override + public Map<String, String> apply(TextMessage message) { + final Map<String, String> answer = new HashMap<>(); + String text; + try { + text = message.getText(); + } + catch (JMSException e) { + LOG.warn("Could not parse message.", e); + return Collections.emptyMap(); + } + for (String s : text.split("\n")) { + String[] tokens = s.split(","); + answer.put(tokens[0], tokens[1]); + } + return answer; + } +}); + +jmsStreamer.start(); + +// on application shutdown +jmsStreamer.stop(); +dataStreamer.close(); +---- +-- + +To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.): + +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-jms11</artifactId> + <version>${ignite.version}</version> +</dependency> +---- +-- diff --git a/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc new file mode 100644 index 0000000..c1d510d --- /dev/null +++ 
b/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc @@ -0,0 +1,207 @@ += Apache Kafka Streamer + +== Overview + +Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache. +Either of the following two methods can be used to achieve such streaming: + +* using Kafka Connect functionality with Ignite sink +* importing the Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming + +== Streaming Data via Kafka Connect + +`IgniteSinkConnector` will help you export data from Kafka to Ignite cache by polling data from Kafka topics and writing +it to your specified cache. The connector can be found in the `optional/ignite-kafka` module. It and its dependencies +have to be on the classpath of a Kafka running instance, as described in the following subsection. _For more information +on Kafka Connect, see http://kafka.apache.org/documentation.html#connect[Kafka Documentation, window=_blank]._ + +=== Setting up and Running + +. Add the `IGNITE_HOME/libs/optional/ignite-kafka` module to the application classpath. + +. Prepare worker configurations, e.g., ++ +[tabs] +-- +tab:Configuration[] +[source,yaml] +---- +bootstrap.servers=localhost:9092 + +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter +key.converter.schemas.enable=false +value.converter.schemas.enable=false + +internal.key.converter=org.apache.kafka.connect.storage.StringConverter +internal.value.converter=org.apache.kafka.connect.storage.StringConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false + +offset.storage.file.filename=/tmp/connect.offsets +offset.flush.interval.ms=10000 +---- +-- + +. 
Prepare connector configurations, e.g., ++ +[tabs] +-- +tab:Configuration[] +[source,yaml] +---- +# connector +name=my-ignite-connector +connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector +tasks.max=2 +topics=someTopic1,someTopic2 + +# cache +cacheName=myCache +cacheAllowOverwrite=true +igniteCfg=/some-path/ignite.xml +singleTupleExtractorCls=my.company.MyExtractor +---- +-- ++ +* where `cacheName` is the name of the cache you specify in `/some-path/ignite.xml` and into which the data from `someTopic1,someTopic2` +will be pulled and stored. +* `cacheAllowOverwrite` can be set to `true` if you want to enable overwriting existing values in the cache. +* If you need to parse the incoming data and decide on your new key and value, you can implement it as `StreamSingleTupleExtractor` and specify it as `singleTupleExtractorCls`. +* You can also set `cachePerNodeDataSize` and `cachePerNodeParOps` to adjust the per-node buffer and the maximum number of parallel stream operations for a single node. + +. Start the connector, for instance, in standalone mode as follows: ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties +---- +-- + +=== Checking the Flow + +To perform a very basic functionality check, you can do the following: + +. Start Zookeeper ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +bin/zookeeper-server-start.sh config/zookeeper.properties +---- +-- +. Start Kafka server ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +bin/kafka-server-start.sh config/server.properties +---- +-- +. Provide some data input to the Kafka server ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=, +k1,v1 +---- +-- +. 
Start the connector ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties +---- +-- +. Check the value is in the cache. For example, via REST API, ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +http://node1:8080/ignite?cmd=size&cacheName=cache1 +---- +-- + +== Streaming data with Ignite Kafka Streamer Module + +If you are using Maven to manage dependencies of your project, first of all you will have to add the Kafka Streamer module +dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in): + +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-kafka</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> +---- +-- + +Having a cache with `String` keys and `String` values, the streamer can be started as follows: +[tabs] +-- +tab:Java[] +[source,java] +---- +KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); + +IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache"); + +// allow overwriting cache data +stmr.allowOverwrite(true); + +kafkaStreamer.setIgnite(ignite); +kafkaStreamer.setStreamer(stmr); + +// set the topic +kafkaStreamer.setTopic(someKafkaTopic); + +// set the number of threads to process Kafka streams +kafkaStreamer.setThreads(4); + +// set Kafka consumer configurations +kafkaStreamer.setConsumerConfig(kafkaConsumerConfig); + +// set extractor +kafkaStreamer.setSingleTupleExtractor(strExtractor); + +kafkaStreamer.start(); + +... 
+ +// stop on shutdown +kafkaStreamer.stop(); + +stmr.close(); +---- +-- + +For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html diff --git a/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc new file mode 100644 index 0000000..9c792da --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc @@ -0,0 +1,62 @@ += MQTT Streamer + +== Overview + +This streamer consumes from an MQTT topic and feeds key-value pairs into an `IgniteDataStreamer` instance, using +https://eclipse.org/paho/[Eclipse Paho, window=_blank] as an MQTT client. + +You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming +message and extract the tuple to insert. + +This streamer supports: + +* Subscribing to a single topic or multiple topics at once. +* Specifying the subscriber's QoS for a single topic or for multiple topics. +* Setting https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html[MqttConnectOptions, window=_blank] +to enable features like _last will testament_, _persistent sessions_, etc. +* Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one. +* (Re-)Connection retries powered by the https://github.com/rholder/guava-retrying[guava-retrying library, window=_blank]. +_Retry wait_ and _retry stop_ policies can be configured. +* Blocking the `start()` method until the client is connected for the first time. + +== Example + +Here's a trivial code sample showing how to use this streamer: + +[tabs] +-- +tab:Java[] +[source,java] +---- +// Start Ignite. +Ignite ignite = Ignition.start(); + +// Get a data streamer reference. 
+IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer("mycache"); + +// Create an MQTT data streamer +MqttStreamer<Integer, String> streamer = new MqttStreamer<>(); +streamer.setIgnite(ignite); +streamer.setStreamer(dataStreamer); +streamer.setBrokerUrl(brokerUrl); +streamer.setBlockUntilConnected(true); + +// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String +// (using Guava here). +streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() { + @Override public Map.Entry<Integer, String> extract(MqttMessage msg) { + List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload())); + + return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); + } +}); + +// Consume from multiple topics at once. +streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno")); + +// Start the MQTT Streamer. +streamer.start(); +---- +-- + +Refer to the Javadocs of the `ignite-mqtt` module for more info on the available options. diff --git a/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc new file mode 100644 index 0000000..a2781a9 --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc @@ -0,0 +1,71 @@ += RocketMQ Streamer + +This streamer module provides streaming from https://github.com/apache/incubator-rocketmq[Apache RocketMQ, window=_blank] +to Ignite. + +To use the Ignite RocketMQ Streamer module: + +. Import it to your Maven project. 
If you are using Maven to manage dependencies of your project, you can add an Ignite +RocketMQ module dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in): ++ +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-rocketmq</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> +---- +-- + +. Implement either `StreamSingleTupleExtractor` or `StreamMultipleTupleExtractor` for the streamer (shown +as `MyTupleExtractor` in the code sample below). For a simple implementation, refer to `RocketMQStreamerTest.java`. + +. Initialize and start the streamer ++ +[tabs] +-- +tab:Java[] +[source,java] +---- +IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(MY_CACHE); + +dataStreamer.allowOverwrite(true); +dataStreamer.autoFlushFrequency(10); + +RocketMQStreamer<String, byte[]> streamer = new RocketMQStreamer<>(); + +// configure. +streamer.setIgnite(ignite); +streamer.setStreamer(dataStreamer); +streamer.setNameSrvAddr(NAMESERVER_IP_PORT); +streamer.setConsumerGrp(CONSUMER_GRP); +streamer.setTopic(TOPIC_NAME); +streamer.setMultipleTupleExtractor(new MyTupleExtractor()); + +streamer.start(); + +... + +// stop on shutdown +streamer.stop(); + +dataStreamer.close(); +---- +-- + +Refer to the Javadocs for more info on the available options. 
diff --git a/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc new file mode 100644 index 0000000..1eb3a2a --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc @@ -0,0 +1,48 @@ += Apache Storm Streamer + +Apache Ignite Storm Streamer module provides streaming via http://storm.apache.org/[Storm, window=_blank] to Ignite. + +Starting data transfer to Ignite can be done with the following steps. + +. Import Ignite Storm Streamer Module In Maven Project. If you are using Maven to manage dependencies of your project, +you can add Storm module dependency like this (replace `${ignite.version}` with actual Ignite version you are interested in): ++ +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-storm</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> +---- +-- + +. Create an Ignite configuration file (see `example-ignite.xml` in `modules/storm/src/test/resources/example-ignite.xml`) +and make sure it is accessible from the streamer. +. Make sure your key-value data input to the streamer is specified with the field named `ignite` (or a different one you +configure with `StormStreamer.setIgniteTupleField(...)`). +See TestStormSpout.declareOutputFields(...) for an example. +. 
Create a topology with the streamer, make a jar file with all dependencies and run the following ++ +[tabs] +-- +tab:Shell[] +[source,shell] +---- +storm jar ignite-storm-streaming-jar-with-dependencies.jar my.company.ignite.MyStormTopology +---- +-- diff --git a/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc new file mode 100644 index 0000000..aca8fc9 --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc @@ -0,0 +1,51 @@ += Twitter Streamer + +Ignite Twitter Streamer module consumes tweets from Twitter and feeds the transformed key-value pairs `<tweetId, text>` into Ignite. + +To stream data from Twitter into Ignite, you need to: + +. Import Ignite Twitter Module with Maven and replace `${ignite.version}` with the actual Ignite version you are interested in. ++ +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-twitter</artifactId> + <version>${ignite.version}</version> +</dependency> +---- +-- + +. In your code, set the necessary parameters and start the streamer, like so: ++ +[tabs] +-- +tab:Java[] +[source,java] +---- +IgniteDataStreamer dataStreamer = ignite.dataStreamer("myCache"); +dataStreamer.allowOverwrite(true); +dataStreamer.autoFlushFrequency(10); + +OAuthSettings oAuthSettings = new OAuthSettings("setting1", "setting2", "setting3", "setting4"); + +TwitterStreamer<Integer, String> streamer = new TwitterStreamer<>(oAuthSettings); +streamer.setIgnite(ignite); +streamer.setStreamer(dataStreamer); + +Map<String, String> params = new HashMap<>(); +params.put("track", "apache, twitter"); +params.put("follow", "3004445758"); + +streamer.setApiParams(params);// Twitter Streaming API params. +streamer.setEndpointUrl(endpointUrl);// Twitter streaming API endpoint. 
+streamer.setThreadsCount(8); + +streamer.start(); +---- +-- + +Refer to https://dev.twitter.com/streaming/overview[Twitter streaming API, window=_blank] documentation for more information on various streaming parameters. diff --git a/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc b/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc new file mode 100644 index 0000000..0e6e6cf --- /dev/null +++ b/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc @@ -0,0 +1,53 @@ += ZeroMQ Streamer + +Apache Ignite ZeroMQ Streamer module enables streaming capabilities via http://zeromq.org/[ZeroMQ, window=_blank] into Ignite. + +To start streaming into Ignite, you need to do the following: + +. Add Ignite ZeroMQ Streamer Module to your Maven `pom.xml` file. ++ +[tabs] +-- +tab:pom.xml[] +[source,xml] +---- +<dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-zeromq</artifactId> + <version>${ignite.version}</version> + </dependency> + ... +</dependencies> +---- +-- + +. Implement either the https://github.com/apache/ignite/blob/f2f82f09b35368f25e136c9fce5e7f2198a91171/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java[StreamSingleTupleExtractor, window=_blank] or +the https://github.com/apache/ignite/blob/f2f82f09b35368f25e136c9fce5e7f2198a91171/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java[StreamMultipleTupleExtractor, window=_blank] for ZeroMQ streamer. +Refer to https://github.com/apache/ignite/blob/7492843ad9e22c91764fb8d0c3a096b8ce6c653e/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/ZeroMqStringSingleTupleExtractor.java[this sample implementation, window=_blank] for more details. +. 
Set the extractor and initiate the streaming as shown below: ++ +[tabs] +-- +tab:Java[] +[source,java] +---- +try (IgniteDataStreamer<Integer, String> dataStreamer = + grid().dataStreamer("myCacheName")) { + + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(1); + + try (IgniteZeroMqStreamer streamer = new IgniteZeroMqStreamer( + 1, ZeroMqTypeSocket.PULL, "tcp://localhost:5671", null)) { + streamer.setIgnite(grid()); + streamer.setStreamer(dataStreamer); + + streamer.setSingleTupleExtractor(new ZeroMqStringSingleTupleExtractor()); + + streamer.start(); + } +} +---- +-- diff --git a/docs/_docs/images/integrations/camel-streamer.png b/docs/_docs/images/integrations/camel-streamer.png new file mode 100644 index 0000000..cff36dc Binary files /dev/null and b/docs/_docs/images/integrations/camel-streamer.png differ