[ 
https://issues.apache.org/jira/browse/FLINK-32826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819889#comment-17819889
 ] 

LSZ commented on FLINK-32826:
-----------------------------

[~cndpzc]   do u resolve this problem ?  

> Our job is stuck on requestMemorySegmentBlocking
> ------------------------------------------------
>
>                 Key: FLINK-32826
>                 URL: https://issues.apache.org/jira/browse/FLINK-32826
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.16.0, 1.16.2
>            Reporter: Zhongyi Sun
>            Priority: Major
>
> We have a Flink job and find it often gets stuck on requesting memory segment.
>  
> {code:java}
> "shardConsumers-Source: xxx[1] -> Calc[2] (1/1)#0-thread-0" Id=107 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@62e01595
>     at [email protected]/jdk.internal.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@62e01595
>     at [email protected]/java.util.concurrent.locks.LockSupport.park(Unknown 
> Source)
>     at 
> [email protected]/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
>  Source)
>     at 
> [email protected]/java.util.concurrent.ForkJoinPool.managedBlock(Unknown 
> Source)
>     at 
> [email protected]/java.util.concurrent.CompletableFuture.waitingGet(Unknown 
> Source)
>     at [email protected]/java.util.concurrent.CompletableFuture.get(Unknown 
> Source)
>     at 
> app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:383)
>     at 
> app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:355)
>     at 
> app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
>     at 
> app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
>     at 
> app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
>     at 
> app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
>     at 
> app//org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> app//org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>     at 
> app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>     at 
> app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>     at 
> app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at 
> app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at StreamExecCalc$23.processElement_split2(Unknown Source)
>     at StreamExecCalc$23.processElement(Unknown Source)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at 
> app//org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
>     at 
> app//org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
>     at 
> app//org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer$$Lambda$1780/0x0000000840f89440.accept(Unknown
>  Source)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher$$Lambda$1781/0x0000000840f89840.accept(Unknown
>  Source)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:360)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124)
>     at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>     at 
> [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>     at [email protected]/java.util.concurrent.FutureTask.run(Unknown Source)
>     at 
> [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>     at 
> [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>     at [email protected]/java.lang.Thread.run(Unknown Source) {code}
>  
> We also noticed high backpressure, but we couldn't find the reason. The 
> downstream writer thread was waiting for a message from mailbox.
> {code:java}
> "xxx[3]: Writer (1/1)#0" Id=91 TIMED_WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929 
>    at [email protected]/jdk.internal.misc.Unsafe.park(Native Method)    -  
> waiting on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929 
>    at 
> [email protected]/java.util.concurrent.locks.LockSupport.parkNanos(Unknown 
> Source)    at 
> [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
>  Source)    at 
> app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at 
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
>  {code}
> Some observations:
>  * The job logic is pretty simple: it consumes AWS Kinesis, does some 
> filtering and writes results to another Kinesis.
>  * The job gets stuck after running for 3-4 days.
>  ** If we restart from last checkpoint, the job will get stuck again soon.
>  ** If we restart without checkpoint, the job will recover, and may be stuck 
> in a few days again.
>  * We have several jobs consuming different Kinesis, but only this one has 
> problem. This Kinesis has only one shard, and data volume is small.
>  * At first we were using 1.16.0, after found some issues like FLINK-29298, 
> FLINK-31293 related to LocalBufferPool, we upgraded to the latest 1.16.2, but 
> the issue was not solved.
> The heap dump of LocalBufferPool:
>  
> {code:java}
> @LocalBufferPool[
>         LOG=@Log4jLogger[
>             FQCN=@String[org.apache.logging.slf4j.Log4jLogger],
>             serialVersionUID=@Long[7869000638091304316],
>             
> EVENT_MARKER=@Log4jMarker[org.apache.logging.slf4j.Log4jMarker@3f47a99],
>             CONVERTER=null,
>             eventLogger=@Boolean[false],
>             
> logger=@Logger[org.apache.flink.runtime.io.network.buffer.LocalBufferPool:INFO
>  in 4783da3f],
>             
> name=@String[org.apache.flink.runtime.io.network.buffer.LocalBufferPool],
>         ],
>         UNKNOWN_CHANNEL=@Integer[-1],
>         networkBufferPool=@NetworkBufferPool[
>             UNBOUNDED_POOL_SIZE=@Integer[2147483647],
>             USAGE_WARNING_THRESHOLD=@Integer[100],
>             LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@16f3a390],
>             totalNumberOfMemorySegments=@Integer[5079],
>             memorySegmentSize=@Integer[32768],
>             availableMemorySegments=@ArrayDeque[isEmpty=false;size=5068],
>             isDestroyed=@Boolean[false],
>             factoryLock=@Object[java.lang.Object@5151b0a4],
>             allBufferPools=@HashSet[isEmpty=false;size=2],
>             resizableBufferPools=@HashSet[isEmpty=false;size=2],
>             numTotalRequiredBuffers=@Integer[3],
>             requestSegmentsTimeout=@Duration[PT30S],
>             availabilityHelper=@AvailabilityHelper[AVAILABLE],
>             lastCheckedUsage=@Integer[0],
>             $assertionsDisabled=@Boolean[true],
>         ],
>         numberOfRequiredMemorySegments=@Integer[2],
>         availableMemorySegments=@ArrayDeque[isEmpty=true;size=0],
>         registeredListeners=@ArrayDeque[isEmpty=true;size=0],
>         maxNumberOfMemorySegments=@Integer[10],
>         currentPoolSize=@Integer[10],
>         numberOfRequestedMemorySegments=@Integer[10],
>         maxBuffersPerChannel=@Integer[10],
>         subpartitionBuffersCount=@int[][
>             @Integer[10],
>         ],
>         subpartitionBufferRecyclers=@BufferRecycler[][
>             
> @SubpartitionBufferRecycler[org.apache.flink.runtime.io.network.buffer.LocalBufferPool$SubpartitionBufferRecycler@41b3ff09],
>         ],
>         unavailableSubpartitionsCount=@Integer[1],
>         maxOverdraftBuffersPerGate=@Integer[0],
>         isDestroyed=@Boolean[false],
>         availabilityHelper=@AvailabilityHelper[
>             
> availableFuture=@CompletableFuture[java.util.concurrent.CompletableFuture@4570da85[Not
>  completed, 1 dependents]],
>         ],
>         requestingNotificationOfGlobalPoolAvailable=@Boolean[false],
>         $assertionsDisabled=@Boolean[true],
>     ] {code}
> Could you help give some clues on how to troubleshoot such problem? Or if you 
> need more information, please let me know, thank you!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to