This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new d2af8a4b [AURON #2032] Fix thread-safety issues in
UnifflePartitionWriter synchronization. (#2033)
d2af8a4b is described below
commit d2af8a4b6d1d8f2522790caa04a65d055a270230
Author: slfan1989 <[email protected]>
AuthorDate: Tue Feb 24 22:54:55 2026 +0800
[AURON #2032] Fix thread-safety issues in UnifflePartitionWriter
synchronization. (#2033)
### Which issue does this PR close?
Closes #2032
### Rationale for this change
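**Split synchronization blocks** in the `write()` method: `addPartitionData` and the push-blocks call each run in their own `rssShuffleWriter.synchronized` block: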
```
override def write(partitionId: Int, buffer: ByteBuffer): Unit = {
val bytes = new Array[Byte](buffer.limit())
buffer.get(bytes)
val bytesWritten = bytes.length
val bufferManager = rssShuffleWriter.getBufferManager
val shuffleBlockInfos = rssShuffleWriter.synchronized {
bufferManager.addPartitionData(partitionId, bytes)
}
if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty) {
// synchronized
rssShuffleWriter.synchronized {
rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter,
shuffleBlockInfos)
}
}
metrics.incBytesWritten(bytesWritten)
mapStatusLengths(partitionId) += bytesWritten
}
```
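**Inconsistent locking** in the `close()` method: the push-blocks call is invoked without holding the `rssShuffleWriter` lock, whereas `write()` invokes it inside `rssShuffleWriter.synchronized`: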
```
override def close(success: Boolean): Unit = {
val start = System.currentTimeMillis()
val bufferManager = rssShuffleWriter.getBufferManager
val restBlocks = bufferManager.clear()
if (success && restBlocks != null && !restBlocks.isEmpty) {
// non-synchronized
rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
}
val writeDurationMs = bufferManager.getWriteTime +
(System.currentTimeMillis() - start)
metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
}
```
### What changes are included in this PR?
#### Solution
```
override def close(success: Boolean): Unit = {
val start = System.currentTimeMillis()
val bufferManager = rssShuffleWriter.getBufferManager
val restBlocks = bufferManager.clear()
if (success && restBlocks != null && !restBlocks.isEmpty) {
// synchronized
rssShuffleWriter.synchronized {
rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter,
restBlocks)
}
}
val writeDurationMs = bufferManager.getWriteTime +
(System.currentTimeMillis() - start)
metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
}
```
### Are there any user-facing changes?
No.
### How was this patch tested?
Covered by existing unit tests.
Signed-off-by: slfan1989 <[email protected]>
---
.../sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
index 69c65a28..bc00ffb7 100644
---
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
+++
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/UnifflePartitionWriter.scala
@@ -66,7 +66,9 @@ class UnifflePartitionWriter[K, V, C](
val bufferManager = rssShuffleWriter.getBufferManager
val restBlocks = bufferManager.clear()
if (success && restBlocks != null && !restBlocks.isEmpty) {
- rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
+ rssShuffleWriter.synchronized {
+ rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
+ }
}
val writeDurationMs = bufferManager.getWriteTime +
(System.currentTimeMillis() - start)
metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))