FMX commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1600972546


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+          } else {
+            if (!isMemoryShuffleFile.get()) {

Review Comment:
   True means this partition writer is still in memory while closing this 
writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to