Joel, Would you mind point me to how I would be able to enable the trace logs in the producer and broker?
Thanks, Scott On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > Weird - I tried your exact code and it worked for me (although I was > using 0.8 head and not the beta). Can you re-run with trace logs > enabled in your producer and paste that output? Broker logs also if > you can? > > Thanks, > > Joel > > On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang > <scott.w...@rumbleentertainment.com> wrote: > > Jun, > > > > I did a test this morning and got a very interesting result with you > > command. I started by wipe all the log files and clean up all zookeeper > > data files. > > > > Once I restarted both server, producer and consumer then execute your > > command, what I got is a empty log as following: > > > > Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log > > Starting offset: 0 > > > > One observation, the 00000000000000000000.index file was getting huge but > > there was nothing in 00000000000000000000.log file. > > > > Thanks, > > Scott > > > > > > > > > > On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <jun...@gmail.com> wrote: > > > >> Could you run the following command on one of the log files of your > topic > >> and attach the output? > >> > >> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > >> /tmp/kafka-logs/testtopic-0/00000000000000000000.log > >> > >> Thanks, > >> > >> Jun > >> > >> > >> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang < > >> scott.w...@rumbleentertainment.com> wrote: > >> > >> > Another piece of information, the snappy compression also does not > work. > >> > > >> > Thanks, > >> > Scott > >> > > >> > > >> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang < > >> > scott.w...@rumbleentertainment.com> wrote: > >> > > >> > > I just try it and it still not showing up, thanks for looking into > >> this. > >> > > > >> > > Thanks, > >> > > Scott > >> > > > >> > > > >> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <jun...@gmail.com> wrote: > >> > > > >> > >> Could you try starting the consumer first (and enable gzip in the > >> > >> producer)? > >> > >> > >> > >> Thanks, > >> > >> > >> > >> Jun > >> > >> > >> > >> > >> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < > >> > >> scott.w...@rumbleentertainment.com> wrote: > >> > >> > >> > >> > No, I did not start the consumer before the producer. I actually > >> > >> started > >> > >> > the producer first and nothing showed up in the consumer unless I > >> > >> commented > >> > >> > out this line -- props.put("compression.codec", "gzip"). If I > >> > >> commented > >> > >> > out the compression codec, everything just works. > >> > >> > > >> > >> > > >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <jun...@gmail.com> > wrote: > >> > >> > > >> > >> > > Did you start the consumer before the producer? Be default, the > >> > >> consumer > >> > >> > > gets only the new data? > >> > >> > > > >> > >> > > Thanks, > >> > >> > > > >> > >> > > Jun > >> > >> > > > >> > >> > > > >> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < > >> > >> > > scott.w...@rumbleentertainment.com> wrote: > >> > >> > > > >> > >> > > > I am testing with Kafka 0.8 beta and having problem of > receiving > >> > >> > message > >> > >> > > in > >> > >> > > > consumer. There is no error so does anyone have any > insights. > >> > >> When I > >> > >> > > > commented out the "compression.code" everything works fine. > >> > >> > > > > >> > >> > > > My producer: > >> > >> > > > public class TestKafka08Prod { > >> > >> > > > > >> > >> > > > public static void main(String [] args) { > >> > >> > > > > >> > >> > > > Producer<Integer, String> producer = null; > >> > >> > > > try { > >> > >> > > > Properties props = new Properties(); > >> > >> > > > props.put("metadata.broker.list", > "localhost:9092"); > >> > >> > > > props.put("serializer.class", > >> > >> > > > "kafka.serializer.StringEncoder"); > >> > >> > > > props.put("producer.type", "sync"); > >> > >> > > > props.put("request.required.acks","1"); > >> > >> > > > props.put("compression.codec", "gzip"); > >> > >> > > > ProducerConfig config = new > ProducerConfig(props); > >> > >> > > > producer = new Producer<Integer, String>(config); > >> > >> > > > int j=0; > >> > >> > > > for(int i=0; i<10; i++) { > >> > >> > > > KeyedMessage<Integer, String> data = new > >> > >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: > "+i+" > >> > >> > > > "+System.currentTimeMillis()); > >> > >> > > > producer.send(data); > >> > >> > > > > >> > >> > > > } > >> > >> > > > > >> > >> > > > } catch (Exception e) { > >> > >> > > > System.out.println("Error happened: "); > >> > >> > > > e.printStackTrace(); > >> > >> > > > } finally { > >> > >> > > > if(null != null) { > >> > >> > > > producer.close(); > >> > >> > > > } > >> > >> > > > > >> > >> > > > System.out.println("Ened of Sending"); > >> > >> > > > } > >> > >> > > > > >> > >> > > > System.exit(0); > >> > >> > > > } > >> > >> > > > } > >> > >> > > > > >> > >> > > > > >> > >> > > > My consumer: > >> > >> > > > > >> > >> > > > public class TestKafka08Consumer { > >> > >> > > > public static void main(String [] args) throws > >> > >> > UnknownHostException, > >> > >> > > > SocketException { > >> > >> > > > > >> > >> > > > Properties props = new Properties(); > >> > >> > > > props.put("zookeeper.connect", > >> > "localhost:2181/kafka_0_8"); > >> > >> > > > props.put("group.id", "test08ConsumerId"); > >> > >> > > > props.put("zk.sessiontimeout.ms", "4000"); > >> > >> > > > props.put("zk.synctime.ms", "2000"); > >> > >> > > > props.put("autocommit.interval.ms", "1000"); > >> > >> > > > > >> > >> > > > ConsumerConfig consumerConfig = new > >> ConsumerConfig(props); > >> > >> > > > > >> > >> > > > ConsumerConnector consumerConnector = > >> > >> > > > > >> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); > >> > >> > > > > >> > >> > > > String topic = "test-topic"; > >> > >> > > > Map<String, Integer> topicCountMap = new > HashMap<String, > >> > >> > > > Integer>(); > >> > >> > > > topicCountMap.put(topic, new Integer(1)); > >> > >> > > > Map<String, List<KafkaStream<byte[], byte[]>>> > >> > consumerMap = > >> > >> > > > consumerConnector.createMessageStreams(topicCountMap); > >> > >> > > > KafkaStream<byte[], byte[]> stream = > >> > >> > > > consumerMap.get(topic).get(0); > >> > >> > > > > >> > >> > > > ConsumerIterator<byte[], byte[]> it = > stream.iterator(); > >> > >> > > > > >> > >> > > > int counter=0; > >> > >> > > > while(it.hasNext()) { > >> > >> > > > try { > >> > >> > > > String fromPlatform = new > >> > >> String(it.next().message()); > >> > >> > > > System.out.println("The messages: > >> "+fromPlatform); > >> > >> > > > } catch(Exception e) { > >> > >> > > > e.printStackTrace(); > >> > >> > > > } > >> > >> > > > } > >> > >> > > > System.out.println("SystemOut"); > >> > >> > > > } > >> > >> > > > } > >> > >> > > > > >> > >> > > > > >> > >> > > > Thanks > >> > >> > > > > >> > >> > > > >> > >> > > >> > >> > >> > > > >> > > > >> > > >> >