This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 49effee1 [#1256] refactor: optimize collections construction (#1257)
49effee1 is described below
commit 49effee17b833986feeb7e16eaa400a3b80e7c91
Author: summaryzb <[email protected]>
AuthorDate: Mon Oct 23 20:16:02 2023 +0800
[#1256] refactor: optimize collections construction (#1257)
### What changes were proposed in this pull request?
1. Some empty collections are only ever read, so replace them with
Collections.emptyList(), Collections.emptySet(), or Collections.emptyMap().
2. In Spark, the writer client constructs a new List for
shuffleBlockInfos per record, but in most cases no blocks are produced,
so allocate the list only when a block is actually created.
3. Set proper initial capacities for newly constructed lists.
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/1256
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../org/apache/hadoop/mapred/SortWriteBufferManager.java | 3 ++-
.../hadoop/mapreduce/task/reduce/EventFetcherTest.java | 9 +++++++--
.../apache/spark/shuffle/writer/WriteBufferManager.java | 15 ++++++++++-----
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 4 ++--
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 4 ++--
.../library/common/sort/buffer/WriteBufferManager.java | 3 ++-
.../common/sort/buffer/WriteBufferManagerTest.java | 3 ++-
.../client/request/RssGetShuffleAssignmentsRequest.java | 4 ++--
.../org/apache/uniffle/server/ShuffleTaskManagerTest.java | 5 +++--
9 files changed, 32 insertions(+), 18 deletions(-)
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index afa80df7..d410fa3b 100644
---
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -201,7 +202,7 @@ public class SortWriteBufferManager<K, V> {
}
private void sendBufferToServers(SortWriteBuffer<K, V> buffer) {
- List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
+ List<ShuffleBlockInfo> shuffleBlocks = new ArrayList<>(1);
prepareBufferForSend(shuffleBlocks, buffer);
sendShuffleBlocks(shuffleBlocks);
}
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
index bf7af9c1..8e8ab8f4 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -78,7 +79,11 @@ public class EventFetcherTest {
any(JobID.class), eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(
getMockedCompletionEventsUpdate(
- 0, mapTaskNum, Sets.newHashSet(70, 80, 90), Sets.newHashSet(),
Sets.newHashSet()));
+ 0,
+ mapTaskNum,
+ Sets.newHashSet(70, 80, 90),
+ Collections.EMPTY_SET,
+ Collections.EMPTY_SET));
RssEventFetcher ef = new RssEventFetcher(1, tid, umbilical, jobConf,
MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
@@ -135,7 +140,7 @@ public class EventFetcherTest {
any(JobID.class), eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(
getInconsistentCompletionEventsUpdate(
- 0, mapTaskNum, Sets.newHashSet(45, 67), Sets.newHashSet()));
+ 0, mapTaskNum, Sets.newHashSet(45, 67),
Collections.EMPTY_SET));
RssEventFetcher ef = new RssEventFetcher(1, tid, umbilical, jobConf,
MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index eaa3ff91..6e468cd2 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -18,6 +18,7 @@
package org.apache.spark.shuffle.writer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -169,15 +170,18 @@ public class WriteBufferManager extends MemoryConsumer {
public List<ShuffleBlockInfo> addPartitionData(
int partitionId, byte[] serializedData, int serializedDataLength, long
start) {
- List<ShuffleBlockInfo> candidateSendingBlocks =
+ List<ShuffleBlockInfo> singleOrEmptySendingBlocks =
insertIntoBuffer(partitionId, serializedData, serializedDataLength);
// check buffer size > spill threshold
if (usedBytes.get() - inSendListBytes.get() > spillSize) {
- candidateSendingBlocks.addAll(clear());
+ List<ShuffleBlockInfo> multiSendingBlocks = clear();
+ multiSendingBlocks.addAll(singleOrEmptySendingBlocks);
+ writeTime += System.currentTimeMillis() - start;
+ return multiSendingBlocks;
}
writeTime += System.currentTimeMillis() - start;
- return candidateSendingBlocks;
+ return singleOrEmptySendingBlocks;
}
/**
@@ -192,7 +196,6 @@ public class WriteBufferManager extends MemoryConsumer {
*/
private List<ShuffleBlockInfo> insertIntoBuffer(
int partitionId, byte[] serializedData, int serializedDataLength) {
- List<ShuffleBlockInfo> sentBlocks = new ArrayList<>();
long required = Math.max(bufferSegmentSize, serializedDataLength);
// Asking memory from task memory manager for the existing writer buffer,
// this may trigger current WriteBufferManager spill method, which will
@@ -213,6 +216,7 @@ public class WriteBufferManager extends MemoryConsumer {
WriterBuffer wb = buffers.get(partitionId);
wb.addRecord(serializedData, serializedDataLength);
if (wb.getMemoryUsed() > bufferSize) {
+ List<ShuffleBlockInfo> sentBlocks = new ArrayList<>(1);
sentBlocks.add(createShuffleBlock(partitionId, wb));
copyTime += wb.getCopyTime();
buffers.remove(partitionId);
@@ -228,6 +232,7 @@ public class WriteBufferManager extends MemoryConsumer {
+ wb.getDataLength()
+ "]");
}
+ return sentBlocks;
}
} else {
// The true of hasRequested means the former partitioned buffer has been
flushed, that is
@@ -242,7 +247,7 @@ public class WriteBufferManager extends MemoryConsumer {
wb.addRecord(serializedData, serializedDataLength);
buffers.put(partitionId, wb);
}
- return sentBlocks;
+ return Collections.emptyList();
}
public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object
value) {
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index bf32826f..d1f1d9a6 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -585,7 +585,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public Set<Long> getFailedBlockIds(String taskId) {
Set<Long> result = taskToFailedBlockIds.get(taskId);
if (result == null) {
- result = Sets.newHashSet();
+ result = Collections.emptySet();
}
return result;
}
@@ -593,7 +593,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public Set<Long> getSuccessBlockIds(String taskId) {
Set<Long> result = taskToSuccessBlockIds.get(taskId);
if (result == null) {
- result = Sets.newHashSet();
+ result = Collections.emptySet();
}
return result;
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 36225fec..a917b456 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -875,7 +875,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public Set<Long> getFailedBlockIds(String taskId) {
Set<Long> result = taskToFailedBlockIds.get(taskId);
if (result == null) {
- result = Sets.newHashSet();
+ result = Collections.emptySet();
}
return result;
}
@@ -883,7 +883,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
public Set<Long> getSuccessBlockIds(String taskId) {
Set<Long> result = taskToSuccessBlockIds.get(taskId);
if (result == null) {
- result = Sets.newHashSet();
+ result = Collections.emptySet();
}
return result;
}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index ad068613..af4d5cea 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.library.common.sort.buffer;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -208,7 +209,7 @@ public class WriteBufferManager<K, V> {
}
private void sendBufferToServers(WriteBuffer<K, V> buffer) {
- List<ShuffleBlockInfo> shuffleBlocks = Lists.newArrayList();
+ List<ShuffleBlockInfo> shuffleBlocks = new ArrayList<>(1);
prepareBufferForSend(shuffleBlocks, buffer);
sendShuffleBlocks(shuffleBlocks);
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 1ebcc77a..bef29fa0 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.sort.buffer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -556,7 +557,7 @@ public class WriteBufferManagerTest {
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
successBlockIds.add(blockInfo.getBlockId());
}
- return new SendShuffleDataResult(successBlockIds, Sets.newHashSet());
+ return new SendShuffleDataResult(successBlockIds,
Collections.EMPTY_SET);
}
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
index 5b1fa472..98fd0124 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleAssignmentsRequest.java
@@ -17,10 +17,10 @@
package org.apache.uniffle.client.request;
+import java.util.Collections;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
public class RssGetShuffleAssignmentsRequest {
@@ -51,7 +51,7 @@ public class RssGetShuffleAssignmentsRequest {
requiredTags,
-1,
-1,
- Sets.newHashSet());
+ Collections.emptySet());
}
public RssGetShuffleAssignmentsRequest(
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 36a089b0..389b1365 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
import java.io.File;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -576,7 +577,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
shuffleTaskManager.checkResourceStatus();
// wait resource delete
Thread.sleep(3000);
- assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
+ assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds());
assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1",
shuffleId).isEmpty());
}
@@ -627,7 +628,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
.start();
}
countDownLatch.await();
- assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
+ assertEquals(Collections.EMPTY_SET, shuffleTaskManager.getAppIds());
assertTrue(shuffleTaskManager.getCachedBlockIds(appId,
shuffleId).isEmpty());
}