[ https://issues.apache.org/jira/browse/FLINK-32826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819889#comment-17819889 ]
LSZ commented on FLINK-32826:
-----------------------------
[~cndpzc] Did you resolve this problem?
> Our job is stuck on requestMemorySegmentBlocking
> ------------------------------------------------
>
> Key: FLINK-32826
> URL: https://issues.apache.org/jira/browse/FLINK-32826
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.16.0, 1.16.2
> Reporter: Zhongyi Sun
> Priority: Major
>
> We have a Flink job that often gets stuck while requesting a memory segment.
>
> {code:java}
> "shardConsumers-Source: xxx[1] -> Calc[2] (1/1)#0-thread-0" Id=107 WAITING on java.util.concurrent.CompletableFuture$Signaller@62e01595
>     at java.base@…/jdk.internal.misc.Unsafe.park(Native Method)
>     - waiting on java.util.concurrent.CompletableFuture$Signaller@62e01595
>     at java.base@…/java.util.concurrent.locks.LockSupport.park(Unknown Source)
>     at java.base@…/java.util.concurrent.CompletableFuture$Signaller.block(Unknown Source)
>     at java.base@…/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
>     at java.base@…/java.util.concurrent.CompletableFuture.waitingGet(Unknown Source)
>     at java.base@…/java.util.concurrent.CompletableFuture.get(Unknown Source)
>     at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:383)
>     at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:355)
>     at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
>     at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
>     at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
>     at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
>     at app//org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at app//org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>     at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>     at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>     at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at StreamExecCalc$23.processElement_split2(Unknown Source)
>     at StreamExecCalc$23.processElement(Unknown Source)
>     at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
>     at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
>     at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer$$Lambda$1780/0x0000000840f89440.accept(Unknown Source)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher$$Lambda$1781/0x0000000840f89840.accept(Unknown Source)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:360)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124)
>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
>     at java.base@…/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.base@…/java.util.concurrent.FutureTask.run(Unknown Source)
>     at java.base@…/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base@…/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base@…/java.lang.Thread.run(Unknown Source)
> {code}
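The top frames show the source thread parked in CompletableFuture.get() inside LocalBufferPool.requestMemorySegmentBlocking. As a rough model of that pattern, here is a minimal Java sketch of a heavily simplified bounded pool. It is not Flink's LocalBufferPool, and SimpleBufferPool, requestBlocking and recycle are hypothetical names: a blocking request waits on a future that only completes when another thread hands a segment back.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Hypothetical, heavily simplified bounded pool (NOT Flink's LocalBufferPool).
 * A blocking request parks in CompletableFuture.get() until another thread
 * recycles a segment, which mirrors the top frames of the thread dump.
 */
final class SimpleBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    // Replaced by a fresh, incomplete future whenever the pool runs dry.
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    SimpleBufferPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /** Returns a segment, blocking while none is available. */
    byte[] requestBlocking() throws InterruptedException, ExecutionException {
        while (true) {
            CompletableFuture<Void> future;
            synchronized (this) {
                byte[] segment = available.poll();
                if (segment != null) {
                    return segment;
                }
                if (availableFuture.isDone()) {
                    availableFuture = new CompletableFuture<>();
                }
                future = availableFuture;
            }
            // Without a later recycle() call this waits forever.
            future.get();
        }
    }

    /** Hands a segment back, e.g. once its data has been consumed downstream. */
    void recycle(byte[] segment) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            available.add(segment);
            toComplete = availableFuture;
        }
        // Wakes every blocked requester; they re-check the deque under the lock.
        toComplete.complete(null);
    }

    synchronized int availableCount() {
        return available.size();
    }
}
```

With a pool of one segment, a second requestBlocking() call stays blocked until recycle() is called with the first segment. In this model, if segments were never recycled, the requesting thread would stay parked just like the source thread in the dump.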
>
> We also noticed high backpressure, but we couldn't find the cause. The
> downstream writer thread was waiting for a message from its mailbox.
> {code:java}
> "xxx[3]: Writer (1/1)#0" Id=91 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929
>     at java.base@…/jdk.internal.misc.Unsafe.park(Native Method)
>     - waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929
>     at java.base@…/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)
>     at java.base@…/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
>     at app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
>     at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
>     at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
> {code}
> Some observations:
> * The job logic is pretty simple: it consumes from an AWS Kinesis stream,
> does some filtering, and writes the results to another Kinesis stream.
> * The job gets stuck after running for 3-4 days.
> ** If we restart from the last checkpoint, the job gets stuck again soon.
> ** If we restart without a checkpoint, the job recovers, but it may get
> stuck again a few days later.
> * We have several jobs consuming different Kinesis streams, but only this one
> has the problem. Its Kinesis stream has only one shard, and the data volume is small.
> * At first we were using 1.16.0. After finding issues related to
> LocalBufferPool, such as FLINK-29298 and FLINK-31293, we upgraded to the latest
> release, 1.16.2, but the issue was not solved.
> The heap dump of the LocalBufferPool:
>
> {code:java}
> @LocalBufferPool[
>     LOG=@Log4jLogger[
>         FQCN=@String[org.apache.logging.slf4j.Log4jLogger],
>         serialVersionUID=@Long[7869000638091304316],
>         EVENT_MARKER=@Log4jMarker[org.apache.logging.slf4j.Log4jMarker@3f47a99],
>         CONVERTER=null,
>         eventLogger=@Boolean[false],
>         logger=@Logger[org.apache.flink.runtime.io.network.buffer.LocalBufferPool:INFO in 4783da3f],
>         name=@String[org.apache.flink.runtime.io.network.buffer.LocalBufferPool],
>     ],
>     UNKNOWN_CHANNEL=@Integer[-1],
>     networkBufferPool=@NetworkBufferPool[
>         UNBOUNDED_POOL_SIZE=@Integer[2147483647],
>         USAGE_WARNING_THRESHOLD=@Integer[100],
>         LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@16f3a390],
>         totalNumberOfMemorySegments=@Integer[5079],
>         memorySegmentSize=@Integer[32768],
>         availableMemorySegments=@ArrayDeque[isEmpty=false;size=5068],
>         isDestroyed=@Boolean[false],
>         factoryLock=@Object[java.lang.Object@5151b0a4],
>         allBufferPools=@HashSet[isEmpty=false;size=2],
>         resizableBufferPools=@HashSet[isEmpty=false;size=2],
>         numTotalRequiredBuffers=@Integer[3],
>         requestSegmentsTimeout=@Duration[PT30S],
>         availabilityHelper=@AvailabilityHelper[AVAILABLE],
>         lastCheckedUsage=@Integer[0],
>         $assertionsDisabled=@Boolean[true],
>     ],
>     numberOfRequiredMemorySegments=@Integer[2],
>     availableMemorySegments=@ArrayDeque[isEmpty=true;size=0],
>     registeredListeners=@ArrayDeque[isEmpty=true;size=0],
>     maxNumberOfMemorySegments=@Integer[10],
>     currentPoolSize=@Integer[10],
>     numberOfRequestedMemorySegments=@Integer[10],
>     maxBuffersPerChannel=@Integer[10],
>     subpartitionBuffersCount=@int[][
>         @Integer[10],
>     ],
>     subpartitionBufferRecyclers=@BufferRecycler[][
>         @SubpartitionBufferRecycler[org.apache.flink.runtime.io.network.buffer.LocalBufferPool$SubpartitionBufferRecycler@41b3ff09],
>     ],
>     unavailableSubpartitionsCount=@Integer[1],
>     maxOverdraftBuffersPerGate=@Integer[0],
>     isDestroyed=@Boolean[false],
>     availabilityHelper=@AvailabilityHelper[
>         availableFuture=@CompletableFuture[java.util.concurrent.CompletableFuture@4570da85[Not completed, 1 dependents]],
>     ],
>     requestingNotificationOfGlobalPoolAvailable=@Boolean[false],
>     $assertionsDisabled=@Boolean[true],
> ]
> {code}
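One way to read the heap dump is to check a few of its fields against each other. The Java sketch below is hypothetical: PoolSnapshot and its methods are not Flink classes, the values are copied by hand from the dump, and the checks are a simplified model of those fields rather than Flink's actual availability logic.

```java
/**
 * Hypothetical snapshot of a few LocalBufferPool fields, with values copied
 * by hand from a heap dump. The checks are a simplified reading of those
 * fields, NOT Flink's actual availability logic.
 */
final class PoolSnapshot {
    final int maxNumberOfMemorySegments;
    final int currentPoolSize;
    final int numberOfRequestedMemorySegments;
    final int locallyAvailableSegments;   // size of LocalBufferPool.availableMemorySegments
    final int maxBuffersPerChannel;
    final int[] subpartitionBuffersCount;
    final int maxOverdraftBuffersPerGate;
    final int globallyAvailableSegments;  // size of NetworkBufferPool.availableMemorySegments

    PoolSnapshot(int maxNumberOfMemorySegments, int currentPoolSize,
                 int numberOfRequestedMemorySegments, int locallyAvailableSegments,
                 int maxBuffersPerChannel, int[] subpartitionBuffersCount,
                 int maxOverdraftBuffersPerGate, int globallyAvailableSegments) {
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
        this.currentPoolSize = currentPoolSize;
        this.numberOfRequestedMemorySegments = numberOfRequestedMemorySegments;
        this.locallyAvailableSegments = locallyAvailableSegments;
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        this.subpartitionBuffersCount = subpartitionBuffersCount.clone();
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
        this.globallyAvailableSegments = globallyAvailableSegments;
    }

    /** No free local segment and no room to request more from the global pool. */
    boolean localPoolExhausted() {
        return locallyAvailableSegments == 0
                && numberOfRequestedMemorySegments >= currentPoolSize;
    }

    /** The pool is already at its maximum size, so it cannot grow. */
    boolean atMaximumSize() {
        return currentPoolSize >= maxNumberOfMemorySegments;
    }

    /** Number of subpartitions holding at least maxBuffersPerChannel buffers. */
    int subpartitionsAtCap() {
        int atCap = 0;
        for (int count : subpartitionBuffersCount) {
            if (count >= maxBuffersPerChannel) {
                atCap++;
            }
        }
        return atCap;
    }

    /** In this model, only a recycled buffer can satisfy a blocked request. */
    boolean waitsOnlyForRecycling() {
        return localPoolExhausted() && atMaximumSize() && maxOverdraftBuffersPerGate == 0;
    }
}
```

With the dumped values, new PoolSnapshot(10, 10, 10, 0, 10, new int[] {10}, 0, 5068) reports an exhausted local pool, one subpartition at its cap (consistent with unavailableSubpartitionsCount=1), and a request that, under this model, can only be served by a recycled buffer, even though 5068 of the 5079 global segments are free. Read this way, the global network memory does not look like the bottleneck; the source seems to be waiting for the 10 buffers held by its single subpartition to be returned.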
> Could you give us some clues on how to troubleshoot this kind of problem? If
> you need more information, please let me know. Thank you!
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)