loserwang1024 opened a new issue, #2646:
URL: https://github.com/apache/fluss/issues/2646

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.
   
   
   ### Fluss version
   
   0.8.0 (latest release)
   
   ### Please describe the bug 🐞
   
   If an OOM occurs while decompressing data:
   ```java
   2026-02-11 16:00:59,159 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
   java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) [flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:991) [?:?]
   Caused by: org.apache.fluss.exception.FetchException: Received exception when fetching the next record from TableBucket{tableId=7, bucket=3}. If needed, please back to past the record to continue scanning.
        at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:187) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        ... 6 more
   Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        ... 6 more
   Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:194) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:300) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:296) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:284) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:265) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.compression.ZstdArrowCompressionCodec.doDecompress(ZstdArrowCompressionCodec.java:88) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.AbstractCompressionCodec.decompress(AbstractCompressionCodec.java:77) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:106) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:84) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.utils.ArrowUtils.createArrowReader(ArrowUtils.java:181) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.DefaultLogRecordBatch.columnRecordIterator(DefaultLogRecordBatch.java:345) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.DefaultLogRecordBatch.records(DefaultLogRecordBatch.java:231) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.record.FileLogInputStream$FileChannelLogRecordBatch.records(FileLogInputStream.java:169) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.CompletedFetch.nextFetchedRecord(CompletedFetch.java:220) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.CompletedFetch.fetchRecords(CompletedFetch.java:170) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.fetchRecords(LogFetchCollector.java:160) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetchCollector.collectFetch(LogFetchCollector.java:118) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogFetcher.collectFetch(LogFetcher.java:165) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.pollForFetches(LogScannerImpl.java:234) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.client.table.scanner.log.LogScannerImpl.poll(LogScannerImpl.java:144) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.flink.source.reader.FlinkSourceSplitReader.fetch(FlinkSourceSplitReader.java:174) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20-vvr-11.5.0-2-jdk11.jar:1.20-vvr-11.5.0-2-jdk11]
        ... 6 more
   ```
   
   Memory is then leaked:
   ```java
   2026-02-11 16:00:59,241 ERROR org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator [] - Memory was leaked by query. Memory leaked: (2112)
   Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)
   
   2026-02-11 16:00:59,241 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
   java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (2112)
   Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)
   
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:405) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
        at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:29) ~[fluss-ali-vvr-11-oom2.jar:0.10-SNAPSHOT]
   ```
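
The allocator summary line in the log above labels its four numbers as `res/actual/peak/limit`. As a reading aid, here is a minimal sketch that pulls those fields out of such a line. The class name `AllocatorSummary` and the regular expression are assumptions made for illustration and are not part of Fluss or Arrow.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses "Allocator(NAME) res/actual/peak/limit" log lines. */
class AllocatorSummary {
    // Matches e.g. "Allocator(ROOT) 0/2112/16777088/9223372036854775807"
    private static final Pattern SUMMARY =
            Pattern.compile("Allocator\\((\\w+)\\)\\s+(\\d+)/(\\d+)/(\\d+)/(\\d+)");

    final String name;
    final long reserved; // "res"
    final long actual;   // bytes still allocated when the line was logged
    final long peak;     // highest allocation seen
    final long limit;    // configured allocation limit

    private AllocatorSummary(String name, long reserved, long actual, long peak, long limit) {
        this.name = name;
        this.reserved = reserved;
        this.actual = actual;
        this.peak = peak;
        this.limit = limit;
    }

    static AllocatorSummary parse(String line) {
        Matcher m = SUMMARY.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Not an allocator summary line: " + line);
        }
        return new AllocatorSummary(
                m.group(1),
                Long.parseLong(m.group(2)),
                Long.parseLong(m.group(3)),
                Long.parseLong(m.group(4)),
                Long.parseLong(m.group(5)));
    }

    public static void main(String[] args) {
        AllocatorSummary s = parse(
                "Allocator(ROOT) 0/2112/16777088/9223372036854775807 (res/actual/peak/limit)");
        System.out.println(s.name + " still holds " + s.actual + " bytes, peak " + s.peak);
    }
}
```

Read this way, the line says the root allocator still held 2112 bytes at close time, had peaked at 16777088 bytes (just under 16 MiB), and had a limit of `Long.MAX_VALUE`, that is, effectively no allocator-level limit.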
   
   <img width="1191" height="669" alt="Image" src="https://github.com/user-attachments/assets/27d83042-94fe-4163-9837-ebe81afe7c40" />
   
   These temporary `ownBuffers` cannot be released by `VectorSchemaRoot` because they have not yet been attached to a vector, so the later `bufferAllocator.close()` reports them as leaked.
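
The failure mode described above can be reproduced in isolation. The following is a toy model, not Arrow or Fluss code: `ToyAllocator`, `ToyBuffer`, and `LeakSketch` are hypothetical names, the 1024-byte buffer size is arbitrary, and the `releaseOnFailure` flag only illustrates one possible way to avoid the leak (releasing temporary buffers when loading fails before ownership is handed over).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy allocator that, like the real one, refuses to close with bytes outstanding. */
class ToyAllocator implements AutoCloseable {
    private long allocated;

    ToyBuffer buffer(long size) {
        allocated += size;
        return new ToyBuffer(this, size);
    }

    void release(long size) {
        allocated -= size;
    }

    long allocatedBytes() {
        return allocated;
    }

    @Override
    public void close() {
        if (allocated != 0) {
            throw new IllegalStateException("Memory leaked: (" + allocated + ")");
        }
    }
}

/** Toy buffer that returns its bytes to the allocator exactly once. */
class ToyBuffer implements AutoCloseable {
    private final ToyAllocator owner;
    private final long size;
    private boolean released;

    ToyBuffer(ToyAllocator owner, long size) {
        this.owner = owner;
        this.size = size;
    }

    @Override
    public void close() {
        if (!released) {
            released = true;
            owner.release(size);
        }
    }
}

class LeakSketch {
    /** Allocates temporary buffers one by one and fails at index failAt. */
    static List<ToyBuffer> load(ToyAllocator allocator, int count, int failAt,
                                boolean releaseOnFailure) {
        List<ToyBuffer> temp = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated allocation failure");
                }
                temp.add(allocator.buffer(1024)); // arbitrary size
            }
            return temp; // on success, ownership would pass to the vectors
        } catch (RuntimeException e) {
            if (releaseOnFailure) {
                temp.forEach(ToyBuffer::close); // nobody else knows about these buffers
            }
            throw e;
        }
    }

    /** Returns the bytes still held by the allocator after a failed load. */
    static long leakedAfterFailure(boolean releaseOnFailure) {
        ToyAllocator allocator = new ToyAllocator();
        try {
            load(allocator, 3, 2, releaseOnFailure);
        } catch (IllegalStateException expected) {
            // the failure is the scenario under test
        }
        return allocator.allocatedBytes();
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + leakedAfterFailure(false));
        System.out.println("with cleanup:    " + leakedAfterFailure(true));
    }
}
```

Without cleanup, the two buffers allocated before the failure stay counted against the allocator, which is what makes a later `close()` throw. With cleanup on the failure path, the allocator is back to zero.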
   
   ### Solution
   
   Change from 
   ```java
   public void close() {
       vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close);
       if (bufferAllocator != null) {
           bufferAllocator.close();
       }
   }
   ```
   
   to 
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

