After looking at the source code (trunk), I think this is a bug that there
are some cases where KafkaExceptions are not logged in log4j yet before it
pop up to the StreamThread. I will file a JIRA for this issue.


Guozhang

On Thu, May 12, 2016 at 5:57 PM, Greg Fodor <[email protected]> wrote:

> Hey Guozhang, output was silent (since the log4j is streaming it to a
> log file) except for the stack trace landing in stdout, i'm assuming
> from the rethrow:
>
> Exception in thread "StreamThread-9"
> org.apache.kafka.common.errors.SerializationException: Error
> serializing Avro message
>
> Caused by: java.lang.NullPointerException: null of
> com.altvr.schema.jobs.RoomViewId in field source_view_id of
> com.altvr.schema.jobs.ViewBroadcastKey
>
>         at
> org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:93)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:87)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>
>         at
> io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:65)
>
>         at
> io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61)
>
>         at
> com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:27)
>
>         at
> com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:10)
>
>         at
> org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:103)
>
>         at
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:229)
>
>         at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:100)
>
>         at
> com.altvr.streams.jobs.RoomOperationMessageProcessor.hasBegunBroadcastingRootViewToSpace(RoomOperationMessageProcessor.java:407)
>
>         at
> com.altvr.streams.jobs.RoomOperationMessageProcessor.setupOrTeardownBroadcasts(RoomOperationMessageProcessor.java:377)
>
>         at
> com.altvr.streams.jobs.RoomOperationMessageProcessor.forwardMessagesForBroadcasts(RoomOperationMessageProcessor.java:106)
>
>         at
> com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:193)
>
>         at
> com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:167)
>
>         at
> org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:57)
>
>         at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>
>         at
> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>
>         at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>
>         at
> org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
>
>         at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>
>         at
> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>
>         at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>
>         at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>
>         at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:352)
>
>         at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250)
>
> Caused by: java.lang.NullPointerException
>
>         at
> org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>
>         at
> org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>
>         at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>
>
>
> On Thu, May 12, 2016 at 5:09 PM, Guozhang Wang <[email protected]> wrote:
> > Greg,
> >
> > Could you post the output from stdout when running the console as well?
> >
> >
> > Guozhang
> >
> > On Thu, May 12, 2016 at 4:52 PM, Greg Fodor <[email protected]> wrote:
> >
> >> We noticed that some errors were happening in one of our KafkaStreams
> >> jobs but they were not appearing in our logs or being sent to our
> >> error reporting service (Airbrake) -- they only became visible on
> >> stdout when running from the console. I believe the reason is because
> >> of this explicit catch-rethrow of KafkaExceptions in the main run loop
> >> of the StreamThread:
> >>
> >>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L219
> >>
> >> What's the proper way for these exceptions to be getting logged? The
> >> comment indicates they should be getting logged but they are not being
> >> processed by our loggers in log4j.properties. Here is our properties
> >> file:
> >>
> >> https://gist.github.com/gfodor/7d9a2102bde5f2dc062ead4e9551e670
> >>
> >> Thanks!
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang

Reply via email to