This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d32542bae [#1847] refactor: Remove capacity and related codes within
AbstractShuffleBuffer (#1853)
d32542bae is described below
commit d32542bae04b6be99cb82e13ffa90c5d1d50c837
Author: maobaolong <[email protected]>
AuthorDate: Tue Jul 2 19:35:05 2024 +0800
[#1847] refactor: Remove capacity and related codes within
AbstractShuffleBuffer (#1853)
### What changes were proposed in this pull request?
Remove the `capacity` field and the code that depends on it (the constructor parameter and `isFull()`) from `AbstractShuffleBuffer`.
### Why are the changes needed?
Fix: #1847.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
---
.../server/buffer/AbstractShuffleBuffer.java | 9 +------
.../uniffle/server/buffer/ShuffleBuffer.java | 2 --
.../server/buffer/ShuffleBufferManager.java | 6 ++---
.../server/buffer/ShuffleBufferWithLinkedList.java | 3 +--
.../server/buffer/ShuffleBufferWithSkipList.java | 3 +--
.../buffer/ShuffleBufferWithLinkedListTest.java | 28 ++++++++++------------
.../buffer/ShuffleBufferWithSkipListTest.java | 12 ++++------
7 files changed, 21 insertions(+), 42 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index fd5a62280..ddbeb21cf 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -39,11 +39,9 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractShuffleBuffer.class);
- private final long capacity;
protected long size;
- public AbstractShuffleBuffer(long capacity) {
- this.capacity = capacity;
+ public AbstractShuffleBuffer() {
this.size = 0;
}
@@ -69,11 +67,6 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
return size;
}
- @Override
- public boolean isFull() {
- return size > capacity;
- }
-
@Override
public synchronized ShuffleDataResult getShuffleData(long lastBlockId, int
readBufferSize) {
return getShuffleData(lastBlockId, readBufferSize, null);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 1884a50c0..f0d4dadb4 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -52,8 +52,6 @@ public interface ShuffleBuffer {
long getSize();
- boolean isFull();
-
/** Only for test */
List<ShufflePartitionedBlock> getBlocks();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c62ebd8c5..ed0c2136d 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -74,8 +74,6 @@ public class ShuffleBufferManager {
// Huge partition vars
private ReconfigurableConfManager.Reconfigurable<Long>
hugePartitionSizeThresholdRef;
private long hugePartitionMemoryLimitSize;
-
- protected long bufferSize = 0;
protected AtomicLong preAllocatedSize = new AtomicLong(0L);
protected AtomicLong inFlushSize = new AtomicLong(0L);
protected AtomicLong usedMemory = new AtomicLong(0L);
@@ -156,9 +154,9 @@ public class ShuffleBufferManager {
ShuffleServerMetrics.gaugeTotalPartitionNum.inc();
ShuffleBuffer shuffleBuffer;
if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) {
- shuffleBuffer = new ShuffleBufferWithSkipList(bufferSize);
+ shuffleBuffer = new ShuffleBufferWithSkipList();
} else {
- shuffleBuffer = new ShuffleBufferWithLinkedList(bufferSize);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
}
bufferRangeMap.put(Range.closed(startPartition, endPartition),
shuffleBuffer);
} else {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index d37dc446f..3f5ff900c 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -42,8 +42,7 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
private List<ShufflePartitionedBlock> blocks;
private Map<Long, List<ShufflePartitionedBlock>> inFlushBlockMap;
- public ShuffleBufferWithLinkedList(long capacity) {
- super(capacity);
+ public ShuffleBufferWithLinkedList() {
this.blocks = new LinkedList<>();
this.inFlushBlockMap = JavaUtils.newConcurrentMap();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index db0ce2543..e7630b434 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -44,8 +44,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
private final Map<Long, ConcurrentSkipListMap<Long,
ShufflePartitionedBlock>> inFlushBlockMap;
private int blockCount;
- public ShuffleBufferWithSkipList(long capacity) {
- super(capacity);
+ public ShuffleBufferWithSkipList() {
this.blocksMap = newConcurrentSkipListMap();
this.inFlushBlockMap = JavaUtils.newConcurrentMap();
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 90ac86c9f..b262b7c32 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -35,7 +35,6 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,24 +44,21 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void appendTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getSize());
- assertFalse(shuffleBuffer.isFull());
shuffleBuffer.append(createData(26));
assertEquals(100, shuffleBuffer.getSize());
- assertFalse(shuffleBuffer.isFull());
shuffleBuffer.append(createData(1));
assertEquals(133, shuffleBuffer.getSize());
- assertTrue(shuffleBuffer.isFull());
}
@Test
public void appendMultiBlocksTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -74,7 +70,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void toFlushEventTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
@@ -88,7 +84,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size <
readBufferSize */
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);
@@ -200,7 +196,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithLocalOrderTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 15);
@@ -238,7 +234,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
// case1: cached data only, blockId = -1, readBufferSize > buffer size
ShufflePartitionedData spd1 = createData(10);
ShufflePartitionedData spd2 = createData(20);
@@ -250,7 +246,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
assertArrayEquals(expectedData, sdr.getData());
// case2: cached data only, blockId = -1, readBufferSize = buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
spd1 = createData(20);
spd2 = createData(20);
shuffleBuffer.append(spd1);
@@ -261,7 +257,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
assertArrayEquals(expectedData, sdr.getData());
// case3-1: cached data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
spd1 = createData(20);
spd2 = createData(21);
shuffleBuffer.append(spd1);
@@ -272,7 +268,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
assertArrayEquals(expectedData, sdr.getData());
// case3-2: cached data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
spd1 = createData(15);
spd2 = createData(15);
ShufflePartitionedData spd3 = createData(15);
@@ -292,7 +288,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
assertArrayEquals(expectedData, sdr.getData());
// case5: flush data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
spd1 = createData(15);
spd2 = createData(15);
shuffleBuffer.append(spd1);
@@ -310,13 +306,13 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
assertEquals(0, sdr.getBufferSegments().size());
// case6: no data in buffer & flush buffer
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
assertEquals(0, sdr.getBufferSegments().size());
assertEquals(0, sdr.getDataLength());
// case7: get data with multiple flush buffer and cached buffer
- shuffleBuffer = new ShuffleBufferWithLinkedList(200);
+ shuffleBuffer = new ShuffleBufferWithLinkedList();
spd1 = createData(15);
spd2 = createData(15);
spd3 = createData(15);
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
index bf5040304..5bb5e2aa1 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
@@ -31,7 +31,6 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,24 +39,21 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void appendTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getSize());
- assertFalse(shuffleBuffer.isFull());
shuffleBuffer.append(createData(26));
assertEquals(100, shuffleBuffer.getSize());
- assertFalse(shuffleBuffer.isFull());
shuffleBuffer.append(createData(1));
assertEquals(133, shuffleBuffer.getSize());
- assertTrue(shuffleBuffer.isFull());
}
@Test
public void appendMultiBlocksTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -69,7 +65,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void toFlushEventTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
@@ -83,7 +79,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size <
readBufferSize */
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100);
+ ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);