This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5bbf3605 [#855] feat(tez): Support Tez Output RssUnorderedKVOutput
(#944)
5bbf3605 is described below
commit 5bbf3605e7dbd0b3bbac322025fa26f6d70be012
Author: bin41215 <[email protected]>
AuthorDate: Mon Jun 12 19:47:47 2023 +0800
[#855] feat(tez): Support Tez Output RssUnorderedKVOutput (#944)
### What changes were proposed in this pull request?
Support writing Tez unordered key-value output through the new RssUnorderedKVOutput.
### Why are the changes needed?
Fix: #855
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (RssUnorderedKVOutputTest).
Co-authored-by: bin3.zhang <[email protected]>
---
.../output/RssOrderedPartitionedKVOutput.java | 5 +-
...onedKVOutput.java => RssUnorderedKVOutput.java} | 31 +++---
.../output/RssUnorderedPartitionedKVOutput.java | 5 +-
.../library/output/RssUnorderedKVOutputTest.java | 107 +++++++++++++++++++++
4 files changed, 129 insertions(+), 19 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
index 928ec3c4..5ccb9ad8 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java
@@ -230,12 +230,13 @@ public class RssOrderedPartitionedKVOutput extends
AbstractLogicalOutput {
int[] numRecordsPerPartition = ((RssSorter)
sorter).getNumRecordsPerPartition();
- RssTezPerPartitionRecord rssTezSpillRecord = new
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+ RssTezPerPartitionRecord rssTezPerPartitionRecord =
+ new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
LOG.info("RssTezPerPartitionRecord is initialized");
ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
- getContext(), 0, rssTezSpillRecord,
+ getContext(), 0, rssTezPerPartitionRecord,
getNumPhysicalOutputs(), sendEmptyPartitionDetails,
getContext().getUniqueIdentifier(),
sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(),
auxiliaryService, deflater);
LOG.info("Generate events.");
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
similarity index 95%
copy from
client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
copy to
client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
index d0fb0788..b1cef02e 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java
@@ -66,16 +66,17 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+
/**
- * {@link RssUnorderedPartitionedKVOutput} is an {@link AbstractLogicalOutput}
which
+ * {@link RssUnorderedKVOutput} is an {@link AbstractLogicalOutput} which
* support remote shuffle.
*
*/
@Public
-public class RssUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
- private static final Logger LOG =
LoggerFactory.getLogger(RssUnorderedPartitionedKVOutput.class);
- protected ExternalSorter sorter;
+public class RssUnorderedKVOutput extends AbstractLogicalOutput {
+ private static final Logger LOG =
LoggerFactory.getLogger(RssUnorderedKVOutput.class);
+ protected ExternalSorter sorter;
protected Configuration conf;
protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
private long startTime;
@@ -95,7 +96,7 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
private String destinationVertexName;
private int shuffleId;
- public RssUnorderedPartitionedKVOutput(OutputContext outputContext, int
numPhysicalOutputs) {
+ public RssUnorderedKVOutput(OutputContext outputContext, int
numPhysicalOutputs) {
super(outputContext, numPhysicalOutputs);
this.outputContext = outputContext;
this.deflater = TezCommonUtils.newBestCompressionDeflater();
@@ -109,7 +110,7 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
LOG.info("taskAttemptId is {}", taskAttemptId.toString());
LOG.info("taskVertexName is {}", taskVertexName);
LOG.info("destinationVertexName is {}", destinationVertexName);
- LOG.info("Initialized RssUnOrderedPartitionedKVOutput.");
+ LOG.info("Initialized RssUnorderedKVOutput.");
}
private void getRssConf() {
@@ -117,13 +118,14 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
this.host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null
host");
this.port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
-
LOG.info("Got RssConf am info : host is {}, port is {}", host, port);
+
} catch (Exception e) {
LOG.warn("debugRssConf error: ", e);
}
}
+
@Override
public List<Event> initialize() throws Exception {
this.startTime = System.nanoTime();
@@ -141,10 +143,10 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host,
port);
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(this.applicationId.toString());
+
TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
.doAs(new
PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
@Override
@@ -160,8 +162,8 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
this.shuffleId = RssTezUtils.computeShuffleId(tezDAGID.getId(),
this.taskVertexName, this.destinationVertexName);
GetShuffleServerRequest request = new
GetShuffleServerRequest(this.taskAttemptId, this.mapNum,
this.numOutputs, this.shuffleId);
- GetShuffleServerResponse response =
umbilical.getShuffleAssignments(request);
+ GetShuffleServerResponse response =
umbilical.getShuffleAssignments(request);
this.partitionToServers = response.getShuffleAssignmentsInfoWritable()
.getShuffleAssignmentsInfo().getPartitionToServers();
@@ -190,7 +192,7 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
getContext().getDestinationVertexName(),
this.getClass().getSimpleName());
returnEvents = generateEmptyEvents();
}
- LOG.info("RssUnorderedPartitionedKVOutput close.");
+ LOG.info("RssUnOrderedKVOutput close.");
return returnEvents;
}
@@ -207,7 +209,7 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
}
@Override
- public Writer getWriter() throws IOException {
+ public Writer getWriter() throws Exception {
Preconditions.checkState(isStarted.get(), "Cannot get writer before
starting the Output");
return new KeyValuesWriter() {
@Override
@@ -231,12 +233,13 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
int[] numRecordsPerPartition = ((RssUnSorter)
sorter).getNumRecordsPerPartition();
- RssTezPerPartitionRecord rssTezSpillRecord = new
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+ RssTezPerPartitionRecord rssTezPerPartitionRecord =
+ new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
LOG.info("RssTezPerPartitionRecord is initialized");
ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
- getContext(), 0, rssTezSpillRecord,
+ getContext(), 0, rssTezPerPartitionRecord,
getNumPhysicalOutputs(), sendEmptyPartitionDetails,
getContext().getUniqueIdentifier(),
sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(),
auxiliaryService, deflater);
LOG.info("Generate events.");
@@ -288,6 +291,4 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
public static Set<String> getConfigurationKeySet() {
return Collections.unmodifiableSet(confKeys);
}
-
}
-
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
index d0fb0788..0eb26e0f 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java
@@ -231,12 +231,13 @@ public class RssUnorderedPartitionedKVOutput extends
AbstractLogicalOutput {
int[] numRecordsPerPartition = ((RssUnSorter)
sorter).getNumRecordsPerPartition();
- RssTezPerPartitionRecord rssTezSpillRecord = new
RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+ RssTezPerPartitionRecord rssTezPerPartitionRecord =
+ new RssTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
LOG.info("RssTezPerPartitionRecord is initialized");
ShuffleUtils.generateEventOnSpill(eventList, true, isLastEvent,
- getContext(), 0, rssTezSpillRecord,
+ getContext(), 0, rssTezPerPartitionRecord,
getNumPhysicalOutputs(), sendEmptyPartitionDetails,
getContext().getUniqueIdentifier(),
sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(),
auxiliaryService, deflater);
LOG.info("Generate events.");
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
new file mode 100644
index 00000000..9f4f2044
--- /dev/null
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutputTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssUnorderedKVOutputTest {
+ private static Map<Integer, List<ShuffleServerInfo>> partitionToServers =
new HashMap<>();
+ private Configuration conf;
+ private FileSystem localFs;
+ private Path workingDir;
+
+
+ @BeforeEach
+ public void setup() throws IOException {
+ conf = new Configuration();
+ localFs = FileSystem.getLocal(conf);
+ workingDir = new Path(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir", "/tmp")),
+ RssOrderedPartitionedKVOutputTest.class.getName()).makeQualified(
+ localFs.getUri(), localFs.getWorkingDirectory());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
Text.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+ HashPartitioner.class.getName());
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
workingDir.toString());
+ }
+
+ @AfterEach
+ public void cleanup() throws IOException {
+ localFs.delete(workingDir, true);
+ }
+
+ @Test
+ @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
+ public void testNonStartedOutput() throws Exception {
+ OutputContext outputContext = OutputTestHelpers.createOutputContext(conf,
workingDir);
+ int numPartitions = 10;
+ RssUnorderedKVOutput output = new RssUnorderedKVOutput(outputContext,
numPartitions);
+ List<Event> events = output.close();
+ assertEquals(2, events.size());
+ Event event1 = events.get(0);
+ assertTrue(event1 instanceof VertexManagerEvent);
+ Event event2 = events.get(1);
+ assertTrue(event2 instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event2;
+ ByteBuffer bb = cdme.getUserPayload();
+ ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+ assertTrue(shufflePayload.hasEmptyPartitions());
+ byte[] emptyPartitions =
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+ .getEmptyPartitions());
+ BitSet emptyPartionsBitSet =
TezUtilsInternal.fromByteArray(emptyPartitions);
+ assertEquals(numPartitions, emptyPartionsBitSet.cardinality());
+ for (int i = 0; i < numPartitions; i++) {
+ assertTrue(emptyPartionsBitSet.get(i));
+ }
+ }
+
+}