I am testing Kafka 0.8 beta and my consumer is not receiving any messages.
No error is reported on either side. Does anyone have any insight? When I
comment out the "compression.codec" property in the producer, everything
works fine.

My producer:
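import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestKafka08Prod {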

    public static void main(String [] args) {

        Producer<Integer, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks","1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<Integer, String>(config);
            for (int i = 0; i < 10; i++) {
                KeyedMessage<Integer, String> data =
                    new KeyedMessage<Integer, String>("test-topic",
                        "test-message: " + i + " " + System.currentTimeMillis());
                producer.send(data);

            }

        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            // Close the producer if it was created (the original check was "null != null").
            if (producer != null) {
                producer.close();
            }

            System.out.println("End of Sending");
        }

        System.exit(0);
    }
}


My consumer:

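import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestKafka08Consumer {
    public static void main(String[] args) {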

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
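        // 0.8 property names (the 0.7 names zk.sessiontimeout.ms, zk.synctime.ms
        // and autocommit.interval.ms were renamed in 0.8).
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");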

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        ConsumerConnector consumerConnector =
            kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        String topic = "test-topic";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        int counter=0;
        while(it.hasNext()) {
            try {
                String fromPlatform = new String(it.next().message());
                System.out.println("The messages: "+fromPlatform);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("SystemOut");
    }
}


Thanks
