This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new d2af8a4b [AURON #2032] Fix thread-safety issues in 
UnifflePartitionWriter synchronization. (#2033)
d2af8a4b is described below

commit d2af8a4b6d1d8f2522790caa04a65d055a270230
Author: slfan1989 <[email protected]>
AuthorDate: Tue Feb 24 22:54:55 2026 +0800

    [AURON #2032] Fix thread-safety issues in UnifflePartitionWriter 
synchronization. (#2033)
    
    ### Which issue does this PR close?
    
    Closes #2032
    
    ### Rationale for this change
    
    **Split synchronization blocks** in `write()` method:
    
    ```
    override def write(partitionId: Int, buffer: ByteBuffer): Unit = {
        val bytes = new Array[Byte](buffer.limit())
        buffer.get(bytes)
        val bytesWritten = bytes.length
    
        val bufferManager = rssShuffleWriter.getBufferManager
        val shuffleBlockInfos = rssShuffleWriter.synchronized {
          bufferManager.addPartitionData(partitionId, bytes)
        }
        if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty) {
          // synchronized
          rssShuffleWriter.synchronized {
            rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, 
shuffleBlockInfos)
          }
        }
        metrics.incBytesWritten(bytesWritten)
        mapStatusLengths(partitionId) += bytesWritten
      }
    ```
    
    **Inconsistent locking** in `close()` method (the push is invoked without the `rssShuffleWriter` lock that `write()` holds):
    
    ```
    override def close(success: Boolean): Unit = {
        val start = System.currentTimeMillis()
        val bufferManager = rssShuffleWriter.getBufferManager
        val restBlocks = bufferManager.clear()
        if (success && restBlocks != null && !restBlocks.isEmpty) {
          // non-synchronized
          rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
        }
        val writeDurationMs = bufferManager.getWriteTime + 
(System.currentTimeMillis() - start)
        metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
      }
    ```
    
    ### What changes are included in this PR?
    
    #### Solution
    
    ```
    override def close(success: Boolean): Unit = {
        val start = System.currentTimeMillis()
        val bufferManager = rssShuffleWriter.getBufferManager
        val restBlocks = bufferManager.clear()
        if (success && restBlocks != null && !restBlocks.isEmpty) {
          // synchronized
          rssShuffleWriter.synchronized {
            rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, 
restBlocks)
          }
        }
        val writeDurationMs = bufferManager.getWriteTime + 
(System.currentTimeMillis() - start)
        metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
      }
    ```
    
    ### Are there any user-facing changes?
    
    No.
    
    ### How was this patch tested?
    
    Covered by existing unit tests.
    
    Signed-off-by: slfan1989 <[email protected]>
---
 .../sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
 
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
index 69c65a28..bc00ffb7 100644
--- 
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
+++ 
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
@@ -66,7 +66,9 @@ class UnifflePartitionWriter[K, V, C](
     val bufferManager = rssShuffleWriter.getBufferManager
     val restBlocks = bufferManager.clear()
     if (success && restBlocks != null && !restBlocks.isEmpty) {
-      rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
+      rssShuffleWriter.synchronized {
+        rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
+      }
     }
     val writeDurationMs = bufferManager.getWriteTime + 
(System.currentTimeMillis() - start)
     metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))

Reply via email to