Could you remove the following statement and see if it works?
System.out.println("Created iterator " + it.toString() + " thread number "
+ threadNumber);
Thanks,
Jun
On Tue, Aug 27, 2013 at 3:43 PM, David Williams <[email protected]>wrote:
>
> Hi all,
>
> I checked out the java source and looked at the java examples. They
> worked well in my IDE and on the console. However, I also tried the
> threaded example following the consumer group example. The problem is,
> this example is not working and toString on the stream iterator returns the
> words "empty iterator". Below, run2() method is the run method from the
> source code, THAT WORKS. The run() method below is from the Consumer Group
> Example and DOES NOT WORK.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> It simply prints messages like
>
> Created iterator empty iterator thread number 9
> Created iterator empty iterator thread number 1
> Shutting down Thread: 1
> Created iterator empty iterator thread number 3
>
> And continues doing so as I produce message using the console producer and
> does not print messages.
>
>
>
>
> Im not sure if this is a versioning issue, or what might be the cause.
> But help is appreciated!
>
>
>
> Here is the Consumer class:
>
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ConsumerIterator;
>
> public class Consumer implements Runnable {
>
> private KafkaStream kafkaStream;
> private Integer threadNumber;
>
> public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
> this.threadNumber = threadNumber;
> this.kafkaStream = kafkaStream;
> }
>
> public void run() {
> ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
> System.out.println("Created iterator " + it.toString() + " thread
> number " + threadNumber);
> while(it.hasNext()) {
> System.out.println("Thread " + threadNumber + ": " + new
> String(it.next().message()));
>
> // validate
> // enrich
> // dispatch
> }
> System.out.println("Shutting down Thread: " + threadNumber);
> }
>
> }
>
>
>
>
> In my ConsumerThreadPool class:
>
>
> public class ConsumerThreadPool {
>
> private final ConsumerConnector consumer;
> private final String topic;
>
> private ExecutorService executor;
> private static ApplicationContext context = new
> AnnotationConfigApplicationContext(AppConfig.class);
>
> public ConsumerThreadPool(String topic) {
> consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
> this.topic = topic;
> }
>
> public void shutdown() {
> if (consumer != null) consumer.shutdown();
> if (executor != null) executor.shutdown();
> }
>
> public void run(Integer numThreads) {
> Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
> topicCountMap.put(topic, new Integer(numThreads));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> // create threads
> executor = Executors.newFixedThreadPool(numThreads);
>
> // now create an object to consume the messages
> Integer threadNumber = 0;
> for(KafkaStream<byte[], byte[]> stream : streams) {
> executor.submit(new Consumer(stream, threadNumber));
> threadNumber++;
> }
> }
>
>
> public void run2() {
> Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
> topicCountMap.put(topic, new Integer(1));
>
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>
> KafkaStream<byte[], byte[]> stream =
> consumerMap.get(topic).get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> while(true) {
> try {
> Thread.sleep(1000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> while(it.hasNext()){
> System.out.println(new String(it.next().message()));
>
> }
> }
> }
>
> }
>
>
>
> The AppConfig is pretty simple:
>
> @Configuration
> @ComponentScan("com.truecar.inventory.worker.core")
> public class AppConfig {
>
> @Bean
> @Named("sharedProducerConsumerConfig")
> private static Properties sharedProducerConsumerConfig() {
> Properties properties = new Properties();
> properties.put("zookeeper.connect", "127.0.0.1:2181");
> properties.put("group.id", "intelligence");
> properties.put("zookeeper.session.timeout.ms", "400");
> properties.put("zookeeper.sync.time.ms", "200");
> properties.put("auto.commit.interval.ms", "1000");
> return properties;
> }
>
> @Bean
> @Named("consumerConfig")
> private static ConsumerConfig consumerConfig() {
> Properties properties = sharedProducerConsumerConfig();
> return new ConsumerConfig(properties);
> }
>
> @Bean
> @Named("producerConfig")
> private static ProducerConfig producerConfig() {
> Properties properties = sharedProducerConsumerConfig();
> properties.put("serializer.class",
> "kafka.serializer.StringEncoder");
> properties.put("metadata.broker.list", "localhost:9092");
> return new ProducerConfig(properties);
> }
>
> }
>
>
> --
>
>