I am testing with Kafka 0.8 beta and having problem of receiving message in consumer. There is no error so does anyone have any insights. When I commented out the "compression.code" everything works fine.
My producer: public class TestKafka08Prod { public static void main(String [] args) { Producer<Integer, String> producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "sync"); props.put("request.required.acks","1"); props.put("compression.codec", "gzip"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<Integer, String>(config); int j=0; for(int i=0; i<10; i++) { KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test-topic", "test-message: "+i+" "+System.currentTimeMillis()); producer.send(data); } } catch (Exception e) { System.out.println("Error happened: "); e.printStackTrace(); } finally { if(null != null) { producer.close(); } System.out.println("Ened of Sending"); } System.exit(0); } } My consumer: public class TestKafka08Consumer { public static void main(String [] args) throws UnknownHostException, SocketException { Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); props.put("group.id", "test08ConsumerId"); props.put("zk.sessiontimeout.ms", "4000"); props.put("zk.synctime.ms", "2000"); props.put("autocommit.interval.ms", "1000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); String topic = "test-topic"; Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); int counter=0; while(it.hasNext()) { try { String fromPlatform = new String(it.next().message()); System.out.println("The messages: "+fromPlatform); } catch(Exception e) { e.printStackTrace(); } } System.out.println("SystemOut"); } } Thanks