This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e54ce21 [#1019] test(tez) RssOrderedPartitionedKVOutputTest add close func unit test (#1025)
2e54ce21 is described below

commit 2e54ce212c957ece37beaa8dc3c06856c2aac583
Author: bin41215 <[email protected]>
AuthorDate: Thu Jul 20 19:24:03 2023 +0800

    [#1019] test(tez) RssOrderedPartitionedKVOutputTest add close func unit test (#1025)
    
    ### What changes were proposed in this pull request?
    
    Add a unit test for the `close` function to `RssOrderedPartitionedKVOutputTest` in the Tez client.
    
    ### Why are the changes needed?
    
    To cover the `close` function of `RssOrderedPartitionedKVOutput` in the Tez client with a unit test.
    
    Fix: #1019
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    By the newly added unit test (`testClose`).
---
 .../output/RssOrderedPartitionedKVOutputTest.java  | 54 ++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutputTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutputTest.java
index 1cb70b43..28b80658 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutputTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutputTest.java
@@ -30,7 +30,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.ShuffleAssignmentsInfoWritable;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.Event;
@@ -41,14 +47,27 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static 
org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
+import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 public class RssOrderedPartitionedKVOutputTest {
   private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
new HashMap<>();
@@ -56,6 +75,7 @@ public class RssOrderedPartitionedKVOutputTest {
   private FileSystem localFs;
   private Path workingDir;
 
+  /** set up */
   @BeforeEach
   public void setup() throws IOException {
     conf = new Configuration();
@@ -70,6 +90,10 @@ public class RssOrderedPartitionedKVOutputTest {
     conf.set(
         TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, 
HashPartitioner.class.getName());
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
workingDir.toString());
+    conf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS, "localhost");
+    conf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT, 0);
+    conf.setInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, 0);
+    conf.setInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, 1);
   }
 
   @AfterEach
@@ -103,4 +127,34 @@ public class RssOrderedPartitionedKVOutputTest {
       assertTrue(emptyPartionsBitSet.get(i));
     }
   }
+
+  @Test
+  @Timeout(value = 8000, unit = TimeUnit.MILLISECONDS)
+  public void testClose() throws Exception {
+    try (MockedStatic<RPC> rpc = Mockito.mockStatic(RPC.class); ) {
+      TezRemoteShuffleUmbilicalProtocol protocol = 
mock(TezRemoteShuffleUmbilicalProtocol.class);
+      GetShuffleServerResponse response = new GetShuffleServerResponse();
+      ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+          new ShuffleAssignmentsInfo(new HashMap(), new HashMap());
+      response.setShuffleAssignmentsInfoWritable(
+          new ShuffleAssignmentsInfoWritable(shuffleAssignmentsInfo));
+      doReturn(response).when(protocol).getShuffleAssignments(any());
+      rpc.when(() -> RPC.getProxy(any(), anyLong(), any(), 
any())).thenReturn(protocol);
+      try (MockedStatic<ConverterUtils> converterUtils = 
Mockito.mockStatic(ConverterUtils.class)) {
+        ContainerId containerId = 
ContainerId.newContainerId(OutputTestHelpers.APP_ATTEMPT_ID, 1);
+        converterUtils.when(() -> 
ConverterUtils.toContainerId(null)).thenReturn(containerId);
+        converterUtils
+            .when(() -> ConverterUtils.toContainerId(anyString()))
+            .thenReturn(containerId);
+        OutputContext outputContext = 
OutputTestHelpers.createOutputContext(conf, workingDir);
+        int numPartitions = 1;
+        RssOrderedPartitionedKVOutput output =
+            new RssOrderedPartitionedKVOutput(outputContext, numPartitions);
+        output.initialize();
+        output.start();
+        Assertions.assertNotNull(output.getWriter());
+        output.close();
+      }
+    }
+  }
 }

Reply via email to