This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8bc8b768b [MINOR] improvement(server): Avoid copy while toFlushEvent
invoked (#2177)
8bc8b768b is described below
commit 8bc8b768bb77421ea4d6f9b6b3227121fb07a0be
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 16 10:23:51 2024 +0800
[MINOR] improvement(server): Avoid copy while toFlushEvent invoked (#2177)
### What changes were proposed in this pull request?
Avoid copying blocks when toFlushEvent is invoked.
### Why are the changes needed?
Avoid copying blocks from one Collection into another.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs
---
.../org/apache/uniffle/server/ShuffleDataFlushEvent.java | 7 ++++---
.../org/apache/uniffle/server/ShuffleFlushManager.java | 4 ++--
.../server/buffer/ShuffleBufferWithLinkedList.java | 16 +++++++++-------
.../uniffle/server/buffer/ShuffleBufferWithSkipList.java | 4 ++--
.../apache/uniffle/server/ShuffleFlushManagerTest.java | 11 ++++++-----
.../server/buffer/ShuffleBufferWithLinkedListTest.java | 9 +++++----
.../uniffle/storage/handler/api/ShuffleWriteHandler.java | 4 ++--
.../storage/handler/impl/HadoopShuffleWriteHandler.java | 4 ++--
.../storage/handler/impl/LocalFileWriteHandler.java | 5 +++--
.../handler/impl/PooledHadoopShuffleWriteHandler.java | 4 ++--
.../impl/PooledHadoopShuffleWriteHandlerTest.java | 3 ++-
11 files changed, 39 insertions(+), 32 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 7ec882f3b..b1c889adb 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -38,7 +39,7 @@ public class ShuffleDataFlushEvent {
private final int startPartition;
private final int endPartition;
private final long size;
- private final List<ShufflePartitionedBlock> shuffleBlocks;
+ private final Collection<ShufflePartitionedBlock> shuffleBlocks;
private final Supplier<Boolean> valid;
private final ShuffleBuffer shuffleBuffer;
private final AtomicInteger retryTimes = new AtomicInteger();
@@ -57,7 +58,7 @@ public class ShuffleDataFlushEvent {
int startPartition,
int endPartition,
long size,
- List<ShufflePartitionedBlock> shuffleBlocks,
+ Collection<ShufflePartitionedBlock> shuffleBlocks,
Supplier<Boolean> valid,
ShuffleBuffer shuffleBuffer) {
this.eventId = eventId;
@@ -72,7 +73,7 @@ public class ShuffleDataFlushEvent {
this.cleanupCallbackChains = new ArrayList<>();
}
- public List<ShufflePartitionedBlock> getShuffleBlocks() {
+ public Collection<ShufflePartitionedBlock> getShuffleBlocks() {
return shuffleBlocks;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 625584d77..c1b759b66 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -123,7 +123,7 @@ public class ShuffleFlushManager {
throw new EventDiscardException();
}
- List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+ Collection<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
if (CollectionUtils.isEmpty(blocks)) {
LOG.info("There is no block to be flushed: {}", event);
return;
@@ -236,7 +236,7 @@ public class ShuffleFlushManager {
}
private void updateCommittedBlockIds(
- String appId, int shuffleId, List<ShufflePartitionedBlock> blocks) {
+ String appId, int shuffleId, Collection<ShufflePartitionedBlock> blocks)
{
if (blocks == null || blocks.size() == 0) {
return;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 7a95ac5d0..215ef2f7f 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server.buffer;
+import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -82,15 +83,16 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return null;
}
// buffer will be cleared, and new list must be created for async flush
- List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
- List<ShufflePartitionedBlock> inFlushedQueueBlocks = spBlocks;
+ Collection<ShufflePartitionedBlock> spBlocks = blocks;
+ Set<ShufflePartitionedBlock> inFlushedQueueBlocks = blocks;
if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
/**
* When reordering the blocks, it will break down the original reads
sequence to cause the
* data lost in some cases. So we should create a reference copy to
avoid this.
*/
- inFlushedQueueBlocks = new LinkedList<>(spBlocks);
-
spBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId));
+ LinkedList<ShufflePartitionedBlock> orderedSpBlocks = new
LinkedList<>(blocks);
+
orderedSpBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId));
+ spBlocks = orderedSpBlocks;
}
long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
final ShuffleDataFlushEvent event =
@@ -99,11 +101,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
event.addCleanupCallback(
() -> {
this.clearInFlushBuffer(event.getEventId());
- spBlocks.forEach(spb -> spb.getData().release());
+ inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
inFlushSize.addAndGet(-event.getSize());
});
- inFlushBlockMap.put(eventId, new LinkedHashSet<>(inFlushedQueueBlocks));
- blocks.clear();
+ inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
+ blocks = new LinkedHashSet<>();
inFlushSize.addAndGet(size);
size = 0;
return event;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d7a748903..414d9a66b 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -17,9 +17,9 @@
package org.apache.uniffle.server.buffer;
+import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,7 +91,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
if (blocksMap.isEmpty()) {
return null;
}
- List<ShufflePartitionedBlock> spBlocks = new
LinkedList<>(blocksMap.values());
+ Collection<ShufflePartitionedBlock> spBlocks = blocksMap.values();
long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
final ShuffleDataFlushEvent event =
new ShuffleDataFlushEvent(
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 1c47c853f..32531b876 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -24,6 +24,7 @@ import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFilePermission;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -247,13 +248,13 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1,
null);
- final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+ final Collection<ShufflePartitionedBlock> blocks1 =
event1.getShuffleBlocks();
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event21 = createShuffleDataFlushEvent(appId, 2, 2,
2, null);
- final List<ShufflePartitionedBlock> blocks21 = event21.getShuffleBlocks();
+ final Collection<ShufflePartitionedBlock> blocks21 =
event21.getShuffleBlocks();
manager.addToFlushQueue(event21);
ShuffleDataFlushEvent event22 = createShuffleDataFlushEvent(appId, 2, 2,
2, null);
- final List<ShufflePartitionedBlock> blocks22 = event22.getShuffleBlocks();
+ final Collection<ShufflePartitionedBlock> blocks22 =
event22.getShuffleBlocks();
manager.addToFlushQueue(event22);
// wait for write data
waitForFlush(manager, appId, 1, 5);
@@ -326,7 +327,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
manager.addToFlushQueue(event1);
waitForFlush(manager, appId, 1, 5);
assertEquals(1, event1.getRetryTimes());
- List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+ Collection<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId,
1).getLongCardinality());
int maxRetryTimes = 5;
@@ -732,7 +733,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
String appId,
int shuffleId,
int partitionId,
- List<ShufflePartitionedBlock> blocks,
+ Collection<ShufflePartitionedBlock> blocks,
int partitionNumPerRange,
String basePath) {
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 151232af2..31c4492d2 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server.buffer;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -224,10 +225,10 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
2,
1);
assertArrayEquals(expectedData, sdr.getData());
-
- assertEquals(0, event1.getShuffleBlocks().get(0).getTaskAttemptId());
- assertEquals(1, event1.getShuffleBlocks().get(1).getTaskAttemptId());
- assertEquals(2, event1.getShuffleBlocks().get(2).getTaskAttemptId());
+ Iterator<ShufflePartitionedBlock> it =
event1.getShuffleBlocks().iterator();
+ assertEquals(0, it.next().getTaskAttemptId());
+ assertEquals(1, it.next().getTaskAttemptId());
+ assertEquals(2, it.next().getTaskAttemptId());
assertEquals(
1,
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
index b5b242e9b..2fd16deb8 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
@@ -17,7 +17,7 @@
package org.apache.uniffle.storage.handler.api;
-import java.util.List;
+import java.util.Collection;
import org.apache.uniffle.common.ShufflePartitionedBlock;
@@ -29,5 +29,5 @@ public interface ShuffleWriteHandler {
* @param shuffleBlocks blocks to storage
* @throws Exception
*/
- void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception;
+ void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws
Exception;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index d6339975a..b5deb78b1 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -18,7 +18,7 @@
package org.apache.uniffle.storage.handler.impl;
import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -109,7 +109,7 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
}
@Override
- public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
+ public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
final long start = System.currentTimeMillis();
writeLock.lock();
try {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index a00e33d2f..9f9fd0c5d 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.storage.handler.impl;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.List;
+import java.util.Collection;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -81,7 +81,8 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
}
@Override
- public synchronized void write(List<ShufflePartitionedBlock> shuffleBlocks)
throws Exception {
+ public synchronized void write(Collection<ShufflePartitionedBlock>
shuffleBlocks)
+ throws Exception {
// Ignore this write, if the shuffle directory is deleted after being
uploaded in multi mode
// or after its app heartbeat times out.
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index cc9d24f18..f47bd2bce 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -17,7 +17,7 @@
package org.apache.uniffle.storage.handler.impl;
-import java.util.List;
+import java.util.Collection;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
@@ -105,7 +105,7 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
}
@Override
- public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
+ public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
synchronized (this) {
if (initializedHandlerCnt < maxConcurrency) {
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
index e561d6090..32e8786c8 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.storage.handler.impl;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,7 +57,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
}
@Override
- public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws
Exception {
+ public void write(Collection<ShufflePartitionedBlock> shuffleBlocks)
throws Exception {
execution.run();
invokedList.add(index);
}