Added: kafka/site/07/quickstart.html URL: http://svn.apache.org/viewvc/kafka/site/07/quickstart.html?rev=1476865&view=auto ============================================================================== --- kafka/site/07/quickstart.html (added) +++ kafka/site/07/quickstart.html Sun Apr 28 23:50:07 2013 @@ -0,0 +1,310 @@ +<!--#include virtual="includes/header.html" -->
+
+<h2>Quick Start</h2>
+
+<h3>Step 1: Download the code</h3>
+
+<a href="downloads.html" title="Kafka downloads">Download</a> a recent stable release.
+
+<pre>
+<b>> tar xzf kafka-<VERSION>.tgz</b>
+<b>> cd kafka-<VERSION></b>
+<b>> ./sbt update</b>
+<b>> ./sbt package</b>
+</pre>
+
+<h3>Step 2: Start the server</h3>
+
+Kafka uses zookeeper; brokers and consumers use it for co-ordination.
+<p>
+First start the zookeeper server. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node zookeeper instance.
+
+<pre>
+<b>> bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties
+...
+</pre>
+
+Now start the Kafka server:
+<pre>
+<b>> bin/kafka-server-start.sh config/server.properties</b>
+[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
+[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)
+...
+</pre>
+
+<h3>Step 3: Send some messages</h3>
+
+Kafka comes with a command line client that takes input from standard input and sends it out as messages to the Kafka cluster. By default each line is sent as a separate message. The topic <i>test</i> is created automatically when messages are sent to it. Omitting the log output, you should see something like this:
+
+<pre>
+<b>> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test</b>
+This is a message
+This is another message
+</pre>
+
+<h3>Step 4: Start a consumer</h3>
+
+Kafka also has a command line consumer that dumps out messages to standard output.
+
+<pre>
+<b>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b>
+This is a message
+This is another message
+</pre>
+<p>
+If you have each of the above commands running in a different terminal, you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
+</p>
+<p>
+Both of these command line tools have additional options. Running a command with no arguments displays usage information documenting them in more detail.
+</p>
+
+<h3>Step 5: Write some code</h3>
+
+Below are some very simple examples of using Kafka to send messages; more complete examples can be found in the examples/ directory of the Kafka source code.
+
+<h4>Producer Code</h4>
+
+<h5>Producer API</h5>
+
+Here are examples of using the producer API, <code>kafka.producer.Producer<T></code>:
+
+<ol>
+<li>First, start a local instance of the zookeeper server
+<pre>./bin/zookeeper-server-start.sh config/zookeeper.properties</pre>
+</li>
+<li>Next, start a kafka broker
+<pre>./bin/kafka-server-start.sh config/server.properties</pre>
+</li>
+<li>Now, create the producer with all configuration defaults, using zookeeper based broker discovery.
+<pre>
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+...
+
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+</pre>
+</li>
+<li>Send a single message
+<pre>
+<small>// The message is sent to a randomly selected partition registered in ZK</small>
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Send multiple messages to multiple topics in one request
+<pre>
+List<String> messages = new ArrayList<String>();
+messages.add("test-message1");
+messages.add("test-message2");
+ProducerData<String, String> data1 = new ProducerData<String, String>("test-topic1", messages);
+ProducerData<String, String> data2 = new ProducerData<String, String>("test-topic2", messages);
+List<ProducerData<String, String>> dataForMultipleTopics = new ArrayList<ProducerData<String, String>>();
+dataForMultipleTopics.add(data1);
+dataForMultipleTopics.add(data2);
+producer.send(dataForMultipleTopics);
+</pre>
+</li>
+<li>Send a message with a partition key. Messages with the same key are sent to the same partition
+<pre>
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Use a custom partitioner
+<p>If you are using zookeeper based broker discovery, <code>kafka.producer.Producer<T></code> routes your data to a particular broker partition based on a <code>kafka.producer.Partitioner<T></code>, specified through the <code>partitioner.class</code> config parameter. It defaults to <code>kafka.producer.DefaultPartitioner</code>. If you don't supply a partition key, then it sends each request to a random broker partition.</p>
+<pre>
+class MemberIdPartitioner extends Partitioner[MemberIdLocation] {
+  def partition(data: MemberIdLocation, numPartitions: Int): Int = {
+    (data.location.hashCode % numPartitions)
+  }
+}
+<small>// create the producer config to plug in the above partitioner</small>
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("partitioner.class", "xyz.MemberIdPartitioner");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+</pre>
+</li>
+<li>Use a custom Encoder
+<p>The producer takes a required config parameter, <code>serializer.class</code>, that specifies an <code>Encoder<T></code> to convert T to a Kafka Message. The default is the no-op <code>kafka.serializer.DefaultEncoder</code>.
Here is an example of a custom Encoder:</p>
+<pre>
+class TrackingDataSerializer extends Encoder[TrackingData] {
+  <small>// Say you want to use your own custom Avro encoding</small>
+  val avroEncoder = new CustomAvroEncoder()
+  def toMessage(event: TrackingData): Message = {
+    new Message(avroEncoder.getBytes(event))
+  }
+}
+</pre>
+If you want to use the above Encoder, pass it in to the "serializer.class" config parameter:
+<pre>
+Properties props = new Properties();
+props.put("serializer.class", "xyz.TrackingDataSerializer");
+</pre>
+</li>
+<li>Use a static list of brokers, instead of zookeeper based broker discovery
+<p>Some applications would rather not depend on zookeeper. In that case, the config parameter <code>broker.list</code> can be used to specify the list of all brokers in the Kafka cluster, in the following format:
+<code>broker_id1:host1:port1, broker_id2:host2:port2...</code></p>
+<pre>
+<small>// you can stop the zookeeper instance as it is no longer required</small>
+./bin/zookeeper-server-stop.sh
+<small>// create the producer config object</small>
+Properties props = new Properties();
+props.put("broker.list", "0:localhost:9092");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+<small>// send a message using the default partitioner</small>
+Producer<String, String> producer = new Producer<String, String>(config);
+List<String> messages = new ArrayList<String>();
+messages.add("test-message");
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", messages);
+producer.send(data);
+</pre>
+</li>
+<li>Use the asynchronous producer along with GZIP compression. This buffers writes in memory until either <code>batch.size</code> or <code>queue.time</code> is reached. After that, the data is sent to the Kafka brokers.
+<pre>
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("producer.type", "async");
+props.put("compression.codec", "1");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Finally, the producer should be closed:
+<pre>producer.close();</pre>
+A complete minimal program combining these pieces follows this list.
+</li>
+</ol>
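+
+Putting the pieces together, here is a minimal, self-contained producer program. This is a sketch built only from the snippets above; the class name and topic are illustrative:
+
+<pre>
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+public class QuickstartProducer {
+  public static void main(String[] args) {
+    <small>// zookeeper based broker discovery, with string serialization</small>
+    Properties props = new Properties();
+    props.put("zk.connect", "127.0.0.1:2181");
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
+    <small>// send a single message, then release the producer's resources</small>
+    producer.send(new ProducerData<String, String>("test-topic", "test-message"));
+    producer.close();
+  }
+}
+</pre>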
+
+<h5>Log4j appender</h5>
+
+Data can also be produced to a Kafka server via a log4j appender; this way, minimal code needs to be written in order to send data across to the Kafka server.
+Here is an example of how to use the Kafka log4j appender.
+
+Start by defining the Kafka appender in your log4j.properties file.
+<pre>
+<small># define the kafka log4j appender config parameters</small>
+log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
+<small># <b>REQUIRED</b>: set the hostname of the kafka server</small>
+log4j.appender.KAFKA.Host=localhost
+<small># <b>REQUIRED</b>: set the port on which the Kafka server is listening for connections</small>
+log4j.appender.KAFKA.Port=9092
+<small># <b>REQUIRED</b>: the topic under which the logger messages are to be posted</small>
+log4j.appender.KAFKA.Topic=test
+<small># the serializer to be used to turn an object into a Kafka message. Defaults to kafka.producer.DefaultStringEncoder</small>
+log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
+<small># do not set the above KAFKA appender as the root appender</small>
+log4j.rootLogger=INFO
+<small># set the logger for your package to be the KAFKA appender</small>
+log4j.logger.your.test.package=INFO, KAFKA
+</pre>
+
+Data can then be sent using the log4j appender as follows:
+
+<pre>
+Logger logger = Logger.getLogger([your.test.class]);
+logger.info("message from log4j appender");
+</pre>
+
+If your log4j appender fails to send messages, please verify that the correct log4j properties file is being used. You can add <code>-Dlog4j.debug=true</code> to your VM parameters to verify this.
+
+<h4>Consumer Code</h4>
+
+The consumer code is slightly more complex as it enables multithreaded consumption:
+
+<pre>
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import com.google.common.collect.ImmutableMap;
+import kafka.message.Message;
+// plus the connector classes from the kafka.consumer and kafka.javaapi.consumer packages
+
+// specify some consumer properties
+Properties props = new Properties();
+props.put("zk.connect", "localhost:2181");
+props.put("zk.connectiontimeout.ms", "1000000");
+props.put("groupid", "test_group");
+
+// Create the connection to the cluster
+ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+
+// create 4 partitions of the stream for topic "test", to allow 4 threads to consume
+Map<String, List<KafkaStream<Message>>> topicMessageStreams =
+    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
+List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
+
+// create a thread pool of 4 threads, one to consume from each of the partitions
+ExecutorService executor = Executors.newFixedThreadPool(4);
+
+// consume the messages in the threads
+for(final KafkaStream<Message> stream: streams) {
+  executor.submit(new Runnable() {
+    public void run() {
+      for(MessageAndMetadata msgAndMetadata: stream) {
+        // process message (msgAndMetadata.message())
+      }
+    }
+  });
+}
+</pre>
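+
+When you are done consuming, shut things down cleanly. A minimal sketch: closing the connector first ends the stream iterators above, letting the worker threads finish before the executor is stopped.
+
+<pre>
+// shut down the connector; the stream iterators will terminate
+consumerConnector.shutdown();
+// then stop the worker threads
+executor.shutdown();
+</pre>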
+
+<h4>Hadoop Consumer</h4>
+
+<p>
+Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
+</p>
+
+<p>
+Usage information on the hadoop consumer can be found <a href="https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer">here</a>.
+</p>
+
+<h4>Simple Consumer</h4>
+
+Kafka has a lower-level consumer API for reading message chunks directly from servers. Under most circumstances this should not be needed. But just in case, its usage is as follows:
+
+<pre>
+import kafka.api.FetchRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.utils.Utils;
+
+...
+
+<small>// create a consumer to connect to the kafka server running on localhost, port 9092, socket timeout of 10 secs, socket receive buffer of ~1MB</small>
+SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
+
+long offset = 0;
+while (true) {
+  <small>// create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB</small>
+  FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
+
+  <small>// get the message set from the consumer and print the messages out</small>
+  ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
+  for(MessageAndOffset msg : messages) {
+    System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
+    <small>// advance the offset after consuming each message; in 0.7 this is a byte offset in the partition's log, so always use the offset returned with the message</small>
+    offset = msg.offset();
+  }
+}
+</pre>
+
+<!--#include virtual="includes/footer.html" -->
+
Modified: kafka/site/includes/header.html URL: http://svn.apache.org/viewvc/kafka/site/includes/header.html?rev=1476865&r1=1476864&r2=1476865&view=diff ============================================================================== --- kafka/site/includes/header.html (original) +++ kafka/site/includes/header.html Sun Apr 28 23:50:07 2013 @@ -26,12 +26,10 @@ <div class="lsidebar"> <ul> <li><a href="downloads.html">download</a></li> - <li><a href="http://people.apache.org/~joestein/kafka-0.7.1-incubating-docs">api docs</a></li> - <li><a href="quickstart.html">quickstart</a></li> - <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">clients</a></li> - <li><a href="design.html">design</a></li> - <li><a href="configuration.html">configuration</a></li> - <li><a href="performance.html">performance</a></li> + <li>releases</li> + <ul> + <li><a href="07/quickstart.html">0.7</a></li> + </ul> <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Operations">operations</a></li> <li><a href="faq.html">faq</a></li> <li><a href="https://cwiki.apache.org/confluence/display/KAFKA">wiki</li> Modified: kafka/site/index.html URL: http://svn.apache.org/viewvc/kafka/site/index.html?rev=1476865&r1=1476864&r2=1476865&view=diff ============================================================================== --- kafka/site/index.html (original) +++ kafka/site/index.html Sun Apr 28 23:50:07 2013 @@ -14,7 +14,7 @@ Kafka provides a publish-subscribe solut </p> <p> -The use for activity stream processing makes Kafka comparable to <a href="https://github.com/facebook/scribe">Facebook's Scribe</a> or <a href="http://flume.apache.org">Apache Flume</a> (incubating), though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system. See our <a href="design.html">design</a> page for more details. +The use for activity stream processing makes Kafka comparable to <a href="https://github.com/facebook/scribe">Facebook's Scribe</a> or <a href="http://flume.apache.org">Apache Flume</a> (incubating), though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system. </p> <!--#include virtual="includes/footer.html" --> Modified: kafka/site/projects.html URL: http://svn.apache.org/viewvc/kafka/site/projects.html?rev=1476865&r1=1476864&r2=1476865&view=diff ============================================================================== --- kafka/site/projects.html (original) +++ kafka/site/projects.html Sun Apr 28 23:50:07 2013 @@ -35,7 +35,7 @@ Below is a list of projects which would <h3>Clients In Other Languages</h3> <p> -We offer a JVM-based client for production and consumption and also a rather primitive native python client. It would be great to improve this list. The lower-level protocols are well documented <a href="design.html">here</a> and should be relatively easy to implement in any language that supports standard socket I/O. +We offer a JVM-based client for production and consumption. It would be great to implement the client in other languages. </p> <h3>Convert Hadoop InputFormat or OutputFormat to Scala</h3>