This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new bb61efbe [#886] fix(tez): Tez Client may lose data or throw an exception
when rss.storage.type is configured without MEMORY. (#976)
bb61efbe is described below
commit bb61efbe9ff68df36832a28f8cdf83b105749d2a
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jun 27 17:13:03 2023 +0800
[#886] fix(tez): Tez Client may lose data or throw an exception when
rss.storage.type is configured without MEMORY. (#976)
---
.../library/common/sort/buffer/WriteBufferManager.java | 12 ++++++------
.../library/common/sort/buffer/WriteBufferManagerTest.java | 4 ++--
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 96fa6f29..c465034e 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -276,12 +276,6 @@ public class WriteBufferManager<K,V> {
sendBuffersToServers();
}
long start = System.currentTimeMillis();
- long commitDuration = 0;
- if (!isMemoryShuffleEnabled) {
- long s = System.currentTimeMillis();
- sendCommit();
- commitDuration = System.currentTimeMillis() - s;
- }
while (true) {
if (failedBlockIds.size() > 0) {
String errorMsg = "Send failed: failed because " +
failedBlockIds.size()
@@ -302,6 +296,12 @@ public class WriteBufferManager<K,V> {
throw new RssException(errorMsg);
}
}
+ long commitDuration = 0;
+ if (!isMemoryShuffleEnabled) {
+ long s = System.currentTimeMillis();
+ sendCommit();
+ commitDuration = System.currentTimeMillis() - s;
+ }
start = System.currentTimeMillis();
TezVertexID tezVertexID = tezTaskAttemptID.getTaskID().getVertexID();
TezDAGID tezDAGID = tezVertexID.getDAGId();
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index ce3a8a48..2b581656 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -195,7 +195,6 @@ public class WriteBufferManagerTest {
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
- String storageType = "MEMORY";
RssConf rssConf = new RssConf();
Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
long sendCheckInterval = 500L;
@@ -209,7 +208,7 @@ public class WriteBufferManagerTest {
comparator, maxSegmentSize, keySerializer,
valSerializer, maxBufferSize, memoryThreshold,
sendThreshold, batch, rssConf, partitionToServers,
- numMaps, isMemoryShuffleEnabled(storageType),
+ numMaps, false,
sendCheckInterval, sendCheckTimeout, bitmapSplitNum, shuffleId, true);
Random random = new Random();
@@ -221,6 +220,7 @@ public class WriteBufferManagerTest {
int partitionId = random.nextInt(50);
bufferManager.addRecord(partitionId, new BytesWritable(key), new
BytesWritable(value));
}
+ bufferManager.waitSendFinished();
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
assertEquals(writeClient.mockedShuffleServer.getFinishBlockSize(),