Another piece of information, the snappy compression also does not work. Thanks, Scott
On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang < scott.w...@rumbleentertainment.com> wrote: > I just try it and it still not showing up, thanks for looking into this. > > Thanks, > Scott > > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <jun...@gmail.com> wrote: > >> Could you try starting the consumer first (and enable gzip in the >> producer)? >> >> Thanks, >> >> Jun >> >> >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < >> scott.w...@rumbleentertainment.com> wrote: >> >> > No, I did not start the consumer before the producer. I actually >> started >> > the producer first and nothing showed up in the consumer unless I >> commented >> > out this line -- props.put("compression.codec", "gzip"). If I >> commented >> > out the compression codec, everything just works. >> > >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <jun...@gmail.com> wrote: >> > >> > > Did you start the consumer before the producer? Be default, the >> consumer >> > > gets only the new data? >> > > >> > > Thanks, >> > > >> > > Jun >> > > >> > > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < >> > > scott.w...@rumbleentertainment.com> wrote: >> > > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving >> > message >> > > in >> > > > consumer. There is no error so does anyone have any insights. >> When I >> > > > commented out the "compression.code" everything works fine. >> > > > >> > > > My producer: >> > > > public class TestKafka08Prod { >> > > > >> > > > public static void main(String [] args) { >> > > > >> > > > Producer<Integer, String> producer = null; >> > > > try { >> > > > Properties props = new Properties(); >> > > > props.put("metadata.broker.list", "localhost:9092"); >> > > > props.put("serializer.class", >> > > > "kafka.serializer.StringEncoder"); >> > > > props.put("producer.type", "sync"); >> > > > props.put("request.required.acks","1"); >> > > > props.put("compression.codec", "gzip"); >> > > > ProducerConfig config = new ProducerConfig(props); >> > > > producer = new Producer<Integer, String>(config); >> > > > int j=0; >> > > > for(int i=0; i<10; i++) { >> > > > KeyedMessage<Integer, String> data = new >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+" >> > > > "+System.currentTimeMillis()); >> > > > producer.send(data); >> > > > >> > > > } >> > > > >> > > > } catch (Exception e) { >> > > > System.out.println("Error happened: "); >> > > > e.printStackTrace(); >> > > > } finally { >> > > > if(null != null) { >> > > > producer.close(); >> > > > } >> > > > >> > > > System.out.println("Ened of Sending"); >> > > > } >> > > > >> > > > System.exit(0); >> > > > } >> > > > } >> > > > >> > > > >> > > > My consumer: >> > > > >> > > > public class TestKafka08Consumer { >> > > > public static void main(String [] args) throws >> > UnknownHostException, >> > > > SocketException { >> > > > >> > > > Properties props = new Properties(); >> > > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); >> > > > props.put("group.id", "test08ConsumerId"); >> > > > props.put("zk.sessiontimeout.ms", "4000"); >> > > > props.put("zk.synctime.ms", "2000"); >> > > > props.put("autocommit.interval.ms", "1000"); >> > > > >> > > > ConsumerConfig consumerConfig = new ConsumerConfig(props); >> > > > >> > > > ConsumerConnector consumerConnector = >> > > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); >> > > > >> > > > String topic = "test-topic"; >> > > > Map<String, Integer> topicCountMap = new HashMap<String, >> > > > Integer>(); >> > > > topicCountMap.put(topic, new Integer(1)); >> > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = >> > > > consumerConnector.createMessageStreams(topicCountMap); >> > > > KafkaStream<byte[], byte[]> stream = >> > > > consumerMap.get(topic).get(0); >> > > > >> > > > ConsumerIterator<byte[], byte[]> it = stream.iterator(); >> > > > >> > > > int counter=0; >> > > > while(it.hasNext()) { >> > > > try { >> > > > String fromPlatform = new >> String(it.next().message()); >> > > > System.out.println("The messages: "+fromPlatform); >> > > > } catch(Exception e) { >> > > > e.printStackTrace(); >> > > > } >> > > > } >> > > > System.out.println("SystemOut"); >> > > > } >> > > > } >> > > > >> > > > >> > > > Thanks >> > > > >> > > >> > >> > >