This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 49effee1 [#1256] refactor: optimize collections construction (#1257)
49effee1 is described below

commit 49effee17b833986feeb7e16eaa400a3b80e7c91
Author: summaryzb <[email protected]>
AuthorDate: Mon Oct 23 20:16:02 2023 +0800

    [#1256] refactor: optimize collections construction (#1257)
    
    ### What changes were proposed in this pull request?
    1. Some empty collections are only read, so replace them with 
Collections.emptyList(), Collections.emptySet(), or Collections.emptyMap().
    2. In Spark, the writer client constructs a new List for 
shuffleBlockInfos per record, but in most cases no blocks are produced.
    3. Set proper initial capacities.
    
    ### Why are the changes needed?
    https://github.com/apache/incubator-uniffle/issues/1256
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    unit test
---
 .../org/apache/hadoop/mapred/SortWriteBufferManager.java  |  3 ++-
 .../hadoop/mapreduce/task/reduce/EventFetcherTest.java    |  9 +++++++--
 .../apache/spark/shuffle/writer/WriteBufferManager.java   | 15 ++++++++++-----
 .../java/org/apache/spark/shuffle/RssShuffleManager.java  |  4 ++--
 .../java/org/apache/spark/shuffle/RssShuffleManager.java  |  4 ++--
 .../library/common/sort/buffer/WriteBufferManager.java    |  3 ++-
 .../common/sort/buffer/WriteBufferManagerTest.java        |  3 ++-
 .../client/request/RssGetShuffleAssignmentsRequest.java   |  4 ++--
 .../org/apache/uniffle/server/ShuffleTaskManagerTest.java |  5 +++--
 9 files changed, 32 insertions(+), 18 deletions(-)

diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index afa80df7..d410fa3b 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -201,7 +202,7 @@ public class SortWriteBufferManager<K, V> {
   }
 
   private void sendBufferToServers(SortWriteBuffer<K, V> buffer) {
-    List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
+    List<ShuffleBlockInfo> shuffleBlocks = new ArrayList<>(1);
     prepareBufferForSend(shuffleBlocks, buffer);
     sendShuffleBlocks(shuffleBlocks);
   }
diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
index bf7af9c1..8e8ab8f4 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -78,7 +79,11 @@ public class EventFetcherTest {
             any(JobID.class), eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
         .thenReturn(
             getMockedCompletionEventsUpdate(
-                0, mapTaskNum, Sets.newHashSet(70, 80, 90), Sets.newHashSet(), 
Sets.newHashSet()));
+                0,
+                mapTaskNum,
+                Sets.newHashSet(70, 80, 90),
+                Collections.EMPTY_SET,
+                Collections.EMPTY_SET));
 
     RssEventFetcher ef = new RssEventFetcher(1, tid, umbilical, jobConf, 
MAX_EVENTS_TO_FETCH);
     Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
@@ -135,7 +140,7 @@ public class EventFetcherTest {
             any(JobID.class), eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
         .thenReturn(
             getInconsistentCompletionEventsUpdate(
-                0, mapTaskNum, Sets.newHashSet(45, 67), Sets.newHashSet()));
+                0, mapTaskNum, Sets.newHashSet(45, 67), 
Collections.EMPTY_SET));
 
     RssEventFetcher ef = new RssEventFetcher(1, tid, umbilical, jobConf, 
MAX_EVENTS_TO_FETCH);
     Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index eaa3ff91..6e468cd2 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.writer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -169,15 +170,18 @@ public class WriteBufferManager extends MemoryConsumer {
 
   public List<ShuffleBlockInfo> addPartitionData(
       int partitionId, byte[] serializedData, int serializedDataLength, long 
start) {
-    List<ShuffleBlockInfo> candidateSendingBlocks =
+    List<ShuffleBlockInfo> singleOrEmptySendingBlocks =
         insertIntoBuffer(partitionId, serializedData, serializedDataLength);
 
     // check buffer size > spill threshold
     if (usedBytes.get() - inSendListBytes.get() > spillSize) {
-      candidateSendingBlocks.addAll(clear());
+      List<ShuffleBlockInfo> multiSendingBlocks = clear();
+      multiSendingBlocks.addAll(singleOrEmptySendingBlocks);
+      writeTime += System.currentTimeMillis() - start;
+      return multiSendingBlocks;
     }
     writeTime += System.currentTimeMillis() - start;
-    return candidateSendingBlocks;
+    return singleOrEmptySendingBlocks;
   }
 
   /**
@@ -192,7 +196,6 @@ public class WriteBufferManager extends MemoryConsumer {
    */
   private List<ShuffleBlockInfo> insertIntoBuffer(
       int partitionId, byte[] serializedData, int serializedDataLength) {
-    List<ShuffleBlockInfo> sentBlocks = new ArrayList<>();
     long required = Math.max(bufferSegmentSize, serializedDataLength);
     // Asking memory from task memory manager for the existing writer buffer,
     // this may trigger current WriteBufferManager spill method, which will
@@ -213,6 +216,7 @@ public class WriteBufferManager extends MemoryConsumer {
       WriterBuffer wb = buffers.get(partitionId);
       wb.addRecord(serializedData, serializedDataLength);
       if (wb.getMemoryUsed() > bufferSize) {
+        List<ShuffleBlockInfo> sentBlocks = new ArrayList<>(1);
         sentBlocks.add(createShuffleBlock(partitionId, wb));
         copyTime += wb.getCopyTime();
         buffers.remove(partitionId);
@@ -228,6 +232,7 @@ public class WriteBufferManager extends MemoryConsumer {
                   + wb.getDataLength()
                   + "]");
         }
+        return sentBlocks;
       }
     } else {
       // The true of hasRequested means the former partitioned buffer has been 
flushed, that is
@@ -242,7 +247,7 @@ public class WriteBufferManager extends MemoryConsumer {
       wb.addRecord(serializedData, serializedDataLength);
       buffers.put(partitionId, wb);
     }
-    return sentBlocks;
+    return Collections.emptyList();
   }
 
   public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object 
value) {
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index bf32826f..d1f1d9a6 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -585,7 +585,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public Set<Long> getFailedBlockIds(String taskId) {
     Set<Long> result = taskToFailedBlockIds.get(taskId);
     if (result == null) {
-      result = Sets.newHashSet();
+      result = Collections.emptySet();
     }
     return result;
   }
@@ -593,7 +593,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public Set<Long> getSuccessBlockIds(String taskId) {
     Set<Long> result = taskToSuccessBlockIds.get(taskId);
     if (result == null) {
-      result = Sets.newHashSet();
+      result = Collections.emptySet();
     }
     return result;
   }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 36225fec..a917b456 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -875,7 +875,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public Set<Long> getFailedBlockIds(String taskId) {
     Set<Long> result = taskToFailedBlockIds.get(taskId);
     if (result == null) {
-      result = Sets.newHashSet();
+      result = Collections.emptySet();
     }
     return result;
   }
@@ -883,7 +883,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
   public Set<Long> getSuccessBlockIds(String taskId) {
     Set<Long> result = taskToSuccessBlockIds.get(taskId);
     if (result == null) {
-      result = Sets.newHashSet();
+      result = Collections.emptySet();
     }
     return result;
   }
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index ad068613..af4d5cea 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.tez.runtime.library.common.sort.buffer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -208,7 +209,7 @@ public class WriteBufferManager<K, V> {
   }
 
   private void sendBufferToServers(WriteBuffer<K, V> buffer) {
-    List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
+    List<ShuffleBlockInfo> shuffleBlocks = new ArrayList<>(1);
     prepareBufferForSend(shuffleBlocks, buffer);
     sendShuffleBlocks(shuffleBlocks);
   }
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 1ebcc77a..bef29fa0 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.sort.buffer;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -556,7 +557,7 @@ public class WriteBufferManagerTest {
         for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
           successBlockIds.add(blockInfo.getBlockId());
         }
-        return new SendShuffleDataResult(successBlockIds, Sets.newHashSet());
+        return new SendShuffleDataResult(successBlockIds, 
Collections.EMPTY_SET);
       }
     }
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 5b1fa472..98fd0124 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -17,10 +17,10 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.Collections;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 
 public class RssGetShuffleAssignmentsRequest {
 
@@ -51,7 +51,7 @@ public class RssGetShuffleAssignmentsRequest {
         requiredTags,
         -1,
         -1,
-        Sets.newHashSet());
+        Collections.emptySet());
   }
 
   public RssGetShuffleAssignmentsRequest(
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 36a089b0..389b1365 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -576,7 +577,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     shuffleTaskManager.checkResourceStatus();
     // wait resource delete
     Thread.sleep(3000);
-    assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
+    assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds());
     assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", 
shuffleId).isEmpty());
   }
 
@@ -627,7 +628,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
           .start();
     }
     countDownLatch.await();
-    assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
+    assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds());
     assertTrue(shuffleTaskManager.getCachedBlockIds(appId, 
shuffleId).isEmpty());
   }
 

Reply via email to