Technoboy- opened a new pull request, #16155:
URL: https://github.com/apache/pulsar/pull/16155
### Motivation
When `PulsarRecordCursor` closes, the following error may occur:
```
2022-06-08T09:33:33.959423259-04:00 2022-06-08T13:33:33.959Z INFO
20220608_133129_00353_ydmer.2.0-21-52
org.apache.pulsar.sql.presto.PulsarRecordCursor Init cacheSizeAllocator with
NullCacheSizeAllocator.
2022-06-08T09:33:33.964070819-04:00 2022-06-08T13:33:33.963Z ERROR
deserialize-thread-split-13 org.apache.pulsar.sql.presto.PulsarRecordCursor
Stop running DeserializeEntries
2022-06-08T09:33:33.964094767-04:00 java.lang.IllegalArgumentException:
newPosition > limit: (48825 > 119)
2022-06-08T09:33:33.964101767-04:00 at
java.base/java.nio.Buffer.createPositionException(Buffer.java:318)
2022-06-08T09:33:33.964104898-04:00 at
java.base/java.nio.Buffer.position(Buffer.java:293)
2022-06-08T09:33:33.964123765-04:00 at
java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1094)
2022-06-08T09:33:33.964126988-04:00 at
java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:184)
2022-06-08T09:33:33.964130488-04:00 at
io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.964133500-04:00 at
io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.964136667-04:00 at
io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
2022-06-08T09:33:33.964139714-04:00 at
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
2022-06-08T09:33:33.964142670-04:00 at
org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
2022-06-08T09:33:33.964145457-04:00 at
org.apache.pulsar.common.api.proto.KeyValue.getValue(KeyValue.java:55)
2022-06-08T09:33:33.964148441-04:00 at
org.apache.pulsar.common.api.proto.KeyValue.copyFrom(KeyValue.java:159)
2022-06-08T09:33:33.964150976-04:00 at
org.apache.pulsar.common.api.proto.SingleMessageMetadata.copyFrom(SingleMessageMetadata.java:505)
2022-06-08T09:33:33.964155504-04:00 at
org.apache.pulsar.common.api.raw.RawMessageImpl.get(RawMessageImpl.java:75)
2022-06-08T09:33:33.964158176-04:00 at
org.apache.pulsar.common.api.raw.MessageParser.receiveIndividualMessagesFromBatch(MessageParser.java:176)
2022-06-08T09:33:33.964161161-04:00 at
org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:112)
2022-06-08T09:33:33.964163699-04:00 at
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:295)
2022-06-08T09:33:33.964166399-04:00 at
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:273)
2022-06-08T09:33:33.964174886-04:00 at
org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:266)
2022-06-08T09:33:33.964177766-04:00 at
org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:239)
2022-06-08T09:33:33.964180504-04:00 at
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries.run(PulsarRecordCursor.java:273)
2022-06-08T09:33:33.965527031-04:00 2022-06-08T13:33:33.963Z ERROR
SplitRunner-0-47 io.prestosql.execution.executor.TaskExecutor Error
processing Split 20220608_133129_00352_ydmer.2.0-7 PulsarSplit{splitId=13,
connectorId='pulsar', originSchemaName='derived_margin_data',
schemaName='qa-ipg/portfolio_finance', tableName='derived_margin_data',
splitSize=37,
schema='{"type":"record","namespace":"ASC.DerivedMarginData","name":"DerivedMarginDataItem","fields":[{"name":"ASCPermId","type":"int"},{"name":"ASCId","type":["null","string"],"default":null},{"name":"Ubs30AvgDtv","type":["null","double"],"default":null},{"name":"Ubs90Volatility","type":["null","double"],"default":null},{"name":"Jpm20AvgDtv","type":["null","double"],"default":null},{"name":"Jpm20Volatility","type":["null","double"],"default":null},{"name":"Empirical20Volatility","type":["null","double"],"default":null},{"name":"Dbk100AvgDtv","type":["null","double"],"default":null},{"name":"Dbk90AvgDtv","type":["null","double"],
"default":null},{"name":"Dbk20AvgDtv","type":["null","double"],"default":null},{"name":"Dbk10AvgDtv","type":["null","double"],"default":null},{"name":"Csf90AvgDtv","type":["null","double"],"default":null},{"name":"Csf60AvgDtv","type":["null","double"],"default":null},{"name":"Csf30AvgDtv","type":["null","double"],"default":null}]}',
schemaType=JSON, startPositionEntryId=33, endPositionEntryId=70,
startPositionLedgerId=3463486, endPositionLedgerId=3463486,
schemaInfoProperties={}} (start = 2.296111546565531E9, wall = 147 ms, cpu = 0
ms, wait = 429 ms, calls = 1)
2022-06-08T09:33:33.965565080-04:00 java.nio.BufferUnderflowException
2022-06-08T09:33:33.965571091-04:00 at
java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
2022-06-08T09:33:33.965577921-04:00 at
io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.965582062-04:00 at
io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.965586668-04:00 at
io.netty.buffer.AbstractByteBuf.getBytes(AbstractByteBuf.java:490)
2022-06-08T09:33:33.965589527-04:00 at
org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:537)
2022-06-08T09:33:33.965592372-04:00 at
org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:156)
2022-06-08T09:33:33.965595098-04:00 at
org.apache.pulsar.sql.presto.PulsarRecordCursor.getSchemaInfo(PulsarRecordCursor.java:662)
2022-06-08T09:33:33.965597713-04:00 at
org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:530)
2022-06-08T09:33:33.965600538-04:00 at
io.prestosql.$gen.CursorProcessor_20220608_133326_33.process(Unknown Source)
2022-06-08T09:33:33.965603289-04:00 at
io.prestosql.operator.ScanFilterAndProjectOperator$RecordCursorToPages.process(ScanFilterAndProjectOperator.java:323)
2022-06-08T09:33:33.965606919-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965610158-04:00 at
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965612957-04:00 at
io.prestosql.operator.WorkProcessorUtils.access$000(WorkProcessorUtils.java:37)
2022-06-08T09:33:33.965615585-04:00 at
io.prestosql.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
2022-06-08T09:33:33.965618223-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965628121-04:00 at
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965631636-04:00 at
io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965635773-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965639680-04:00 at
io.prestosql.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:277)
2022-06-08T09:33:33.965643475-04:00 at
io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
2022-06-08T09:33:33.965646669-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965649650-04:00 at
io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:306)
2022-06-08T09:33:33.965652535-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965655504-04:00 at
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965859342-04:00 at
io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965867541-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965871003-04:00 at
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965877994-04:00 at
io.prestosql.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:215)
2022-06-08T09:33:33.965882107-04:00 at
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965889844-04:00 at
io.prestosql.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:149)
```
This is the same issue as #14379.
`DeserializeEntries` offers entries to `messageQueue` and releases the
related entries after processing them.
When `PulsarRecordCursor` closes, it also releases the entries in
`messageQueue`, so two different threads release the same entry, which causes
the error above.
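
The double release described above can be illustrated with a minimal, single-threaded sketch. `CountedEntry` and `DoubleReleaseDemo` are hypothetical names used only for illustration; they are not Pulsar or Netty classes, and the real failure shows up as corrupted buffer reads rather than a clean exception.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted entry (not a Pulsar or Netty class).
class CountedEntry {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    // Returns true if this call dropped the count to zero, i.e. freed the entry.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("entry released twice");
        }
        return remaining == 0;
    }
}

class DoubleReleaseDemo {
    // Simulates the two release paths acting on the same entry.
    static boolean secondReleaseFails() {
        CountedEntry entry = new CountedEntry();
        entry.release();      // first owner: the deserialization thread is done with it
        try {
            entry.release();  // second owner: the closing cursor releases it again
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second release fails: " + secondReleaseFails());
    }
}
```

In this sketch the second release is rejected explicitly. With pooled buffers the memory may already have been reused by then, which would be consistent with the out-of-range positions in the log above.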
### Modification
- Release the messages remaining in `messageQueue` only after `deserializeEntries` has closed.
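
A minimal sketch of this ordering, assuming the cursor owns a single background thread that fills the queue. `QueuedMessage` and `CursorSketch` are hypothetical names and this is not the actual patch; it only shows the idea of stopping the producer and waiting for it before draining and releasing what is left.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a queued message that must be released exactly once.
class QueuedMessage {
    private final AtomicInteger releases = new AtomicInteger();

    void release() {
        if (releases.incrementAndGet() > 1) {
            throw new IllegalStateException("message released twice");
        }
    }
}

// Hypothetical cursor; only the shutdown ordering matters here.
class CursorSketch implements AutoCloseable {
    private final Queue<QueuedMessage> messageQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger produced = new AtomicInteger();
    private final AtomicInteger released = new AtomicInteger();
    private volatile boolean running = true;
    private final Thread deserializer;

    CursorSketch(int maxMessages) {
        // Background thread that keeps offering messages until stopped or bounded out.
        deserializer = new Thread(() -> {
            while (running && produced.get() < maxMessages) {
                messageQueue.offer(new QueuedMessage());
                produced.incrementAndGet();
            }
        }, "deserialize-thread-sketch");
        deserializer.start();
    }

    @Override
    public void close() {
        // Step 1: stop the background thread and wait until it has exited.
        running = false;
        deserializer.interrupt();
        try {
            deserializer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // the worker may still be running, so leave the queue alone
        }
        // Step 2: only now drain the queue; no other thread touches it anymore.
        QueuedMessage message;
        while ((message = messageQueue.poll()) != null) {
            message.release();
            released.incrementAndGet();
        }
    }

    int produced() { return produced.get(); }
    int released() { return released.get(); }
    boolean workerAlive() { return deserializer.isAlive(); }

    public static void main(String[] args) {
        CursorSketch cursor = new CursorSketch(1000);
        cursor.close();
        System.out.println("produced=" + cursor.produced() + " released=" + cursor.released());
    }
}
```

Because `close()` joins the worker before draining, each message in the queue has exactly one releasing thread at that point.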
### Documentation
- [x] `doc-not-needed`