Hello, I am working on an experimental project on message distribution and load balancing across a cluster using Apache Kafka and Zookeeper. The goal of the project is to distribute messages equally across the cluster for concurrent processing.
For example, the server cluster contains 3 servers: kafkaserver1, kafkaserver2, and kafkaserver3. When the producer sends 300 messages to a particular topic (demo), I expect each server to get 100 messages.

The project setup:

- Started the Kafka and Zookeeper processes on the 3 servers.
- Started 3 consumer clients listening for messages, e.g. client1 connects to kafkaserver1, client2 connects to kafkaserver2, and client3 connects to kafkaserver3.
- Started the producer, which pushes messages to the Zookeeper cluster.

The code is as follows:

Sample Producer.java

    System.out.println("ProdMain - starting..");
    Properties props = new Properties();
    String broker = "kafkaserver1:2181,kafkaserver2:2181,kafkaserver3:2181";
    props.put("zk.connect", broker);
    props.put("zk.connectiontimeout.ms", "1000000");
    props.put("zk.sessiontimeout.ms", "1000000");
    props.put("partitioner.class", "com.esg.ganges.kafka.MemberIdPartitioner");
    props.put("serializer.class", StringEncoder.class.getName());

    System.out.println("ProdMain - Initializing..");
    ProducerConfig config = new ProducerConfig(props);
    System.out.println("ProdMain - con time: " + config.getZkConnectionTimeoutMs());

    System.out.println("ProdMain - Producer:start");
    Producer<String, String> producer = new Producer<String, String>(config);

    System.out.println("ProdMain - Creating the data");
    StringProducerData prodData = new StringProducerData("demo");

    System.out.println("ProdMain - Start sending messages...");
    try {
        long start = System.currentTimeMillis();
        prodData.add("Hello world");
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            producer.send(prodData);
        }
        long cost = System.currentTimeMillis() - start;
        System.out.println("send message cost: " + cost + " ms");
    } finally {
        producer.close();
    }
    System.out.println("ProdMain - End");

Consumer.java

    Properties props = new Properties();
    String broker = args[0] + ":2181";
    System.out.println("Connecting to the Server:" + broker);
    props.put("zk.connect", broker);
    props.put("groupid", "test_group");
    //
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector connector = Consumer.create(consumerConfig);
    //
    Map<String, List<MessageStream<String>>> topicMessageStreams =
            connector.createMessageStreams(ImmutableMap.of("demo", 2), new StringDecoder());
    List<MessageStream<String>> streams = topicMessageStreams.get("demo");
    //
    ExecutorService executor = Executors.newFixedThreadPool(2);
    final AtomicInteger count = new AtomicInteger();
    for (final MessageStream<String> stream : streams) {
        executor.submit(new Runnable() {
            public void run() {
                for (String message : stream) {
                    System.out.println(count.incrementAndGet() + " => " + message);
                }
            }
        });
    }
    //
    System.out.println("Connected to the broker:" + broker + ", waiting for messages..");
    executor.awaitTermination(1, TimeUnit.HOURS);
    }

MemberIdPartitioner.java

    public class MemberIdPartitioner implements Partitioner {

        public MemberIdPartitioner() {
            System.out.println("Initialized: MemberIdPartitioner..");
        }

        @Override
        public int partition(Object arg0, int numberOfPartitions) {
            System.out.println("Type: " + arg0.getClass().getName());
            // TODO Auto-generated method stub
            return 3;
        }
    }

The behavior I see with this implementation is that all the messages are consumed by a single consumer. When one of the consumers is shut down, the next consumer becomes active and starts consuming messages.

My experiment is to distribute messages equally across all three servers. Please let me know if I am doing something wrong.

Thank you,
Vish
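
For reference, the distribution described above (300 messages, 100 per server) can be sketched with plain arithmetic, independent of Kafka. This is only an illustration under stated assumptions: the class and method names (PartitionSketch, constantPartition, roundRobinPartition, distribute) are hypothetical and not part of any Kafka API, and valid partition indices are assumed to run from 0 to numberOfPartitions - 1. It contrasts a partition function that always returns the same index with one that cycles through the indices.

```java
import java.util.Arrays;

// Hypothetical, Kafka-free illustration. None of these names come from Kafka.
final class PartitionSketch {

    // Always picks the same index, as a partitioner returning a constant would.
    // Index 0 is used here; assumed valid range is 0..numberOfPartitions-1.
    static int constantPartition(int messageIndex, int numberOfPartitions) {
        return 0;
    }

    // Cycles through the partition indices in order: 0, 1, 2, 0, 1, 2, ...
    static int roundRobinPartition(int messageIndex, int numberOfPartitions) {
        return messageIndex % numberOfPartitions;
    }

    // Counts how many of `messages` land on each of `partitions` partitions.
    static int[] distribute(int messages, int partitions, boolean roundRobin) {
        int[] counts = new int[partitions];
        for (int i = 0; i < messages; i++) {
            int p = roundRobin
                    ? roundRobinPartition(i, partitions)
                    : constantPartition(i, partitions);
            counts[p]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Constant choice: every message goes to one partition.
        System.out.println(Arrays.toString(distribute(300, 3, false))); // [300, 0, 0]
        // Round-robin choice: the expected even split.
        System.out.println(Arrays.toString(distribute(300, 3, true)));  // [100, 100, 100]
    }
}
```

The sketch only models the choice of partition index. Whether an even split of indices also gives an even split across the three servers and consumers depends on how partitions are assigned to brokers and consumer streams, which is not modeled here.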