This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bc8b768b [MINOR] improvement(server): Avoid copy while toFlushEvent 
invoked (#2177)
8bc8b768b is described below

commit 8bc8b768bb77421ea4d6f9b6b3227121fb07a0be
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 16 10:23:51 2024 +0800

    [MINOR] improvement(server): Avoid copy while toFlushEvent invoked (#2177)
    
    ### What changes were proposed in this pull request?
    
    Avoid copying blocks when toFlushEvent is invoked.
    
    ### Why are the changes needed?
    
    Avoid copying blocks from one Collection to another Collection.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs
---
 .../org/apache/uniffle/server/ShuffleDataFlushEvent.java |  7 ++++---
 .../org/apache/uniffle/server/ShuffleFlushManager.java   |  4 ++--
 .../server/buffer/ShuffleBufferWithLinkedList.java       | 16 +++++++++-------
 .../uniffle/server/buffer/ShuffleBufferWithSkipList.java |  4 ++--
 .../apache/uniffle/server/ShuffleFlushManagerTest.java   | 11 ++++++-----
 .../server/buffer/ShuffleBufferWithLinkedListTest.java   |  9 +++++----
 .../uniffle/storage/handler/api/ShuffleWriteHandler.java |  4 ++--
 .../storage/handler/impl/HadoopShuffleWriteHandler.java  |  4 ++--
 .../storage/handler/impl/LocalFileWriteHandler.java      |  5 +++--
 .../handler/impl/PooledHadoopShuffleWriteHandler.java    |  4 ++--
 .../impl/PooledHadoopShuffleWriteHandlerTest.java        |  3 ++-
 11 files changed, 39 insertions(+), 32 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 7ec882f3b..b1c889adb 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -38,7 +39,7 @@ public class ShuffleDataFlushEvent {
   private final int startPartition;
   private final int endPartition;
   private final long size;
-  private final List<ShufflePartitionedBlock> shuffleBlocks;
+  private final Collection<ShufflePartitionedBlock> shuffleBlocks;
   private final Supplier<Boolean> valid;
   private final ShuffleBuffer shuffleBuffer;
   private final AtomicInteger retryTimes = new AtomicInteger();
@@ -57,7 +58,7 @@ public class ShuffleDataFlushEvent {
       int startPartition,
       int endPartition,
       long size,
-      List<ShufflePartitionedBlock> shuffleBlocks,
+      Collection<ShufflePartitionedBlock> shuffleBlocks,
       Supplier<Boolean> valid,
       ShuffleBuffer shuffleBuffer) {
     this.eventId = eventId;
@@ -72,7 +73,7 @@ public class ShuffleDataFlushEvent {
     this.cleanupCallbackChains = new ArrayList<>();
   }
 
-  public List<ShufflePartitionedBlock> getShuffleBlocks() {
+  public Collection<ShufflePartitionedBlock> getShuffleBlocks() {
     return shuffleBlocks;
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 625584d77..c1b759b66 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -123,7 +123,7 @@ public class ShuffleFlushManager {
         throw new EventDiscardException();
       }
 
-      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      Collection<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
       if (CollectionUtils.isEmpty(blocks)) {
         LOG.info("There is no block to be flushed: {}", event);
         return;
@@ -236,7 +236,7 @@ public class ShuffleFlushManager {
   }
 
   private void updateCommittedBlockIds(
-      String appId, int shuffleId, List<ShufflePartitionedBlock> blocks) {
+      String appId, int shuffleId, Collection<ShufflePartitionedBlock> blocks) 
{
     if (blocks == null || blocks.size() == 0) {
       return;
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 7a95ac5d0..215ef2f7f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.buffer;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -82,15 +83,16 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
       return null;
     }
     // buffer will be cleared, and new list must be created for async flush
-    List<ShufflePartitionedBlock> spBlocks = new LinkedList<>(blocks);
-    List<ShufflePartitionedBlock> inFlushedQueueBlocks = spBlocks;
+    Collection<ShufflePartitionedBlock> spBlocks = blocks;
+    Set<ShufflePartitionedBlock> inFlushedQueueBlocks = blocks;
     if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) {
       /**
        * When reordering the blocks, it will break down the original reads 
sequence to cause the
        * data lost in some cases. So we should create a reference copy to 
avoid this.
        */
-      inFlushedQueueBlocks = new LinkedList<>(spBlocks);
-      
spBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId));
+      LinkedList<ShufflePartitionedBlock> orderedSpBlocks = new 
LinkedList<>(blocks);
+      
orderedSpBlocks.sort(Comparator.comparingLong(ShufflePartitionedBlock::getTaskAttemptId));
+      spBlocks = orderedSpBlocks;
     }
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
     final ShuffleDataFlushEvent event =
@@ -99,11 +101,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     event.addCleanupCallback(
         () -> {
           this.clearInFlushBuffer(event.getEventId());
-          spBlocks.forEach(spb -> spb.getData().release());
+          inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
           inFlushSize.addAndGet(-event.getSize());
         });
-    inFlushBlockMap.put(eventId, new LinkedHashSet<>(inFlushedQueueBlocks));
-    blocks.clear();
+    inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
+    blocks = new LinkedHashSet<>();
     inFlushSize.addAndGet(size);
     size = 0;
     return event;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d7a748903..414d9a66b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -17,9 +17,9 @@
 
 package org.apache.uniffle.server.buffer;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,7 +91,7 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     if (blocksMap.isEmpty()) {
       return null;
     }
-    List<ShufflePartitionedBlock> spBlocks = new 
LinkedList<>(blocksMap.values());
+    Collection<ShufflePartitionedBlock> spBlocks = blocksMap.values();
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
     final ShuffleDataFlushEvent event =
         new ShuffleDataFlushEvent(
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 1c47c853f..32531b876 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -24,6 +24,7 @@ import java.nio.file.attribute.PosixFileAttributeView;
 import java.nio.file.attribute.PosixFilePermission;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -247,13 +248,13 @@ public class ShuffleFlushManagerTest extends 
HadoopTestBase {
     ShuffleFlushManager manager =
         new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1, 
null);
-    final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+    final Collection<ShufflePartitionedBlock> blocks1 = 
event1.getShuffleBlocks();
     manager.addToFlushQueue(event1);
     ShuffleDataFlushEvent event21 = createShuffleDataFlushEvent(appId, 2, 2, 
2, null);
-    final List<ShufflePartitionedBlock> blocks21 = event21.getShuffleBlocks();
+    final Collection<ShufflePartitionedBlock> blocks21 = 
event21.getShuffleBlocks();
     manager.addToFlushQueue(event21);
     ShuffleDataFlushEvent event22 = createShuffleDataFlushEvent(appId, 2, 2, 
2, null);
-    final List<ShufflePartitionedBlock> blocks22 = event22.getShuffleBlocks();
+    final Collection<ShufflePartitionedBlock> blocks22 = 
event22.getShuffleBlocks();
     manager.addToFlushQueue(event22);
     // wait for write data
     waitForFlush(manager, appId, 1, 5);
@@ -326,7 +327,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     manager.addToFlushQueue(event1);
     waitForFlush(manager, appId, 1, 5);
     assertEquals(1, event1.getRetryTimes());
-    List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+    Collection<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
     assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId, 
1).getLongCardinality());
 
     int maxRetryTimes = 5;
@@ -732,7 +733,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
       String appId,
       int shuffleId,
       int partitionId,
-      List<ShufflePartitionedBlock> blocks,
+      Collection<ShufflePartitionedBlock> blocks,
       int partitionNumPerRange,
       String basePath) {
     Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 151232af2..31c4492d2 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.buffer;
 
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -224,10 +225,10 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
         2,
         1);
     assertArrayEquals(expectedData, sdr.getData());
-
-    assertEquals(0, event1.getShuffleBlocks().get(0).getTaskAttemptId());
-    assertEquals(1, event1.getShuffleBlocks().get(1).getTaskAttemptId());
-    assertEquals(2, event1.getShuffleBlocks().get(2).getTaskAttemptId());
+    Iterator<ShufflePartitionedBlock> it = 
event1.getShuffleBlocks().iterator();
+    assertEquals(0, it.next().getTaskAttemptId());
+    assertEquals(1, it.next().getTaskAttemptId());
+    assertEquals(2, it.next().getTaskAttemptId());
 
     assertEquals(
         1,
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
index b5b242e9b..2fd16deb8 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.storage.handler.api;
 
-import java.util.List;
+import java.util.Collection;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 
@@ -29,5 +29,5 @@ public interface ShuffleWriteHandler {
    * @param shuffleBlocks blocks to storage
    * @throws Exception
    */
-  void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception;
+  void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception;
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index d6339975a..b5deb78b1 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -18,7 +18,7 @@
 package org.apache.uniffle.storage.handler.impl;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -109,7 +109,7 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
   }
 
   @Override
-  public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
+  public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
     final long start = System.currentTimeMillis();
     writeLock.lock();
     try {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index a00e33d2f..9f9fd0c5d 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.storage.handler.impl;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.List;
+import java.util.Collection;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -81,7 +81,8 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
   }
 
   @Override
-  public synchronized void write(List<ShufflePartitionedBlock> shuffleBlocks) 
throws Exception {
+  public synchronized void write(Collection<ShufflePartitionedBlock> 
shuffleBlocks)
+      throws Exception {
 
     // Ignore this write, if the shuffle directory is deleted after being 
uploaded in multi mode
     // or after its app heartbeat times out.
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index cc9d24f18..f47bd2bce 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.function.Function;
 
@@ -105,7 +105,7 @@ public class PooledHadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
   }
 
   @Override
-  public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
+  public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
     if (queue.isEmpty() && initializedHandlerCnt < maxConcurrency) {
       synchronized (this) {
         if (initializedHandlerCnt < maxConcurrency) {
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
index e561d6090..32e8786c8 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,7 +57,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
     }
 
     @Override
-    public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
+    public void write(Collection<ShufflePartitionedBlock> shuffleBlocks) 
throws Exception {
       execution.run();
       invokedList.add(index);
     }

Reply via email to