This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6fb2a9a6 [#1018] test(tez) RssUnorderedPartitionedKVOutputTest add
close func unit test (#1034)
6fb2a9a6 is described below
commit 6fb2a9a63132ab9abbde2f1f9240f9caf2f5d0f0
Author: bin41215 <[email protected]>
AuthorDate: Tue Jul 25 07:20:37 2023 +0800
[#1018] test(tez) RssUnorderedPartitionedKVOutputTest add close func unit
test (#1034)
### What changes were proposed in this pull request?
tez-client, RssUnorderedPartitionedKVOutputTest add close func unit test
### Why are the changes needed?
tez-client, RssUnorderedPartitionedKVOutputTest add close func unit test
Fix: #1018
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test.
---
.../RssUnorderedPartitionedKVOutputTest.java | 54 ++++++++++++++++++++++
1 file changed, 54 insertions(+)
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
index c5389423..8cb5c7d6 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutputTest.java
@@ -30,7 +30,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.ShuffleAssignmentsInfoWritable;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.Event;
@@ -41,14 +47,27 @@ import
org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static
org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
+import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
public class RssUnorderedPartitionedKVOutputTest {
private static Map<Integer, List<ShuffleServerInfo>> partitionToServers =
new HashMap<>();
@@ -56,6 +75,7 @@ public class RssUnorderedPartitionedKVOutputTest {
private FileSystem localFs;
private Path workingDir;
+ /** set up */
@BeforeEach
public void setup() throws IOException {
conf = new Configuration();
@@ -70,6 +90,10 @@ public class RssUnorderedPartitionedKVOutputTest {
conf.set(
TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
HashPartitioner.class.getName());
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
workingDir.toString());
+ conf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS, "localhost");
+ conf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT, 0);
+ conf.setInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, 0);
+ conf.setInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, 1);
}
@AfterEach
@@ -103,4 +127,34 @@ public class RssUnorderedPartitionedKVOutputTest {
assertTrue(emptyPartionsBitSet.get(i));
}
}
+
+ @Test
+ @Timeout(value = 8000, unit = TimeUnit.MILLISECONDS)
+ public void testClose() throws Exception {
+ try (MockedStatic<RPC> rpc = Mockito.mockStatic(RPC.class); ) {
+ TezRemoteShuffleUmbilicalProtocol protocol =
mock(TezRemoteShuffleUmbilicalProtocol.class);
+ GetShuffleServerResponse response = new GetShuffleServerResponse();
+ ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+ new ShuffleAssignmentsInfo(new HashMap(), new HashMap());
+ response.setShuffleAssignmentsInfoWritable(
+ new ShuffleAssignmentsInfoWritable(shuffleAssignmentsInfo));
+ doReturn(response).when(protocol).getShuffleAssignments(any());
+ rpc.when(() -> RPC.getProxy(any(), anyLong(), any(),
any())).thenReturn(protocol);
+ try (MockedStatic<ConverterUtils> converterUtils =
Mockito.mockStatic(ConverterUtils.class)) {
+ ContainerId containerId =
ContainerId.newContainerId(OutputTestHelpers.APP_ATTEMPT_ID, 1);
+ converterUtils.when(() ->
ConverterUtils.toContainerId(null)).thenReturn(containerId);
+ converterUtils
+ .when(() -> ConverterUtils.toContainerId(anyString()))
+ .thenReturn(containerId);
+ OutputContext outputContext =
OutputTestHelpers.createOutputContext(conf, workingDir);
+ int numPartitions = 1;
+ RssUnorderedPartitionedKVOutput output =
+ new RssUnorderedPartitionedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ output.start();
+ Assertions.assertNotNull(output.getWriter());
+ output.close();
+ }
+ }
+ }
}