loserwang1024 opened a new issue, #2646: URL: https://github.com/apache/fluss/issues/2646
### Search before asking - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar. ### Fluss version 0.8.0 (latest release) ### Please describe the bug 🐞 If oom when decompress data: ```java 2026-02-11 16:00:59,159 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception. java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) [flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:991) [?:?] Caused by: org.apache.fluss.exception.FetchException: Received exception when fetching the next record from TableBucket{tableId=7, bucket=3}. If needed, please back to past the record to continue scanning. at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:187) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] ... 6 more Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer. at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] ... 6 more Caused by: java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?] at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?] at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:194) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11] ... 6 more ``` Then will leak memory: ```java 2026-02-11 16:00:59,241 ERROR org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator [] - Memory was leaked by query. Memory leaked: (2112) Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit) 2026-02-11 16:00:59,241 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception. java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (2112) Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:405) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT] ``` <img width="1191" height="669" alt="Image" src="https://github.com/user-attachments/assets/27d83042-94fe-4163-9837-ebe81afe7c40" /> Theses temp ownBuffers cannot be released by VectorSchemaRoot because it has not been attach to vector. ### Solution Change from ```java public void close() { vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close); if (bufferAllocator != null) { bufferAllocator.close(); } } ``` to ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
