This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c8def22c2 [CELEBORN-1729] Support ordered KV output for Tez
c8def22c2 is described below

commit c8def22c2a5882978370a71326e7ac3a4ba790ea
Author: hongguangwei <[email protected]>
AuthorDate: Thu Dec 5 20:17:21 2024 +0800

    [CELEBORN-1729] Support ordered KV output for Tez
    
    ### What changes were proposed in this pull request?
    Add CelebornOrderedPartitionedKVOutput (and its CelebornSorter) to replace Tez's OrderedPartitionedKVOutput.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2971 from GH-Gloway/1730.
    
    Authored-by: hongguangwei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../output/CelebornOrderedPartitionedKVOutput.java | 172 +++++++++++++++++++++
 .../tez/runtime/library/sort/CelebornSorter.java   | 126 +++++++++++++++
 2 files changed, 298 insertions(+)

diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java
new file mode 100644
index 000000000..0c379f410
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornOrderedPartitionedKVOutput.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.sort.CelebornSorter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link CelebornOrderedPartitionedKVOutput} is an {@link AbstractLogicalOutput} which sorts
+ * key/value pairs written to it. It also partitions the output based on a {@link Partitioner}.
+ *
+ * <p>It extends Tez's {@link OrderedPartitionedKVOutput} and, on the first {@link #start()},
+ * replaces the parent's sorter with a {@link CelebornSorter} that is handed a {@link
+ * CelebornTezWriter}.
+ */
+@Public
+public class CelebornOrderedPartitionedKVOutput extends OrderedPartitionedKVOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornOrderedPartitionedKVOutput.class);
+
+  // Timestamp (System.nanoTime()) recorded in close() after the sorter has been closed.
+  private long endTime;
+  // Vertex parallelism reported by the output context.
+  private int mapNum;
+  // Number of physical outputs of this logical output.
+  private int numOutputs;
+  // Task id and attempt id parsed from the output context's unique identifier.
+  private int mapId;
+  private int attemptId;
+  // Values read from the configuration in initialize().
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+  // Guards start() so that the Celeborn sorter is created only once.
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  // Taken from the parent via getParentPrivateField and passed on when generating events.
+  private final Deflater deflater;
+  CelebornTezWriter celebornTezWriter;
+  private boolean sendEmptyPartitionDetails;
+  CelebornConf celebornConf;
+
+  /**
+   * Creates the output and derives the map id and attempt id from the context's unique identifier.
+   *
+   * @param outputContext the Tez output context
+   * @param numPhysicalOutputs the number of physical outputs
+   */
+  public CelebornOrderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    // These two values live in private fields of the parent class, so they are fetched through
+    // the getParentPrivateField helper.
+    this.deflater = (Deflater) getParentPrivateField(this, "deflater");
+    this.sendEmptyPartitionDetails =
+        (boolean) getParentPrivateField(this, "sendEmptyPartitionDetails");
+    this.numOutputs = getNumPhysicalOutputs();
+    this.mapNum = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+  }
+
+  /**
+   * Initializes the parent, reads the Celeborn settings from the configuration and creates the
+   * {@link CelebornTezWriter}.
+   *
+   * @return an empty list; the events returned by {@code super.initialize()} are not forwarded
+   * @throws IOException if the parent initialization fails
+   */
+  @Override
+  public synchronized List<Event> initialize() throws IOException {
+    super.initialize();
+
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+    celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    // NOTE(review): mapId is passed for two consecutive arguments - confirm against the
+    // CelebornTezWriter constructor that this is intended.
+    celebornTezWriter =
+        new CelebornTezWriter(
+            shuffleId,
+            mapId,
+            mapId,
+            attemptId,
+            mapNum,
+            numOutputs,
+            celebornConf,
+            appId,
+            host,
+            port,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName()));
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Starts the parent and then installs a {@link CelebornSorter} as the active sorter.
+   *
+   * <p>Only the first successful invocation has an effect. Later calls return immediately so that
+   * an already populated sorter is never replaced by a fresh, empty one.
+   *
+   * @throws Exception if the parent fails to start or the sorter cannot be created
+   */
+  @Override
+  public synchronized void start() throws Exception {
+    if (isStarted.get()) {
+      return;
+    }
+    super.start();
+    // NOTE(review): getMemoryAssigned() is narrowed to int here; a value above
+    // Integer.MAX_VALUE would be truncated - confirm the assigned memory is capped upstream.
+    sorter =
+        new CelebornSorter(
+            getContext(),
+            conf,
+            getNumPhysicalOutputs(),
+            (int) memoryUpdateCallbackHandler.getMemoryAssigned(),
+            celebornTezWriter,
+            celebornConf);
+    setParentPrivateField(this, "sorter", sorter);
+    // Mark as started only after everything above succeeded, so a failed start can be retried.
+    isStarted.set(true);
+  }
+
+  /**
+   * Flushes and closes the sorter and returns the events describing the produced output.
+   *
+   * <p>When no sorter is present the call is delegated to {@code super.close()}.
+   *
+   * @return the events produced while closing
+   * @throws IOException if flushing, closing or event generation fails
+   */
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    List<Event> returnEvents = Lists.newLinkedList();
+    if (sorter != null) {
+      sorter.flush();
+      returnEvents.addAll(sorter.close());
+      this.endTime = System.nanoTime();
+      // generateEvents() still reads from the sorter, so it must run before the reference is
+      // cleared.
+      returnEvents.addAll(generateEvents());
+      sorter = null;
+    } else {
+      returnEvents = super.close();
+    }
+
+    return returnEvents;
+  }
+
+  /**
+   * Builds the events for this output from the per-partition record counts collected by the
+   * {@link CelebornSorter}.
+   *
+   * @return the list filled by {@link ShuffleUtils#generateEventOnSpill}
+   * @throws IOException if event generation fails
+   */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = ((CelebornSorter) sorter).getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    LOG.info("CelebornTezPerPartitionRecord is initialized");
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        sorter.getPartitionStats(),
+        sorter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java
new file mode 100644
index 000000000..99e18be5d
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornSorter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.sort;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+
+/**
+ * {@link CelebornSorter} is an {@link ExternalSorter} that hands every collected record to a
+ * {@link CelebornSortBasedPusher} and keeps a per-partition record count.
+ */
+public class CelebornSorter extends ExternalSorter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CelebornSorter.class);
+  private CelebornSortBasedPusher celebornSortBasedPusher;
+
+  // Number of records collected so far, indexed by partition.
+  private int[] numRecordsPerPartition;
+
+  /**
+   * Creates the sorter and the {@link CelebornSortBasedPusher} it delegates to.
+   *
+   * @param outputContext the Tez output context
+   * @param conf the configuration used to look up the spill percentage and the key comparator
+   * @param numOutputs the number of partitions; also the size of the record-count array
+   * @param initialMemoryAvailable the memory value passed to the parent and to the pusher
+   * @param celebornTezWriter the writer handed to the pusher
+   * @param celebornConf the Celeborn configuration handed to the pusher
+   * @throws IOException if the parent constructor or the pusher construction fails
+   */
+  public CelebornSorter(
+      OutputContext outputContext,
+      Configuration conf,
+      int numOutputs,
+      int initialMemoryAvailable,
+      CelebornTezWriter celebornTezWriter,
+      CelebornConf celebornConf)
+      throws IOException {
+    super(outputContext, conf, numOutputs, initialMemoryAvailable);
+
+    this.numRecordsPerPartition = new int[numOutputs];
+
+    final float spillper =
+        this.conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
+    // NOTE(review): pushSize is derived from availableMemoryMb (presumably megabytes) while
+    // initialMemoryAvailable is passed to the pusher unchanged - confirm both arguments use the
+    // unit CelebornSortBasedPusher expects.
+    int pushSize = (int) (availableMemoryMb * spillper);
+    LOG.info("availableMemoryMb is {}", availableMemoryMb);
+    RawComparator intermediateOutputKeyComparator =
+        ConfigUtils.getIntermediateOutputKeyComparator(conf);
+    celebornSortBasedPusher =
+        new CelebornSortBasedPusher<>(
+            keySerializer,
+            valSerializer,
+            initialMemoryAvailable,
+            pushSize,
+            intermediateOutputKeyComparator,
+            mapOutputByteCounter,
+            mapOutputRecordCounter,
+            celebornTezWriter,
+            celebornConf,
+            true);
+  }
+
+  /**
+   * Closes the underlying {@link CelebornSortBasedPusher}.
+   *
+   * @throws IOException if closing the pusher fails
+   */
+  @Override
+  public void flush() throws IOException {
+    celebornSortBasedPusher.close();
+  }
+
+  /**
+   * Delegates to {@link ExternalSorter#close()}.
+   *
+   * @return the events returned by the parent
+   * @throws IOException if the parent fails to close
+   */
+  @Override
+  public final List<Event> close() throws IOException {
+    return super.close();
+  }
+
+  /**
+   * Determines the partition for the pair with the configured partitioner and collects it.
+   *
+   * @param key the record key
+   * @param value the record value
+   * @throws IOException if the record is rejected or cannot be inserted; an interruption is
+   *     reported as a {@link CelebornIOException} with the thread's interrupt status restored
+   */
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    try {
+      collect(key, value, partitioner.getPartition(key, value, partitions));
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers further up the stack can still observe
+      // the interruption after it has been translated into an IOException.
+      Thread.currentThread().interrupt();
+      throw new CelebornIOException(e);
+    }
+  }
+
+  /**
+   * Validates the record and hands it to the pusher, then bumps the partition's record count.
+   *
+   * @param key the record key; its class must equal the configured key class
+   * @param value the record value; its class must equal the configured value class
+   * @param partition the target partition, expected to be in {@code [0, partitions)}
+   * @throws IOException on a key/value type mismatch or an out-of-range partition
+   * @throws InterruptedException if thrown by the pusher while inserting
+   */
+  synchronized void collect(Object key, Object value, final int partition)
+      throws IOException, InterruptedException {
+    if (key.getClass() != serializationContext.getKeyClass()) {
+      throw new IOException(
+          "Type mismatch in key from map: expected "
+              + serializationContext.getKeyClass().getName()
+              + ", received "
+              + key.getClass().getName());
+    }
+    if (value.getClass() != serializationContext.getValueClass()) {
+      throw new IOException(
+          "Type mismatch in value from map: expected "
+              + serializationContext.getValueClass().getName()
+              + ", received "
+              + value.getClass().getName());
+    }
+    if (partition < 0 || partition >= partitions) {
+      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
+    }
+
+    celebornSortBasedPusher.insert(key, value, partition);
+    // Count only after a successful insert.
+    numRecordsPerPartition[partition]++;
+  }
+
+  /**
+   * Returns the per-partition record counts.
+   *
+   * @return the internal array itself (not a copy), indexed by partition
+   */
+  public int[] getNumRecordsPerPartition() {
+    return numRecordsPerPartition;
+  }
+}

Reply via email to