Did you start the consumer before the producer? By default, the consumer gets only the new data.
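If the consumer was started after the messages were sent, one thing worth trying is setting "auto.offset.reset" to "smallest" in the consumer properties. The 0.8 high-level consumer defaults this to "largest", which is why a fresh group only sees new data. A minimal sketch, reusing the connection settings from your consumer; the class and method names here are made up for illustration, and the setting only takes effect when the group has no committed offset in ZooKeeper yet (or the committed offset is out of range):

```java
import java.util.Properties;

public class ConsumerOffsetResetSketch {

    // Hypothetical helper: builds consumer properties like the ones in the
    // original post and adds auto.offset.reset.
    static Properties buildProps(boolean readFromBeginning) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        // "smallest" starts from the earliest available offset,
        // "largest" (the default) starts from the end of the log.
        props.put("auto.offset.reset", readFromBeginning ? "smallest" : "largest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(true);
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

These properties would then be passed to ConsumerConfig exactly as in your consumer. Since "test08ConsumerId" may already have offsets committed, you may need to use a new group.id to see the effect.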

Thanks,

Jun

On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <scott.w...@rumbleentertainment.com> wrote:

> I am testing with Kafka 0.8 beta and having a problem receiving messages in
> the consumer. There is no error, so does anyone have any insights? When I
> commented out the "compression.codec" everything works fine.
>
> My producer:
>
> public class TestKafka08Prod {
>
>     public static void main(String [] args) {
>
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class", "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks", "1");
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             int j=0;
>             for(int i=0; i<10; i++) {
>                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
>                     "test-topic", "test-message: "+i+" "+System.currentTimeMillis());
>                 producer.send(data);
>             }
>
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             // Close the producer only if it was actually created.
>             if(producer != null) {
>                 producer.close();
>             }
>
>             System.out.println("End of Sending");
>         }
>
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> public class TestKafka08Consumer {
>     public static void main(String [] args) throws UnknownHostException, SocketException {
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zk.sessiontimeout.ms", "4000");
>         props.put("zk.synctime.ms", "2000");
>         props.put("autocommit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
>         ConsumerConnector consumerConnector =
>             kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>             consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         int counter=0;
>         while(it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: "+fromPlatform);
>             } catch(Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>