platinumhamburg commented on code in PR #2647:
URL: https://github.com/apache/fluss/pull/2647#discussion_r2793084557


##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -286,6 +286,11 @@ public ProjectedRow getOutputProjectedRow(int schemaId) {
     public void close() {
         vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close);
         if (bufferAllocator != null) {
+            // Force-release any remaining allocated memory in case some buffers were not tracked by
+            // vectorSchemaRootMap,
+            // for example temporary buffers left behind when an OOM occurs during decompression
+            // (see https://github.com/apache/fluss/issues/2646).
+            bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory());

Review Comment:
   Thanks for the fix! The approach is consistent with RecordAccumulator.close().
   
   One concern: releaseBytes() only adjusts the allocator's accounting but doesn't actually free the underlying direct memory - those buffers will remain until GC. This fix prevents the IllegalStateException on close, which is good for avoiding cascading failures, but the actual memory may still leak temporarily.
   
   For a more robust solution, consider adding try-finally in ZstdArrowCompressionCodec.doDecompress() to ensure temp buffers are released on failure.
   But the current fix is acceptable as a defensive measure.
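
To make the try-finally suggestion concrete, here is a minimal, self-contained sketch of the pattern. It is not the actual ZstdArrowCompressionCodec code and it does not use Arrow's API: `TrackingAllocator` and `TempBuffer` are hypothetical stand-ins for an accounting allocator and a temporary buffer, and the failure is simulated with an `IllegalStateException` rather than a real OOM. The point is only that the temporary buffer is closed on the failure path, so nothing is left for `close()` to force-release.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DecompressSketch {

    /** Hypothetical stand-in for an accounting allocator; NOT Arrow's BufferAllocator. */
    static final class TrackingAllocator {
        private final AtomicLong allocated = new AtomicLong();

        TempBuffer buffer(int size) {
            allocated.addAndGet(size);
            return new TempBuffer(this, size);
        }

        long getAllocatedMemory() {
            return allocated.get();
        }
    }

    /** Hypothetical stand-in for a temporary buffer that must be released by its owner. */
    static final class TempBuffer implements AutoCloseable {
        private final TrackingAllocator owner;
        private final int size;
        private boolean closed;

        TempBuffer(TrackingAllocator owner, int size) {
            this.owner = owner;
            this.size = size;
        }

        @Override
        public void close() {
            // Idempotent: return the bytes to the allocator only once.
            if (!closed) {
                closed = true;
                owner.allocated.addAndGet(-size);
            }
        }
    }

    /** Simulated decompression step; `fail` stands in for an error raised part-way through. */
    static TempBuffer doDecompress(TrackingAllocator allocator, int size, boolean fail) {
        TempBuffer out = allocator.buffer(size);
        boolean success = false;
        try {
            if (fail) {
                throw new IllegalStateException("simulated decompression failure");
            }
            success = true;
            return out; // ownership passes to the caller on success
        } finally {
            if (!success) {
                out.close(); // release the temporary buffer on any failure path
            }
        }
    }

    public static void main(String[] args) {
        TrackingAllocator allocator = new TrackingAllocator();
        try {
            doDecompress(allocator, 1024, true);
        } catch (IllegalStateException e) {
            // Expected in this sketch: the failure is simulated.
        }
        System.out.println("allocated after failure: " + allocator.getAllocatedMemory());
    }
}
```

With this shape the buffer itself is released at the point of failure, and the `releaseBytes()` call in `close()` stays as a defensive fallback rather than the primary cleanup.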


