This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bbf3605 [#855] feat(tez): Support Tez Output RssUnorderedKVOutput (#944)
5bbf3605 is described below

commit 5bbf3605e7dbd0b3bbac322025fa26f6d70be012
Author: bin41215 <[email protected]>
AuthorDate: Mon Jun 12 19:47:47 2023 +0800

    [#855] feat(tez): Support Tez Output RssUnorderedKVOutput (#944)
    
    ### What changes were proposed in this pull request?
    
    Support writing Tez UnorderedKVOutput through remote shuffle by adding RssUnorderedKVOutput.
    
    ### Why are the changes needed?
    Fix: #855
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit tests: added RssUnorderedKVOutputTest.
    
    Co-authored-by: bin3.zhang <[email protected]>
---
 .../output/RssOrderedPartitionedKVOutput.java      |   5 +-
 ...onedKVOutput.java => RssUnorderedKVOutput.java} |  31 +++---
 .../output/RssUnorderedPartitionedKVOutput.java    |   5 +-
 .../library/output/RssUnorderedKVOutputTest.java   | 107 +++++++++++++++++++++
 4 files changed, 129 insertions(+), 19 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
index 928ec3c4..5ccb9ad8 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
@@ -230,12 +230,13 @@ public class RssOrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     int[] numRecordsPerPartition = ((RssSorter) 
sorter).getNumRecordsPerPartition();
 
-    RssTezPerPartitionRecord rssTezSpillRecord = new 
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+    RssTezPerPartitionRecord rssTezPerPartitionRecord =
+        new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
 
     LOG.info("RssTezPerPartitionRecord is initialized");
 
     ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
-        getContext(), 0, rssTezSpillRecord,
+        getContext(), 0, rssTezPerPartitionRecord,
         getNumPhysicalOutputs(), sendEmptyPartitionDetails, 
getContext().getUniqueIdentifier(),
         sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
auxiliaryService, deflater);
     LOG.info("Generate events.");
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
similarity index 95%
copy from client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
copy to client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index d0fb0788..b1cef02e 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -66,16 +66,17 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
 
+
 /**
- * {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} 
which
+ * {@link RssUnorderedKVOutput} is an {@link AbstractLogicalOutput} which
  * support remote shuffle.
  *
  */
 @Public
-public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RssUnorderedPartitionedKVOutput.class);
-  protected ExternalSorter sorter;
+public class RssUnorderedKVOutput extends AbstractLogicalOutput {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(RssUnorderedKVOutput.class);
+  protected ExternalSorter sorter;
   protected Configuration conf;
   protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private long startTime;
@@ -95,7 +96,7 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
   private String destinationVertexName;
   private int shuffleId;
 
-  public RssUnorderedPartitionedKVOutput(OutputContext outputContext, int 
numPhysicalOutputs) {
+  public RssUnorderedKVOutput(OutputContext outputContext, int 
numPhysicalOutputs) {
     super(outputContext, numPhysicalOutputs);
     this.outputContext = outputContext;
     this.deflater = TezCommonUtils.newBestCompressionDeflater();
@@ -109,7 +110,7 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     LOG.info("taskAttemptId is {}", taskAttemptId.toString());
     LOG.info("taskVertexName is {}", taskVertexName);
     LOG.info("destinationVertexName is {}", destinationVertexName);
-    LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
+    LOG.info("Initialized RssUnorderedKVOutput.");
   }
 
   private void getRssConf() {
@@ -117,13 +118,14 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
       JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
       this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null 
host");
       this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
       LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
+
     } catch (Exception e) {
       LOG.warn("debugRssConf error: ", e);
     }
   }
 
+
   @Override
   public List<Event> initialize() throws Exception {
     this.startTime = System.nanoTime();
@@ -141,10 +143,10 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
       
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
 
-
     final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, 
port);
 
     UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(this.applicationId.toString());
+
     TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
         .doAs(new 
PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
           @Override
@@ -160,8 +162,8 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     this.shuffleId = RssTezUtils.computeShuffleId(tezDAGID.getId(), 
this.taskVertexName, this.destinationVertexName);
     GetShuffleServerRequest request = new 
GetShuffleServerRequest(this.taskAttemptId, this.mapNum,
         this.numOutputs, this.shuffleId);
-    GetShuffleServerResponse response = 
umbilical.getShuffleAssignments(request);
 
+    GetShuffleServerResponse response = 
umbilical.getShuffleAssignments(request);
     this.partitionToServers = response.getShuffleAssignmentsInfoWritable()
         .getShuffleAssignmentsInfo().getPartitionToServers();
 
@@ -190,7 +192,7 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
           getContext().getDestinationVertexName(), 
this.getClass().getSimpleName());
       returnEvents = generateEmptyEvents();
     }
