This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c8def22c2 [CELEBORN-1729] Support ordered KV output for Tez
c8def22c2 is described below
commit c8def22c2a5882978370a71326e7ac3a4ba790ea
Author: hongguangwei <[email protected]>
AuthorDate: Thu Dec 5 20:17:21 2024 +0800
[CELEBORN-1729] Support ordered KV output for Tez
### What changes were proposed in this pull request?
Add CelebornOrderedPartitionedKVOutput to replace Tez's
OrderedPartitionedKVOutput
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2971 from GH-Gloway/1730.
Authored-by: hongguangwei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../output/CelebornOrderedPartitionedKVOutput.java | 172 +++++++++++++++++++++
.../tez/runtime/library/sort/CelebornSorter.java | 126 +++++++++++++++
2 files changed, 298 insertions(+)
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java
new file mode 100644
index 000000000..0c379f410
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.sort.CelebornSorter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link CelebornOrderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} which sorts
+ * key/value pairs written to it and partitions the output based on a {@link Partitioner}.
+ *
+ * <p>It reuses the life cycle of Tez's {@link OrderedPartitionedKVOutput} but replaces the sorter
+ * installed in {@link #start()} with a {@link CelebornSorter}, which hands records to a {@link
+ * CelebornTezWriter}. Some parent state ({@code deflater}, {@code sendEmptyPartitionDetails},
+ * {@code sorter}) is read or written reflectively through {@code getParentPrivateField} / {@code
+ * setParentPrivateField}.
+ */
+@Public
+public class CelebornOrderedPartitionedKVOutput extends OrderedPartitionedKVOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornOrderedPartitionedKVOutput.class);
+
+  private long endTime;
+  private final int mapNum;
+  private final int numOutputs;
+  private final int mapId;
+  private final int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+  /** Set once {@link #start()} has installed the Celeborn sorter; makes start() idempotent. */
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final Deflater deflater;
+  CelebornTezWriter celebornTezWriter;
+  private boolean sendEmptyPartitionDetails;
+  CelebornConf celebornConf;
+
+  /**
+   * Creates the output and derives the Celeborn map id and attempt id from the task attempt that
+   * is encoded in the context's unique identifier.
+   *
+   * @param outputContext Tez output context
+   * @param numPhysicalOutputs number of physical outputs; also used as the partition count
+   */
+  public CelebornOrderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    // NOTE(review): assumes the parent assigns its deflater in its constructor - confirm against
+    // the Tez version in use.
+    this.deflater = (Deflater) getParentPrivateField(this, "deflater");
+    this.numOutputs = getNumPhysicalOutputs();
+    this.mapNum = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+  }
+
+  /**
+   * Initializes the parent output. Then reads the Celeborn settings (LM host/port, shuffle id,
+   * application id) from the configuration and creates the {@link CelebornTezWriter} used later by
+   * the sorter.
+   *
+   * @return an empty event list
+   */
+  @Override
+  public synchronized List<Event> initialize() throws IOException {
+    super.initialize();
+    // Read only after super.initialize(): the parent populates its configuration-derived state
+    // there (its "conf" is not available before), so a value captured in the constructor would
+    // only ever hold the field's default.
+    this.sendEmptyPartitionDetails =
+        (boolean) getParentPrivateField(this, "sendEmptyPartitionDetails");
+
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+    celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    celebornTezWriter =
+        new CelebornTezWriter(
+            shuffleId,
+            mapId,
+            mapId,
+            attemptId,
+            mapNum,
+            numOutputs,
+            celebornConf,
+            appId,
+            host,
+            port,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName()));
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Starts the parent output and replaces its sorter with a {@link CelebornSorter}. Repeated calls
+   * are no-ops, so an installed sorter (and the records it has already collected) is never
+   * discarded.
+   */
+  @Override
+  public synchronized void start() throws Exception {
+    if (isStarted.get()) {
+      return;
+    }
+    // NOTE(review): super.start() may construct Tez's own sorter before it is replaced below;
+    // confirm that instance does not keep memory or threads alive.
+    super.start();
+    // Clamp instead of a plain narrowing cast so a large assignment cannot wrap to a negative int.
+    int memoryAssigned =
+        (int) Math.min(memoryUpdateCallbackHandler.getMemoryAssigned(), Integer.MAX_VALUE);
+    sorter =
+        new CelebornSorter(
+            getContext(),
+            conf,
+            getNumPhysicalOutputs(),
+            memoryAssigned,
+            celebornTezWriter,
+            celebornConf);
+    setParentPrivateField(this, "sorter", sorter);
+    isStarted.set(true);
+  }
+
+  /**
+   * Flushes and closes the Celeborn sorter. Returns the sorter's events followed by the events
+   * built by {@link #generateEvents()}. If no sorter was installed, the parent's close logic is
+   * used instead.
+   */
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    if (sorter == null) {
+      return super.close();
+    }
+    List<Event> returnEvents = Lists.newLinkedList();
+    sorter.flush();
+    returnEvents.addAll(sorter.close());
+    this.endTime = System.nanoTime();
+    returnEvents.addAll(generateEvents());
+    sorter = null;
+    return returnEvents;
+  }
+
+  /**
+   * Builds the output's events from the per-partition record counts collected by the {@link
+   * CelebornSorter}, wrapped in a {@link CelebornTezPerPartitionRecord}.
+   */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    // Only one batch of events is generated for this output, so it is flagged as the last one.
+    boolean isLastEvent = true;
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = ((CelebornSorter) sorter).getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    LOG.info("CelebornTezPerPartitionRecord is initialized");
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        sorter.getPartitionStats(),
+        sorter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java
new file mode 100644
index 000000000..99e18be5d
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.sort;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+
+/**
+ * {@link CelebornSorter} is an {@link ExternalSorter} that forwards every record to a {@link
+ * CelebornSortBasedPusher} (configured with the intermediate output key comparator). It also
+ * counts how many records were written to each partition.
+ */
+public class CelebornSorter extends ExternalSorter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CelebornSorter.class);
+
+  private final CelebornSortBasedPusher celebornSortBasedPusher;
+
+  /** Number of records collected per partition, indexed by partition id. */
+  private final int[] numRecordsPerPartition;
+
+  /**
+   * Creates the sorter and its Celeborn pusher.
+   *
+   * @param outputContext Tez output context
+   * @param conf configuration used to look up the intermediate output key comparator
+   * @param numOutputs number of partitions
+   * @param initialMemoryAvailable memory assigned to this sorter, in bytes
+   * @param celebornTezWriter writer the pusher sends data through
+   * @param celebornConf Celeborn client configuration
+   */
+  public CelebornSorter(
+      OutputContext outputContext,
+      Configuration conf,
+      int numOutputs,
+      int initialMemoryAvailable,
+      CelebornTezWriter celebornTezWriter,
+      CelebornConf celebornConf)
+      throws IOException {
+    super(outputContext, conf, numOutputs, initialMemoryAvailable);
+
+    this.numRecordsPerPartition = new int[numOutputs];
+
+    final float spillper =
+        this.conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
+    // The push threshold must use the same unit (bytes) as the buffer size passed next to it.
+    // availableMemoryMb is expressed in megabytes, so deriving the threshold from it made it
+    // roughly 2^20 times too small.
+    // NOTE(review): assumes both pusher size arguments are byte counts - confirm in
+    // CelebornSortBasedPusher.
+    int pushSize = (int) (initialMemoryAvailable * (double) spillper);
+    LOG.info("availableMemoryMb is {}", availableMemoryMb);
+    RawComparator intermediateOutputKeyComparator =
+        ConfigUtils.getIntermediateOutputKeyComparator(conf);
+    celebornSortBasedPusher =
+        new CelebornSortBasedPusher<>(
+            keySerializer,
+            valSerializer,
+            initialMemoryAvailable,
+            pushSize,
+            intermediateOutputKeyComparator,
+            mapOutputByteCounter,
+            mapOutputRecordCounter,
+            celebornTezWriter,
+            celebornConf,
+            true);
+  }
+
+  /** Flushes buffered records by closing the Celeborn pusher. */
+  @Override
+  public void flush() throws IOException {
+    celebornSortBasedPusher.close();
+  }
+
+  /** Delegates to {@link ExternalSorter#close()}; declared {@code final} here. */
+  @Override
+  public final List<Event> close() throws IOException {
+    return super.close();
+  }
+
+  /**
+   * Routes a key/value pair to its partition (via the configured partitioner) and collects it.
+   *
+   * @throws IOException on a type/partition mismatch, or if the thread is interrupted (wrapped in
+   *     a {@link CelebornIOException})
+   */
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    try {
+      collect(key, value, partitioner.getPartition(key, value, partitions));
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new CelebornIOException(e);
+    }
+  }
+
+  /**
+   * Validates the key/value classes against the serialization context and the partition index
+   * against the partition count. Then inserts the record into the pusher and bumps the
+   * partition's record count.
+   */
+  synchronized void collect(Object key, Object value, final int partition)
+      throws IOException, InterruptedException {
+    if (key.getClass() != serializationContext.getKeyClass()) {
+      throw new IOException(
+          "Type mismatch in key from map: expected "
+              + serializationContext.getKeyClass().getName()
+              + ", received "
+              + key.getClass().getName());
+    }
+    if (value.getClass() != serializationContext.getValueClass()) {
+      throw new IOException(
+          "Type mismatch in value from map: expected "
+              + serializationContext.getValueClass().getName()
+              + ", received "
+              + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
+    }
+
+    celebornSortBasedPusher.insert(key, value, partition);
+    numRecordsPerPartition[partition]++;
+  }
+
+  /**
+   * Returns the live per-partition record counts. The array is not copied, so callers should
+   * treat it as read-only.
+   */
+  public int[] getNumRecordsPerPartition() {
+    return numRecordsPerPartition;
+  }
+}