Technoboy- opened a new pull request, #16155:
URL: https://github.com/apache/pulsar/pull/16155

   ### Motivation
   When `PulsarRecordCursor` closes, the following errors may occur:
   ```
   2022-06-08T09:33:33.959423259-04:00 2022-06-08T13:33:33.959Z INFO    
20220608_133129_00353_ydmer.2.0-21-52   
org.apache.pulsar.sql.presto.PulsarRecordCursor Init cacheSizeAllocator with 
NullCacheSizeAllocator.
   2022-06-08T09:33:33.964070819-04:00 2022-06-08T13:33:33.963Z ERROR   
deserialize-thread-split-13     org.apache.pulsar.sql.presto.PulsarRecordCursor 
Stop running DeserializeEntries
   2022-06-08T09:33:33.964094767-04:00 java.lang.IllegalArgumentException: 
newPosition > limit: (48825 > 119)
   2022-06-08T09:33:33.964101767-04:00  at 
java.base/java.nio.Buffer.createPositionException(Buffer.java:318)
   2022-06-08T09:33:33.964104898-04:00  at 
java.base/java.nio.Buffer.position(Buffer.java:293)
   2022-06-08T09:33:33.964123765-04:00  at 
java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1094)
   2022-06-08T09:33:33.964126988-04:00  at 
java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:184)
   2022-06-08T09:33:33.964130488-04:00  at 
io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
   2022-06-08T09:33:33.964133500-04:00  at 
io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
   2022-06-08T09:33:33.964136667-04:00  at 
io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
   2022-06-08T09:33:33.964139714-04:00  at 
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
   2022-06-08T09:33:33.964142670-04:00  at 
org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
   2022-06-08T09:33:33.964145457-04:00  at 
org.apache.pulsar.common.api.proto.KeyValue.getValue(KeyValue.java:55)
   2022-06-08T09:33:33.964148441-04:00  at 
org.apache.pulsar.common.api.proto.KeyValue.copyFrom(KeyValue.java:159)
   2022-06-08T09:33:33.964150976-04:00  at 
org.apache.pulsar.common.api.proto.SingleMessageMetadata.copyFrom(SingleMessageMetadata.java:505)
   2022-06-08T09:33:33.964155504-04:00  at 
org.apache.pulsar.common.api.raw.RawMessageImpl.get(RawMessageImpl.java:75)
   2022-06-08T09:33:33.964158176-04:00  at 
org.apache.pulsar.common.api.raw.MessageParser.receiveIndividualMessagesFromBatch(MessageParser.java:176)
   2022-06-08T09:33:33.964161161-04:00  at 
org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:112)
   2022-06-08T09:33:33.964163699-04:00  at 
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:295)
   2022-06-08T09:33:33.964166399-04:00  at 
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:273)
   2022-06-08T09:33:33.964174886-04:00  at 
org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:266)
   2022-06-08T09:33:33.964177766-04:00  at 
org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:239)
   2022-06-08T09:33:33.964180504-04:00  at 
org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries.run(PulsarRecordCursor.java:273)
   
   2022-06-08T09:33:33.965527031-04:00 2022-06-08T13:33:33.963Z ERROR   
SplitRunner-0-47        io.prestosql.execution.executor.TaskExecutor    Error 
processing Split 20220608_133129_00352_ydmer.2.0-7 PulsarSplit{splitId=13, 
connectorId='pulsar', originSchemaName='derived_margin_data', 
schemaName='qa-ipg/portfolio_finance', tableName='derived_margin_data', 
splitSize=37, 
schema='{"type":"record","namespace":"ASC.DerivedMarginData","name":"DerivedMarginDataItem","fields":[{"name":"ASCPermId","type":"int"},{"name":"ASCId","type":["null","string"],"default":null},{"name":"Ubs30AvgDtv","type":["null","double"],"default":null},{"name":"Ubs90Volatility","type":["null","double"],"default":null},{"name":"Jpm20AvgDtv","type":["null","double"],"default":null},{"name":"Jpm20Volatility","type":["null","double"],"default":null},{"name":"Empirical20Volatility","type":["null","double"],"default":null},{"name":"Dbk100AvgDtv","type":["null","double"],"default":null},{"name":"Dbk90AvgDtv","type":["null","double"],
 
"default":null},{"name":"Dbk20AvgDtv","type":["null","double"],"default":null},{"name":"Dbk10AvgDtv","type":["null","double"],"default":null},{"name":"Csf90AvgDtv","type":["null","double"],"default":null},{"name":"Csf60AvgDtv","type":["null","double"],"default":null},{"name":"Csf30AvgDtv","type":["null","double"],"default":null}]}',
 schemaType=JSON, startPositionEntryId=33, endPositionEntryId=70, 
startPositionLedgerId=3463486, endPositionLedgerId=3463486, 
schemaInfoProperties={}} (start = 2.296111546565531E9, wall = 147 ms, cpu = 0 
ms, wait = 429 ms, calls = 1)
   2022-06-08T09:33:33.965565080-04:00 java.nio.BufferUnderflowException
   2022-06-08T09:33:33.965571091-04:00  at 
java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
   2022-06-08T09:33:33.965577921-04:00  at 
io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
   2022-06-08T09:33:33.965582062-04:00  at 
io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
   2022-06-08T09:33:33.965586668-04:00  at 
io.netty.buffer.AbstractByteBuf.getBytes(AbstractByteBuf.java:490)
   2022-06-08T09:33:33.965589527-04:00  at 
org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:537)
   2022-06-08T09:33:33.965592372-04:00  at 
org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:156)
   2022-06-08T09:33:33.965595098-04:00  at 
org.apache.pulsar.sql.presto.PulsarRecordCursor.getSchemaInfo(PulsarRecordCursor.java:662)
   2022-06-08T09:33:33.965597713-04:00  at 
org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:530)
   2022-06-08T09:33:33.965600538-04:00  at 
io.prestosql.$gen.CursorProcessor_20220608_133326_33.process(Unknown Source)
   2022-06-08T09:33:33.965603289-04:00  at 
io.prestosql.operator.ScanFilterAndProjectOperator$RecordCursorToPages.process(ScanFilterAndProjectOperator.java:323)
   2022-06-08T09:33:33.965606919-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965610158-04:00  at 
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965612957-04:00  at 
io.prestosql.operator.WorkProcessorUtils.access$000(WorkProcessorUtils.java:37)
   2022-06-08T09:33:33.965615585-04:00  at 
io.prestosql.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
   2022-06-08T09:33:33.965618223-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965628121-04:00  at 
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965631636-04:00  at 
io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
   2022-06-08T09:33:33.965635773-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965639680-04:00  at 
io.prestosql.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:277)
   2022-06-08T09:33:33.965643475-04:00  at 
io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
   2022-06-08T09:33:33.965646669-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965649650-04:00  at 
io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:306)
   2022-06-08T09:33:33.965652535-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965655504-04:00  at 
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965859342-04:00  at 
io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
   2022-06-08T09:33:33.965867541-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965871003-04:00  at 
io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
   2022-06-08T09:33:33.965877994-04:00  at 
io.prestosql.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:215)
   2022-06-08T09:33:33.965882107-04:00  at 
io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
   2022-06-08T09:33:33.965889844-04:00  at 
io.prestosql.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:149)
   ```
   
   This is the same issue as #14379.
   
   `DeserializeEntries` offers entries into `messageQueue` and releases the
corresponding entries after processing them.
   When `PulsarRecordCursor` closes, it also releases the entries in the
`messageQueue`, so two different threads can release the same entry, which
causes the errors above.
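
The ownership problem described above can be sketched without any Pulsar or Netty classes. `RefCountedEntry` and `DoubleReleaseDemo` are hypothetical names used only for illustration, and the two releases run sequentially here to keep the example deterministic. It shows only that an entry with a single reference cannot safely be released by both the deserialize thread and `close()`; it does not reproduce the buffer errors in the log.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted entry; not a Pulsar class.
final class RefCountedEntry {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    // Drops one reference and fails if the entry was already fully released.
    void release() {
        int previous = refCnt.getAndUpdate(c -> c > 0 ? c - 1 : c);
        if (previous <= 0) {
            throw new IllegalStateException("entry already released");
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}

final class DoubleReleaseDemo {
    // Returns true when the second release of the same entry is rejected.
    static boolean secondReleaseFails() {
        RefCountedEntry entry = new RefCountedEntry();
        entry.release(); // first owner: the deserialize thread, after processing
        try {
            entry.release(); // second owner: close(), draining the same queue
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second release rejected: " + secondReleaseFails());
    }
}
```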
   
   ### Modification
   
   - Release the queued messages only after `deserializeEntries` has closed.
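
A minimal sketch of the ordering this modification describes, under simplifying assumptions: a single queue, a plain `Thread` standing in for the deserialize thread, and a hypothetical `Message` type that only counts releases. None of these names come from the actual patch. The point is that `close()` stops and joins the worker before draining the queue, so each message has exactly one releaser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseOrderingSketch {
    // Hypothetical stand-in for a queued message; counts how often it is released.
    static final class Message {
        final AtomicInteger releaseCount = new AtomicInteger();

        void release() {
            releaseCount.incrementAndGet();
        }
    }

    private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
    private final Thread deserializer;

    CloseOrderingSketch(List<Message> messages) {
        messageQueue.addAll(messages);
        deserializer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Message m = messageQueue.take(); // blocks until a message arrives or interrupt
                    m.release();                     // released after "processing"
                }
            } catch (InterruptedException e) {
                // Interrupted by close(): stop touching the queue.
            }
        }, "deserialize-sketch");
        deserializer.start();
    }

    void close() throws InterruptedException {
        deserializer.interrupt();
        deserializer.join(); // the worker has exited; it can no longer release anything
        Message m;
        while ((m = messageQueue.poll()) != null) {
            m.release();     // only the closing thread releases what is left
        }
    }

    // Returns true if every message was released exactly once.
    static boolean eachReleasedExactlyOnce(int n) {
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            messages.add(new Message());
        }
        CloseOrderingSketch cursor = new CloseOrderingSketch(messages);
        try {
            cursor.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (Message msg : messages) {
            if (msg.releaseCount.get() != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("each released exactly once: " + eachReleasedExactlyOnce(1000));
    }
}
```

If the drain in `close()` ran before `join()`, the worker and the closing thread would both be consuming from the queue at the same time, which is the overlap the modification removes.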
   
   ### Documentation
   
   
   - [x] `doc-not-needed` 
     

