After looking at the source code (trunk), I think this is a bug that there are some cases where KafkaExceptions are not logged in log4j yet before it pop up to the StreamThread. I will file a JIRA for this issue.
Guozhang On Thu, May 12, 2016 at 5:57 PM, Greg Fodor <[email protected]> wrote: > Hey Guozhang, output was silent (since the log4j is streaming it to a > log file) except for the stack trace landing in stdout, i'm assuming > from the rethrow: > > Exception in thread "StreamThread-9" > org.apache.kafka.common.errors.SerializationException: Error > serializing Avro message > > Caused by: java.lang.NullPointerException: null of > com.altvr.schema.jobs.RoomViewId in field source_view_id of > com.altvr.schema.jobs.ViewBroadcastKey > > at > org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:93) > > at > org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:87) > > at > org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) > > at > io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:65) > > at > io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) > > at > com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:27) > > at > com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:10) > > at > org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:103) > > at > org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:229) > > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:100) > > at > com.altvr.streams.jobs.RoomOperationMessageProcessor.hasBegunBroadcastingRootViewToSpace(RoomOperationMessageProcessor.java:407) > > at > com.altvr.streams.jobs.RoomOperationMessageProcessor.setupOrTeardownBroadcasts(RoomOperationMessageProcessor.java:377) > > at > com.altvr.streams.jobs.RoomOperationMessageProcessor.forwardMessagesForBroadcasts(RoomOperationMessageProcessor.java:106) > > at > com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:193) > > at > com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:167) > > at > org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:57) > > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) > > at > org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) > > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) > > at > org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61) > > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) > > at > org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) > > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) > > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) > > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:352) > > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250) > > Caused by: java.lang.NullPointerException > > at > org.apache.avro.generic.GenericData.getField(GenericData.java:580) > > at > org.apache.avro.generic.GenericData.getField(GenericData.java:595) > > at > org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112) > > at > org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) > > at > org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) > > at > org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) > > at > org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) > > at > org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) > > > > On Thu, May 12, 2016 at 5:09 PM, Guozhang Wang <[email protected]> wrote: > > Greg, > > > > Could you post the output from stdout when running the console as well? > > > > > > Guozhang > > > > On Thu, May 12, 2016 at 4:52 PM, Greg Fodor <[email protected]> wrote: > > > >> We noticed that some errors were happening in one of our KafkaStreams > >> jobs but they were not appearing in our logs or being sent to our > >> error reporting service (Airbrake) -- they only became visible on > >> stdout when running from the console. I believe the reason is because > >> of this explicit catch-rethrow of KafkaExceptions in the main run loop > >> of the StreamThread: > >> > >> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L219 > >> > >> What's the proper way for these exceptions to be getting logged? The > >> comment indicates they should be getting logged but they are not being > >> processed by our loggers in log4j.properties. Here is our properties > >> file: > >> > >> https://gist.github.com/gfodor/7d9a2102bde5f2dc062ead4e9551e670 > >> > >> Thanks! > >> > > > > > > > > -- > > -- Guozhang > -- -- Guozhang