-    LOG.info("RssUnorderedPartitionedKVOutput close.");
+    LOG.info("RssUnOrderedKVOutput close.");
     return returnEvents;
   }
 
@@ -207,7 +209,7 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
   }
 
   @Override
-  public Writer getWriter() throws IOException {
+  public Writer getWriter() throws Exception {
     Preconditions.checkState(isStarted.get(), "Cannot get writer before 
starting the Output");
     return new KeyValuesWriter() {
       @Override
@@ -231,12 +233,13 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     int[] numRecordsPerPartition = ((RssUnSorter) 
sorter).getNumRecordsPerPartition();
 
-    RssTezPerPartitionRecord rssTezSpillRecord = new 
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+    RssTezPerPartitionRecord rssTezPerPartitionRecord =
+        new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
 
     LOG.info("RssTezPerPartitionRecord is initialized");
 
     ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
-        getContext(), 0, rssTezSpillRecord,
+        getContext(), 0, rssTezPerPartitionRecord,
         getNumPhysicalOutputs(), sendEmptyPartitionDetails, 
getContext().getUniqueIdentifier(),
         sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
auxiliaryService, deflater);
     LOG.info("Generate events.");
@@ -288,6 +291,4 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
   public static Set<String> getConfigurationKeySet() {
     return Collections.unmodifiableSet(confKeys);
   }
-
 }
-
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
index d0fb0788..0eb26e0f 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -231,12 +231,13 @@ public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     int[] numRecordsPerPartition = ((RssUnSorter) 
sorter).getNumRecordsPerPartition();
 
-    RssTezPerPartitionRecord rssTezSpillRecord = new 
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+    RssTezPerPartitionRecord rssTezPerPartitionRecord =
+        new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
 
     LOG.info("RssTezPerPartitionRecord is initialized");
 
     ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
-        getContext(), 0, rssTezSpillRecord,
+        getContext(), 0, rssTezPerPartitionRecord,
         getNumPhysicalOutputs(), sendEmptyPartitionDetails, 
getContext().getUniqueIdentifier(),
         sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), 
auxiliaryService, deflater);
     LOG.info("Generate events.");
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
new file mode 100644
index 00000000..9f4f2044
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssUnorderedKVOutputTest {
+  private static Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
new HashMap<>();
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+
+  @BeforeEach
+  public void setup() throws IOException {
+    conf = new Configuration();
+    localFs = FileSystem.getLocal(conf);
+    workingDir = new Path(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir", "/tmp")),
+        RssOrderedPartitionedKVOutputTest.class.getName()).makeQualified(
+        localFs.getUri(), localFs.getWorkingDirectory());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, 
Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, 
Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
workingDir.toString());
+  }
+
+  @AfterEach
+  public void cleanup() throws IOException {
+    localFs.delete(workingDir, true);
+  }
+
+  @Test
+  @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
+  public void testNonStartedOutput() throws Exception {
+    OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, 
workingDir);
+    int numPartitions = 10;
+    RssUnorderedKVOutput output = new RssUnorderedKVOutput(outputContext, 
numPartitions);
+    List<Event> events = output.close();
+    assertEquals(2, events.size());
+    Event event1 = events.get(0);
+    assertTrue(event1 instanceof VertexManagerEvent);
+    Event event2 = events.get(1);
+    assertTrue(event2 instanceof CompositeDataMovementEvent);
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+    ByteBuffer bb = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+        
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+    assertTrue(shufflePayload.hasEmptyPartitions());
+    byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+        .getEmptyPartitions());
+    BitSet emptyPartionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
+    assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+    for (int i = 0; i < numPartitions; i++) {
+      assertTrue(emptyPartionsBitSet.get(i));
+    }
+  }
+
+}

Reply via email to