This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3525bab94 [#2711] fix(spark): Race condition on deferred compressed
block initialization (#2712)
3525bab94 is described below
commit 3525bab94eef75d1a9002fafb1579534b9d6307d
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 15 10:49:00 2026 +0800
[#2711] fix(spark): Race condition on deferred compressed block
initialization (#2712)
### What changes were proposed in this pull request?
Fix race condition on deferred compressed block initialization
### Why are the changes needed?
fix #2711
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests
---------
Co-authored-by: zhangjunfan <[email protected]>
---
.../org/apache/uniffle/common/DeferredCompressedBlock.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/DeferredCompressedBlock.java
b/common/src/main/java/org/apache/uniffle/common/DeferredCompressedBlock.java
index d2cb4076e..e178c5269 100644
---
a/common/src/main/java/org/apache/uniffle/common/DeferredCompressedBlock.java
+++
b/common/src/main/java/org/apache/uniffle/common/DeferredCompressedBlock.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.common;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
@@ -31,7 +32,7 @@ import io.netty.buffer.Unpooled;
public class DeferredCompressedBlock extends ShuffleBlockInfo {
private final Function<DeferredCompressedBlock, DeferredCompressedBlock>
rebuildFunction;
private int estimatedCompressedSize;
- private boolean isInitialized = false;
+ private AtomicBoolean isInitialized = new AtomicBoolean(false);
public DeferredCompressedBlock(
int shuffleId,
@@ -66,9 +67,13 @@ public class DeferredCompressedBlock extends
ShuffleBlockInfo {
}
private void initialize() {
- if (!isInitialized) {
- rebuildFunction.apply(this);
- isInitialized = true;
+ if (!isInitialized.get()) {
+ synchronized (this) {
+ if (!isInitialized.get()) {
+ rebuildFunction.apply(this);
+ isInitialized.set(true);
+ }
+ }
}
}