gianm opened a new issue, #13894:
URL: https://github.com/apache/druid/issues/13894

   There's a problematic code path in `StreamChunkParser.parseWithInputFormat`:
   
   ```
   try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
       byteEntityReader.read(),
       rowFilter,
       rowIngestionMeters,
       parseExceptionHandler
   )) {
     rowIterator.forEachRemaining(rows::add);
   }
   ```
   
   This is a problem because `byteEntityReader.read()` itself can throw `ParseException`, but nothing in the call stack catches it, so instead of the record being counted as unparseable, the exception propagates up and fails the task.
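
   A fix could catch `ParseException` around the `read()` call and route it through the parse-exception handler, the same way per-row parse failures are already handled by `FilteringCloseableInputRowIterator`. The sketch below is a minimal, self-contained illustration of that pattern only — `RowReader`, the simplified `ParseException`, and the counter standing in for `parseExceptionHandler` are hypothetical stand-ins, not Druid's actual classes:

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   public class ReadGuardSketch {
     // Stand-in for Druid's ParseException (hypothetical simplification).
     static class ParseException extends RuntimeException {
       ParseException(String msg) { super(msg); }
     }

     // Stand-in for an entity reader whose read() may throw while
     // building the iterator, before any row is ever iterated.
     interface RowReader {
       Iterator<String> read() throws ParseException;
     }

     // Stand-in for parseExceptionHandler: count the failure instead of
     // letting the exception propagate and fail the caller.
     static int unparseableCount = 0;

     static List<String> parseChunk(RowReader reader) {
       List<String> rows = new ArrayList<>();
       try {
         // The problematic call: read() itself can throw ParseException,
         // not just the per-row iteration that follows it.
         Iterator<String> it = reader.read();
         it.forEachRemaining(rows::add);
       } catch (ParseException e) {
         // Route to the handler rather than up to the task runner.
         unparseableCount++;
       }
       return rows;
     }

     public static void main(String[] args) {
       List<String> ok = parseChunk(() -> List.of("a", "b").iterator());
       List<String> bad = parseChunk(() -> { throw new ParseException("bad avro bytes"); });
       System.out.println(ok.size() + " rows, " + bad.size() + " rows, "
           + unparseableCount + " unparseable");
     }
   }
   ```

   In Druid itself the catch would presumably need to respect the task's `maxParseExceptions`-style limits rather than swallow failures unconditionally.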
   
   The logs look like this when ingesting from Kafka with Avro input and an invalid message:
   
   ```
   2023-03-01T03:52:23,217 INFO [task-runner-0-priority-0] org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Cluster ID: KeurxA5sT_eKFHuIHquSHg
   2023-03-01T03:52:23,259 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
   org.apache.druid.java.util.common.parsers.ParseException: Avro's unnecessary EOFException, detail: [https://issues.apache.org/jira/browse/AVRO-813]
        at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:92) ~[?:?]
        at org.apache.druid.data.input.avro.AvroStreamReader.intermediateRowIterator(AvroStreamReader.java:70) ~[?:?]
        at org.apache.druid.data.input.IntermediateRowParsingReader.intermediateRowIteratorWithMetadata(IntermediateRowParsingReader.java:231) ~[druid-core-25.0.0.jar:25.0.0]
        at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:49) ~[druid-core-25.0.0.jar:25.0.0]
        at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:70) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:635) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:266) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:151) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:169) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449) ~[druid-indexing-service-25.0.0.jar:25.0.0]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) ~[?:?]
   Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:850) ~[?:?]
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:372) ~[?:?]
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:289) ~[?:?]
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:209) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[?:?]
        at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:88) ~[?:?]
        ... 17 more
   2023-03-01T03:52:23,268 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted rows[0] and (estimated) bytes[0]
   2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed in-memory data with commit metadata [AppenderatorDriverMetadata{segments={}, lastSegmentIds={}, callerMetadata={nextPartitions=SeekableStreamEndSequenceNumbers{stream='test-duba', partitionSequenceNumberMap={0=0}}}}] for segments: 
   2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], sinks: [0], total fireHydrants (across sinks): [0], persisted fireHydrants (across sinks): [0]
   2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Resetting generation and member id due to: consumer pro-actively leaving the group
   2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Request joining group due to: consumer pro-actively leaving the group
   2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
   2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
