This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new bb61efbe [#886] fix(tez): Tez Client may lost data or throw exception 
when rss.storage.type without MEMORY. (#976)
bb61efbe is described below

commit bb61efbe9ff68df36832a28f8cdf83b105749d2a
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jun 27 17:13:03 2023 +0800

    [#886] fix(tez): Tez Client may lost data or throw exception when 
rss.storage.type without MEMORY. (#976)
---
 .../library/common/sort/buffer/WriteBufferManager.java       | 12 ++++++------
 .../library/common/sort/buffer/WriteBufferManagerTest.java   |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 96fa6f29..c465034e 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -276,12 +276,6 @@ public class WriteBufferManager<K,V> {
       sendBuffersToServers();
     }
     long start = System.currentTimeMillis();
-    long commitDuration = 0;
-    if (!isMemoryShuffleEnabled) {
-      long s = System.currentTimeMillis();
-      sendCommit();
-      commitDuration = System.currentTimeMillis() - s;
-    }
     while (true) {
       if (failedBlockIds.size() > 0) {
         String errorMsg = "Send failed: failed because " + 
failedBlockIds.size()
@@ -302,6 +296,12 @@ public class WriteBufferManager<K,V> {
         throw new RssException(errorMsg);
       }
     }
+    long commitDuration = 0;
+    if (!isMemoryShuffleEnabled) {
+      long s = System.currentTimeMillis();
+      sendCommit();
+      commitDuration = System.currentTimeMillis() - s;
+    }
     start = System.currentTimeMillis();
     TezVertexID tezVertexID = tezTaskAttemptID.getTaskID().getVertexID();
     TezDAGID tezDAGID = tezVertexID.getDAGId();
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index ce3a8a48..2b581656 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -195,7 +195,6 @@ public class WriteBufferManagerTest {
     double sendThreshold = 0.2f;
     int batch = 50;
     int numMaps = 1;
-    String storageType = "MEMORY";
     RssConf rssConf = new RssConf();
     Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
     long sendCheckInterval = 500L;
@@ -209,7 +208,7 @@ public class WriteBufferManagerTest {
         comparator, maxSegmentSize, keySerializer,
         valSerializer, maxBufferSize, memoryThreshold,
         sendThreshold, batch, rssConf, partitionToServers,
-        numMaps, isMemoryShuffleEnabled(storageType),
+        numMaps, false,
         sendCheckInterval, sendCheckTimeout, bitmapSplitNum, shuffleId, true);
 
     Random random = new Random();
@@ -221,6 +220,7 @@ public class WriteBufferManagerTest {
       int partitionId = random.nextInt(50);
       bufferManager.addRecord(partitionId, new BytesWritable(key), new 
BytesWritable(value));
     }
+    bufferManager.waitSendFinished();
 
     assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
     assertEquals(writeClient.mockedShuffleServer.getFinishBlockSize(),

Reply via email to