This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb2a9a6 [#1018] test(tez) RssUnorderedPartitionedKVOutputTest add 
close func unit test (#1034)
6fb2a9a6 is described below

commit 6fb2a9a63132ab9abbde2f1f9240f9caf2f5d0f0
Author: bin41215 <[email protected]>
AuthorDate: Tue Jul 25 07:20:37 2023 +0800

    [#1018] test(tez) RssUnorderedPartitionedKVOutputTest add close func unit 
test (#1034)
    
    ### What changes were proposed in this pull request?
    
    tez-client, RssUnorderedPartitionedKVOutputTest add close func unit test
    
    ### Why are the changes needed?
    
    tez-client, RssUnorderedPartitionedKVOutputTest add close func unit test
    
    Fix: #1018
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test.
---
 .../RssUnorderedPartitionedKVOutputTest.java       | 54 ++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
index c5389423..8cb5c7d6 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
@@ -30,7 +30,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.ShuffleAssignmentsInfoWritable;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.Event;
@@ -41,14 +47,27 @@ import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static 
org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
+import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 public class RssUnorderedPartitionedKVOutputTest {
   private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
new HashMap<>();
@@ -56,6 +75,7 @@ public class RssUnorderedPartitionedKVOutputTest {
   private FileSystem localFs;
   private Path workingDir;
 
+  /** set up */
   @BeforeEach
   public void setup() throws IOException {
     conf = new Configuration();
@@ -70,6 +90,10 @@ public class RssUnorderedPartitionedKVOutputTest {
     conf.set(
         TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, 
HashPartitioner.class.getName());
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
workingDir.toString());
+    conf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS, "localhost");
+    conf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT, 0);
+    conf.setInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, 0);
+    conf.setInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, 1);
   }
 
   @AfterEach
@@ -103,4 +127,34 @@ public class RssUnorderedPartitionedKVOutputTest {
       assertTrue(emptyPartionsBitSet.get(i));
     }
   }
+
+  @Test
+  @Timeout(value = 8000, unit = TimeUnit.MILLISECONDS)
+  public void testClose() throws Exception {
+    try (MockedStatic<RPC> rpc = Mockito.mockStatic(RPC.class); ) {
+      TezRemoteShuffleUmbilicalProtocol protocol = 
mock(TezRemoteShuffleUmbilicalProtocol.class);
+      GetShuffleServerResponse response = new GetShuffleServerResponse();
+      ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+          new ShuffleAssignmentsInfo(new HashMap(), new HashMap());
+      response.setShuffleAssignmentsInfoWritable(
+          new ShuffleAssignmentsInfoWritable(shuffleAssignmentsInfo));
+      doReturn(response).when(protocol).getShuffleAssignments(any());
+      rpc.when(() -> RPC.getProxy(any(), anyLong(), any(), 
any())).thenReturn(protocol);
+      try (MockedStatic<ConverterUtils> converterUtils = 
Mockito.mockStatic(ConverterUtils.class)) {
+        ContainerId containerId = 
ContainerId.newContainerId(OutputTestHelpers.APP_ATTEMPT_ID, 1);
+        converterUtils.when(() -> 
ConverterUtils.toContainerId(null)).thenReturn(containerId);
+        converterUtils
+            .when(() -> ConverterUtils.toContainerId(anyString()))
+            .thenReturn(containerId);
+        OutputContext outputContext = 
OutputTestHelpers.createOutputContext(conf, workingDir);
+        int numPartitions = 1;
+        RssUnorderedPartitionedKVOutput output =
+            new RssUnorderedPartitionedKVOutput(outputContext, numPartitions);
+        output.initialize();
+        output.start();
+        Assertions.assertNotNull(output.getWriter());
+        output.close();
+      }
+    }
+  }
 }

Reply via email to