This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b7595882 [#989] Bug(tez): partition class is not set for 
RssUnorderedKVOutput. (#994)
b7595882 is described below

commit b75958823849b95355bd69ee90b84e707e54cf19
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jul 11 11:20:55 2023 +0800

    [#989] Bug(tez): partition class is not set for RssUnorderedKVOutput. (#994)
    
    ### What changes were proposed in this pull request?
    
    Set the partitioner class (UnorderedKVOutput.CustomPartitioner) in the configuration of RssUnorderedKVOutput during initialize().
    
    ### Why are the changes needed?
    
    If the partitioner class is not set for RssUnorderedKVOutput, starting the
    output fails with a NullPointerException:
    
    ```
    at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
      at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:348)
      at 
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2534)
      at 
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
      at 
org.apache.tez.runtime.library.common.TezRuntimeUtils.instantiatePartitioner(TezRuntimeUtils.java:117)
      at 
org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:271)
      at 
org.apache.tez.runtime.library.common.sort.impl.RssUnSorter.<init>(RssUnSorter.java:68)
      at 
org.apache.tez.runtime.library.output.RssUnorderedKVOutput.start(RssUnorderedKVOutput.java:204)
      at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:193)
      at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
    ```
    
    Fix: #989
    
    ### How was this patch tested?
    
    Unit tests, integration tests, Tez local mode, and Tez on YARN.
---
 .../library/output/RssUnorderedKVOutput.java       |  2 +
 .../runtime/library/output/OutputTestHelpers.java  |  7 +++
 .../library/output/RssUnorderedKVOutputTest.java   | 56 ++++++++++++++++++++--
 3 files changed, 61 insertions(+), 4 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index 94a1a024..76f133e7 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -118,6 +118,8 @@ public class RssUnorderedKVOutput extends 
AbstractLogicalOutput {
   public List<Event> initialize() throws Exception {
     this.startTime = System.nanoTime();
     this.conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        UnorderedKVOutput.CustomPartitioner.class.getName());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
 
     long memRequestSize = RssTezUtils.getInitialMemoryRequirement(conf, 
getContext().getTotalMemoryAvailableToTask());
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index e6f8d7e7..85c47dd6 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -38,6 +40,10 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 public class OutputTestHelpers {
+
+  public static final ApplicationId APP_ID = 
ApplicationId.newInstance(1681717153064L, 3601637);
+  public static final ApplicationAttemptId APP_ATTEMPT_ID = 
ApplicationAttemptId.newInstance(APP_ID, 1);
+
   /**
    * help to create output context
    */
@@ -64,6 +70,7 @@ public class OutputTestHelpers {
     OutputStatisticsReporter statsReporter = 
mock(OutputStatisticsReporter.class);
     doReturn(statsReporter).when(ctx).getStatisticsReporter();
     doReturn(new 
ExecutionContextImpl("localhost")).when(ctx).getExecutionContext();
+    doReturn(APP_ID).when(ctx).getApplicationId();
     return ctx;
   }
 }
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
index 9f4f2044..c7a2952d 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
@@ -31,7 +31,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.ShuffleAssignmentsInfoWritable;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.runtime.api.Event;
@@ -39,17 +46,29 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static 
org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
+import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 public class RssUnorderedKVOutputTest {
   private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
new HashMap<>();
@@ -57,7 +76,6 @@ public class RssUnorderedKVOutputTest {
   private FileSystem localFs;
   private Path workingDir;
 
-
   @BeforeEach
   public void setup() throws IOException {
     conf = new Configuration();
@@ -68,9 +86,11 @@ public class RssUnorderedKVOutputTest {
         localFs.getUri(), localFs.getWorkingDirectory());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, 
Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, 
Text.class.getName());
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
-        HashPartitioner.class.getName());
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
workingDir.toString());
+    conf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS, "localhost");
+    conf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT, 0);
+    conf.setInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, 0);
+    conf.setInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, 1);
   }
 
   @AfterEach
@@ -104,4 +124,32 @@ public class RssUnorderedKVOutputTest {
     }
   }
 
+  @Test
+  @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+  public void testClose() throws Exception {
+    try (MockedStatic<RPC> rpc = Mockito.mockStatic(RPC.class);) {
+      TezRemoteShuffleUmbilicalProtocol protocol = 
mock(TezRemoteShuffleUmbilicalProtocol.class);
+      GetShuffleServerResponse response = new GetShuffleServerResponse();
+      ShuffleAssignmentsInfo shuffleAssignmentsInfo = new 
ShuffleAssignmentsInfo(new HashMap(), new HashMap());
+      response.setShuffleAssignmentsInfoWritable(new 
ShuffleAssignmentsInfoWritable(shuffleAssignmentsInfo));
+      doReturn(response).when(protocol).getShuffleAssignments(any());
+      rpc.when(() -> RPC.getProxy(any(), anyLong(), any(), 
any())).thenReturn(protocol);
+      try (MockedStatic<IdUtils> idUtils = Mockito.mockStatic(IdUtils.class)) {
+        idUtils.when(() -> 
IdUtils.getApplicationAttemptId()).thenReturn(OutputTestHelpers.APP_ATTEMPT_ID);
+        idUtils.when(() -> 
IdUtils.getAppAttemptId()).thenReturn(OutputTestHelpers.APP_ATTEMPT_ID.getAttemptId());
+        try (MockedStatic<ConverterUtils> converterUtils = 
Mockito.mockStatic(ConverterUtils.class)) {
+          ContainerId containerId = 
ContainerId.newContainerId(OutputTestHelpers.APP_ATTEMPT_ID, 1);
+          converterUtils.when(() -> 
ConverterUtils.toContainerId(null)).thenReturn(containerId);
+          converterUtils.when(() -> 
ConverterUtils.toContainerId(anyString())).thenReturn(containerId);
+          OutputContext outputContext = 
OutputTestHelpers.createOutputContext(conf, workingDir);
+          int numPartitions = 1;
+          RssUnorderedKVOutput output = new 
RssUnorderedKVOutput(outputContext, numPartitions);
+          output.initialize();
+          output.start();
+          Assertions.assertNotNull(output.getWriter());
+          output.close();
+        }
+      }
+    }
+  }
 }

Reply via email to