gianm opened a new issue, #13894:
URL: https://github.com/apache/druid/issues/13894
There's a problematic code path in `StreamChunkParser.parseWithInputFormat`:
```
try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
    byteEntityReader.read(),
    rowFilter,
    rowIngestionMeters,
    parseExceptionHandler
)) {
  rowIterator.forEachRemaining(rows::add);
}
```
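This is a problem because `byteEntityReader.read()` can throw `ParseException`,
and nothing in the call stack catches it. The call is evaluated as a constructor
argument, before the iterator exists, so the exception never reaches
`parseExceptionHandler` and instead surfaces in `SeekableStreamIndexTaskRunner`
as "Encountered exception in run() before persisting."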
Logs look like this when using Kafka + Avro + an invalid message:
```
2023-03-01T03:52:23,217 INFO [task-runner-0-priority-0] org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Cluster ID: KeurxA5sT_eKFHuIHquSHg
2023-03-01T03:52:23,259 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
org.apache.druid.java.util.common.parsers.ParseException: Avro's unnecessary EOFException, detail: [https://issues.apache.org/jira/browse/AVRO-813]
    at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:92) ~[?:?]
    at org.apache.druid.data.input.avro.AvroStreamReader.intermediateRowIterator(AvroStreamReader.java:70) ~[?:?]
    at org.apache.druid.data.input.IntermediateRowParsingReader.intermediateRowIteratorWithMetadata(IntermediateRowParsingReader.java:231) ~[druid-core-25.0.0.jar:25.0.0]
    at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:49) ~[druid-core-25.0.0.jar:25.0.0]
    at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:70) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:635) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:266) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:151) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:169) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449) ~[druid-indexing-service-25.0.0.jar:25.0.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:850) ~[?:?]
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:372) ~[?:?]
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:289) ~[?:?]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:209) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[?:?]
    at org.apache.druid.data.input.avro.InlineSchemaAvroBytesDecoder.parse(InlineSchemaAvroBytesDecoder.java:88) ~[?:?]
    ... 17 more
2023-03-01T03:52:23,268 INFO [task-runner-0-priority-0] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted rows[0] and (estimated) bytes[0]
2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed in-memory data with commit metadata [AppenderatorDriverMetadata{segments={}, lastSegmentIds={}, callerMetadata={nextPartitions=SeekableStreamEndSequenceNumbers{stream='test-duba', partitionSequenceNumberMap={0=0}}}}] for segments:
2023-03-01T03:52:23,272 INFO [[index_kafka_test-duba_77ed6ecdfe1baa6_enmkjpdg]-appenderator-persist] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted stats: processed rows: [0], persisted rows[0], sinks: [0], total fireHydrants (across sinks): [0], persisted fireHydrants (across sinks): [0]
2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Resetting generation and member id due to: consumer pro-actively leaving the group
2023-03-01T03:52:23,272 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kafka-supervisor-gjmihhhb-1, groupId=kafka-supervisor-gjmihhhb] Request joining group due to: consumer pro-actively leaving the group
2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
2023-03-01T03:52:23,273 INFO [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
```
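
To make the failure mode concrete, here is a minimal, self-contained sketch that uses no Druid classes. Every type in it (`ParseException`, `Reader`, `CountingHandler`) is a hypothetical stand-in invented for illustration, not Druid's actual API. It shows the same shape as the snippet above: `read()` throws before any iteration starts, so a handler that only sees per-row errors is never invoked. The guarded variant is one possible way to route the exception to a handler; whether dropping the whole chunk is the right behavior for Druid is an assumption, not something this issue settles.

```java
import java.io.EOFException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ParseGuardSketch {
    // Hypothetical stand-in for an unchecked parse exception.
    static class ParseException extends RuntimeException {
        ParseException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for an entity reader; read() itself may throw.
    interface Reader {
        Iterator<String> read();
    }

    // Hypothetical stand-in for a parse exception handler; it only counts.
    static class CountingHandler {
        int count = 0;

        void handle(ParseException e) {
            count++;
        }
    }

    // Same shape as the snippet in the issue: read() runs before iteration,
    // so a ParseException thrown by read() propagates to the caller.
    static List<String> parseUnguarded(Reader reader) {
        List<String> rows = new ArrayList<>();
        reader.read().forEachRemaining(rows::add);
        return rows;
    }

    // One possible guard (an assumption, not the project's fix): catch the
    // exception around read() and iteration, then hand it to the handler.
    // Rows added before a mid-iteration failure are kept.
    static List<String> parseGuarded(Reader reader, CountingHandler handler) {
        List<String> rows = new ArrayList<>();
        try {
            reader.read().forEachRemaining(rows::add);
        } catch (ParseException e) {
            handler.handle(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Simulates an invalid message: read() fails immediately.
        Reader bad = () -> {
            throw new ParseException("invalid message", new EOFException());
        };

        try {
            parseUnguarded(bad);
        } catch (ParseException e) {
            System.out.println("unguarded: exception escaped: " + e.getMessage());
        }

        CountingHandler handler = new CountingHandler();
        List<String> rows = parseGuarded(bad, handler);
        System.out.println("guarded: rows=" + rows.size() + ", handled=" + handler.count);
    }
}
```

In the unguarded path the caller has to deal with the exception, which matches the stack trace above where it reaches `SeekableStreamIndexTaskRunner.run`. In the guarded path the handler sees the failure and the caller gets an empty row list.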