Weird - I tried your exact code and it worked for me (although I was
using 0.8 head and not the beta). Can you re-run with trace logs
enabled in your producer and paste that output? Broker logs also if
you can?

Thanks,

Joel

On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang
<scott.w...@rumbleentertainment.com> wrote:
> Jun,
>
> I did a test this morning and got a very interesting result with you
> command.  I started by wipe all the log files and clean up all zookeeper
> data files.
>
> Once I restarted both server, producer and consumer then execute your
> command, what I got is a empty log as following:
>
> Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log
> Starting offset: 0
>
> One observation, the 00000000000000000000.index file was getting huge but
> there was nothing in 00000000000000000000.log file.
>
> Thanks,
> Scott
>
>
>
>
> On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Could you run the following command on one of the log files of your topic
>> and attach the output?
>>
>> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
>> /tmp/kafka-logs/testtopic-0/00000000000000000000.log
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
>> scott.w...@rumbleentertainment.com> wrote:
>>
>> > Another piece of information, the snappy compression also does not work.
>> >
>> > Thanks,
>> > Scott
>> >
>> >
>> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
>> > scott.w...@rumbleentertainment.com> wrote:
>> >
>> > > I just try it and it still not showing up, thanks for looking into
>> this.
>> > >
>> > > Thanks,
>> > > Scott
>> > >
>> > >
>> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <jun...@gmail.com> wrote:
>> > >
>> > >> Could you try starting the consumer first (and enable gzip in the
>> > >> producer)?
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jun
>> > >>
>> > >>
>> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> > >> scott.w...@rumbleentertainment.com> wrote:
>> > >>
>> > >> > No, I did not start the consumer before the producer.  I actually
>> > >> started
>> > >> > the producer first and nothing showed up in the consumer unless I
>> > >> commented
>> > >> > out this line -- props.put("compression.codec", "gzip").    If I
>> > >> commented
>> > >> > out the compression codec, everything just works.
>> > >> >
>> > >> >
>> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <jun...@gmail.com> wrote:
>> > >> >
>> > >> > > Did you start the consumer before the producer? Be default, the
>> > >> consumer
>> > >> > > gets only the new data?
>> > >> > >
>> > >> > > Thanks,
>> > >> > >
>> > >> > > Jun
>> > >> > >
>> > >> > >
>> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > >> > > scott.w...@rumbleentertainment.com> wrote:
>> > >> > >
>> > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving
>> > >> > message
>> > >> > > in
>> > >> > > > consumer.  There is no error so does anyone have any insights.
>> > >>  When I
>> > >> > > > commented out the "compression.code" everything works fine.
>> > >> > > >
>> > >> > > > My producer:
>> > >> > > > public class TestKafka08Prod {
>> > >> > > >
>> > >> > > >     public static void main(String [] args) {
>> > >> > > >
>> > >> > > >         Producer<Integer, String> producer = null;
>> > >> > > >         try {
>> > >> > > >             Properties props = new Properties();
>> > >> > > >             props.put("metadata.broker.list", "localhost:9092");
>> > >> > > >             props.put("serializer.class",
>> > >> > > > "kafka.serializer.StringEncoder");
>> > >> > > >             props.put("producer.type", "sync");
>> > >> > > >             props.put("request.required.acks","1");
>> > >> > > >             props.put("compression.codec", "gzip");
>> > >> > > >             ProducerConfig config = new ProducerConfig(props);
>> > >> > > >             producer = new Producer<Integer, String>(config);
>> > >> > > >             int j=0;
>> > >> > > >             for(int i=0; i<10; i++) {
>> > >> > > >                 KeyedMessage<Integer, String> data = new
>> > >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
>> > >> > > > "+System.currentTimeMillis());
>> > >> > > >                 producer.send(data);
>> > >> > > >
>> > >> > > >             }
>> > >> > > >
>> > >> > > >         } catch (Exception e) {
>> > >> > > >             System.out.println("Error happened: ");
>> > >> > > >             e.printStackTrace();
>> > >> > > >         } finally {
>> > >> > > >             if(null != null) {
>> > >> > > >                 producer.close();
>> > >> > > >             }
>> > >> > > >
>> > >> > > >             System.out.println("Ened of Sending");
>> > >> > > >         }
>> > >> > > >
>> > >> > > >         System.exit(0);
>> > >> > > >     }
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > My consumer:
>> > >> > > >
>> > >> > > > public class TestKafka08Consumer {
>> > >> > > >     public static void main(String [] args) throws
>> > >> > UnknownHostException,
>> > >> > > > SocketException {
>> > >> > > >
>> > >> > > >         Properties props = new Properties();
>> > >> > > >         props.put("zookeeper.connect",
>> > "localhost:2181/kafka_0_8");
>> > >> > > >         props.put("group.id", "test08ConsumerId");
>> > >> > > >         props.put("zk.sessiontimeout.ms", "4000");
>> > >> > > >         props.put("zk.synctime.ms", "2000");
>> > >> > > >         props.put("autocommit.interval.ms", "1000");
>> > >> > > >
>> > >> > > >         ConsumerConfig consumerConfig = new
>> ConsumerConfig(props);
>> > >> > > >
>> > >> > > >         ConsumerConnector consumerConnector =
>> > >> > > >
>> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>> > >> > > >
>> > >> > > >         String topic = "test-topic";
>> > >> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
>> > >> > > > Integer>();
>> > >> > > >         topicCountMap.put(topic, new Integer(1));
>> > >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>>
>> > consumerMap =
>> > >> > > > consumerConnector.createMessageStreams(topicCountMap);
>> > >> > > >         KafkaStream<byte[], byte[]> stream =
>> > >> > > >  consumerMap.get(topic).get(0);
>> > >> > > >
>> > >> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > >> > > >
>> > >> > > >         int counter=0;
>> > >> > > >         while(it.hasNext()) {
>> > >> > > >             try {
>> > >> > > >                 String fromPlatform = new
>> > >> String(it.next().message());
>> > >> > > >                 System.out.println("The messages:
>> "+fromPlatform);
>> > >> > > >             } catch(Exception e) {
>> > >> > > >                 e.printStackTrace();
>> > >> > > >             }
>> > >> > > >         }
>> > >> > > >         System.out.println("SystemOut");
>> > >> > > >     }
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > Thanks
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>

Reply via email to