This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b7595882 [#989] Bug(tez): partition class is not set for
RssUnorderedKVOutput. (#994)
b7595882 is described below
commit b75958823849b95355bd69ee90b84e707e54cf19
Author: zhengchenyu <[email protected]>
AuthorDate: Tue Jul 11 11:20:55 2023 +0800
[#989] Bug(tez): partition class is not set for RssUnorderedKVOutput. (#994)
### What changes were proposed in this pull request?
Set the partition class for RssUnorderedKVOutput.
### Why are the changes needed?
If the partition class is not set for RssUnorderedKVOutput, the following
error is thrown:
```
at
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2534)
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
at
org.apache.tez.runtime.library.common.TezRuntimeUtils.instantiatePartitioner(TezRuntimeUtils.java:117)
at
org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:271)
at
org.apache.tez.runtime.library.common.sort.impl.RssUnSorter.<init>(RssUnSorter.java:68)
at
org.apache.tez.runtime.library.output.RssUnorderedKVOutput.start(RssUnorderedKVOutput.java:204)
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:193)
at
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
```
Fix: #989
### How was this patch tested?
Unit tests, integration tests, Tez local mode, and Tez on YARN.
---
.../library/output/RssUnorderedKVOutput.java | 2 +
.../runtime/library/output/OutputTestHelpers.java | 7 +++
.../library/output/RssUnorderedKVOutputTest.java | 56 ++++++++++++++++++++--
3 files changed, 61 insertions(+), 4 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index 94a1a024..76f133e7 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -118,6 +118,8 @@ public class RssUnorderedKVOutput extends
AbstractLogicalOutput {
public List<Event> initialize() throws Exception {
this.startTime = System.nanoTime();
this.conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+ UnorderedKVOutput.CustomPartitioner.class.getName());
this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
long memRequestSize = RssTezUtils.getInitialMemoryRequirement(conf,
getContext().getTotalMemoryAvailableToTask());
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index e6f8d7e7..85c47dd6 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -38,6 +40,10 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class OutputTestHelpers {
+
+ public static final ApplicationId APP_ID =
ApplicationId.newInstance(1681717153064L, 3601637);
+ public static final ApplicationAttemptId APP_ATTEMPT_ID =
ApplicationAttemptId.newInstance(APP_ID, 1);
+
/**
* help to create output context
*/
@@ -64,6 +70,7 @@ public class OutputTestHelpers {
OutputStatisticsReporter statsReporter =
mock(OutputStatisticsReporter.class);
doReturn(statsReporter).when(ctx).getStatisticsReporter();
doReturn(new
ExecutionContextImpl("localhost")).when(ctx).getExecutionContext();
+ doReturn(APP_ID).when(ctx).getApplicationId();
return ctx;
}
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
index 9f4f2044..c7a2952d 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
@@ -31,7 +31,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.GetShuffleServerResponse;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.ShuffleAssignmentsInfoWritable;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRemoteShuffleUmbilicalProtocol;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.Event;
@@ -39,17 +46,29 @@ import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
+import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
+import static
org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
+import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
public class RssUnorderedKVOutputTest {
private static Map<Integer, List<ShuffleServerInfo>> partitionToServers =
new HashMap<>();
@@ -57,7 +76,6 @@ public class RssUnorderedKVOutputTest {
private FileSystem localFs;
private Path workingDir;
-
@BeforeEach
public void setup() throws IOException {
conf = new Configuration();
@@ -68,9 +86,11 @@ public class RssUnorderedKVOutputTest {
localFs.getUri(), localFs.getWorkingDirectory());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
Text.class.getName());
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
- HashPartitioner.class.getName());
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
workingDir.toString());
+ conf.set(RSS_AM_SHUFFLE_MANAGER_ADDRESS, "localhost");
+ conf.setInt(RSS_AM_SHUFFLE_MANAGER_PORT, 0);
+ conf.setInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, 0);
+ conf.setInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, 1);
}
@AfterEach
@@ -104,4 +124,32 @@ public class RssUnorderedKVOutputTest {
}
}
+ @Test
+ @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+ public void testClose() throws Exception {
+ try (MockedStatic<RPC> rpc = Mockito.mockStatic(RPC.class);) {
+ TezRemoteShuffleUmbilicalProtocol protocol =
mock(TezRemoteShuffleUmbilicalProtocol.class);
+ GetShuffleServerResponse response = new GetShuffleServerResponse();
+ ShuffleAssignmentsInfo shuffleAssignmentsInfo = new
ShuffleAssignmentsInfo(new HashMap(), new HashMap());
+ response.setShuffleAssignmentsInfoWritable(new
ShuffleAssignmentsInfoWritable(shuffleAssignmentsInfo));
+ doReturn(response).when(protocol).getShuffleAssignments(any());
+ rpc.when(() -> RPC.getProxy(any(), anyLong(), any(),
any())).thenReturn(protocol);
+ try (MockedStatic<IdUtils> idUtils = Mockito.mockStatic(IdUtils.class)) {
+ idUtils.when(() ->
IdUtils.getApplicationAttemptId()).thenReturn(OutputTestHelpers.APP_ATTEMPT_ID);
+ idUtils.when(() ->
IdUtils.getAppAttemptId()).thenReturn(OutputTestHelpers.APP_ATTEMPT_ID.getAttemptId());
+ try (MockedStatic<ConverterUtils> converterUtils =
Mockito.mockStatic(ConverterUtils.class)) {
+ ContainerId containerId =
ContainerId.newContainerId(OutputTestHelpers.APP_ATTEMPT_ID, 1);
+ converterUtils.when(() ->
ConverterUtils.toContainerId(null)).thenReturn(containerId);
+ converterUtils.when(() ->
ConverterUtils.toContainerId(anyString())).thenReturn(containerId);
+ OutputContext outputContext =
OutputTestHelpers.createOutputContext(conf, workingDir);
+ int numPartitions = 1;
+ RssUnorderedKVOutput output = new
RssUnorderedKVOutput(outputContext, numPartitions);
+ output.initialize();
+ output.start();
+ Assertions.assertNotNull(output.getWriter());
+ output.close();
+ }
+ }
+ }
+ }
}