waitinfuture commented on code in PR #1830:
URL: 
https://github.com/apache/incubator-celeborn/pull/1830#discussion_r1307125426


##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_PORT, lcPort + "");
+      lcConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      writeLifecycleManagerConfToTask(jobConf, lcConf);
+    }
+  }
+
+  void writeLifecycleManagerConfToTask(JobConf conf, JobConf lcConf) throws 
CelebornIOException {
+    try {
+      FileSystem fs = new Cluster(conf).getFileSystem();

Review Comment:
   Not sure about the cost to new a `Cluster` object, are there lighter ways to 
get the fs? If this is the normal way to get the fs, please correct me.



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;
+  private int writePos;
+  private byte[] serializedKV;
+  private final int maxPushDataSize;
+
+  public CelebornSortBasedPusher(
+      int numMappers,
+      int numReducers,
+      int mapId,
+      int attemptId,
+      Serializer<K> kSer,
+      Serializer<V> vSer,
+      int maxIOBufferSize,
+      int spillIOBufferSize,
+      RawComparator<K> comparator,
+      Counters.Counter mapOutputByteCounter,
+      Counters.Counter mapOutputRecordCounter,
+      ShuffleClient shuffleClient,
+      CelebornConf celebornConf) {
+    this.numMappers = numMappers;
+    this.numReducers = numReducers;
+    this.mapId = mapId;
+    this.attempt = attemptId;
+    this.kSer = kSer;
+    this.vSer = vSer;
+    this.maxIOBufferSize = maxIOBufferSize;
+    this.spillIOBufferSize = spillIOBufferSize;
+    this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
+    this.comparator = comparator;
+    this.shuffleClient = shuffleClient;
+    currentSerializedKVs = new HashMap<>();
+    serializedKV = new byte[maxIOBufferSize];
+    maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+    logger.info(
+        "Sort based push initialized with"
+            + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+            + " maxIOBufferSize:{} spillIOBufferSize:{}",
+        numMappers,
+        numReducers,
+        mapId,
+        attemptId,
+        maxIOBufferSize,
+        spillIOBufferSize);
+    try {
+      kSer.open(this);
+      vSer.open(this);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  public void insert(K key, V value, int partition) {
+    try {
+      if (writePos >= spillIOBufferSize) {
+        // needs to sort and flush data
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Data is large enough {}/{}/{}, trigger sort and flush",
+              Utils.bytesToString(writePos),
+              Utils.bytesToString(spillIOBufferSize),
+              Utils.bytesToString(maxIOBufferSize));
+        }
+        synchronized (this) {
+          sortKVs();
+          sendKVAndUpdateWritePos();
+        }
+      }
+      int dataLen = insertRecordInternal(key, value, partition);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Sort based pusher insert into partition:{} with {} bytes", 
partition, dataLen);
+      }
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(dataLen);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  private void sendKVAndUpdateWritePos() throws IOException {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      List<SerializedKV> kvs = partitionKVEntry.getValue();
+      synchronized (kvs) {
+        int partition = partitionKVEntry.getKey();
+        List<SerializedKV> localKVs = new ArrayList<>();
+        int partitionKVTotalLen = 0;
+        // process buffers for specific partition
+        for (SerializedKV kv : kvs) {
+          partitionKVTotalLen += kv.kLen + kv.vLen;
+          localKVs.add(kv);
+          if (partitionKVTotalLen > maxPushDataSize) {
+            // limit max size of pushdata to avoid possible memory issue in 
Celeborn worker
+            // data layout
+            // pushdata header (16) + pushDataLen(4) +
+            // [keyLen(4)+valLen(4)+serializedRecord(x)][...]
+            sendSortedBuffersPartition(partition, localKVs, 
partitionKVTotalLen);
+            localKVs.clear();
+            partitionKVTotalLen = 0;
+          }
+        }
+        if (!localKVs.isEmpty()) {
+          sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+        }
+        kvs.clear();
+      }
+    }
+    // all data sent
+    currentSerializedKVs.clear();
+    writePos = 0;
+  }
+
+  private void sendSortedBuffersPartition(
+      int partition, List<SerializedKV> localKVs, int partitionKVTotalLen) 
throws IOException {
+    int extraSize = 0;
+    for (SerializedKV localKV : localKVs) {
+      extraSize += WritableUtils.getVIntSize(localKV.kLen);
+      extraSize += WritableUtils.getVIntSize(localKV.vLen);
+    }
+    // copied from hadoop logic
+    extraSize += WritableUtils.getVIntSize(-1);
+    extraSize += WritableUtils.getVIntSize(-1);
+    // whole buffer's size + 
[(keyLen+valueLen)+(serializedKey+serializedValue)]
+    byte[] pkvs = new byte[4 + extraSize + partitionKVTotalLen];
+    int pkvsPos = 4;
+    Platform.putInt(pkvs, Platform.BYTE_ARRAY_OFFSET, partitionKVTotalLen + 
extraSize);
+    for (SerializedKV kv : localKVs) {
+      int recordLen = kv.kLen + kv.vLen;
+      // write key len
+      pkvsPos = writeDataInt(pkvs, pkvsPos, kv.kLen);
+      // write value len
+      pkvsPos = writeDataInt(pkvs, pkvsPos, kv.vLen);
+      // write serialized record
+      System.arraycopy(serializedKV, kv.offset, pkvs, pkvsPos, recordLen);
+      pkvsPos += recordLen;
+    }
+    // finally write -1 two times
+    pkvsPos = writeDataInt(pkvs, pkvsPos, -1);
+    writeDataInt(pkvs, pkvsPos, -1);
+    int compressedSize =
+        shuffleClient.pushData(
+            0,

Review Comment:
   Better to comment about why shuffleId is always set to 0



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;
+  private int writePos;
+  private byte[] serializedKV;
+  private final int maxPushDataSize;
+
+  public CelebornSortBasedPusher(
+      int numMappers,
+      int numReducers,
+      int mapId,
+      int attemptId,
+      Serializer<K> kSer,
+      Serializer<V> vSer,
+      int maxIOBufferSize,
+      int spillIOBufferSize,
+      RawComparator<K> comparator,
+      Counters.Counter mapOutputByteCounter,
+      Counters.Counter mapOutputRecordCounter,
+      ShuffleClient shuffleClient,
+      CelebornConf celebornConf) {
+    this.numMappers = numMappers;
+    this.numReducers = numReducers;
+    this.mapId = mapId;
+    this.attempt = attemptId;
+    this.kSer = kSer;
+    this.vSer = vSer;
+    this.maxIOBufferSize = maxIOBufferSize;
+    this.spillIOBufferSize = spillIOBufferSize;
+    this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
+    this.comparator = comparator;
+    this.shuffleClient = shuffleClient;
+    currentSerializedKVs = new HashMap<>();
+    serializedKV = new byte[maxIOBufferSize];
+    maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+    logger.info(
+        "Sort based push initialized with"
+            + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+            + " maxIOBufferSize:{} spillIOBufferSize:{}",
+        numMappers,
+        numReducers,
+        mapId,
+        attemptId,
+        maxIOBufferSize,
+        spillIOBufferSize);
+    try {
+      kSer.open(this);
+      vSer.open(this);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  public void insert(K key, V value, int partition) {
+    try {
+      if (writePos >= spillIOBufferSize) {
+        // needs to sort and flush data
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Data is large enough {}/{}/{}, trigger sort and flush",
+              Utils.bytesToString(writePos),
+              Utils.bytesToString(spillIOBufferSize),
+              Utils.bytesToString(maxIOBufferSize));
+        }
+        synchronized (this) {
+          sortKVs();
+          sendKVAndUpdateWritePos();
+        }
+      }
+      int dataLen = insertRecordInternal(key, value, partition);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Sort based pusher insert into partition:{} with {} bytes", 
partition, dataLen);
+      }
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(dataLen);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  private void sendKVAndUpdateWritePos() throws IOException {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      List<SerializedKV> kvs = partitionKVEntry.getValue();
+      synchronized (kvs) {
+        int partition = partitionKVEntry.getKey();
+        List<SerializedKV> localKVs = new ArrayList<>();
+        int partitionKVTotalLen = 0;
+        // process buffers for specific partition
+        for (SerializedKV kv : kvs) {
+          partitionKVTotalLen += kv.kLen + kv.vLen;
+          localKVs.add(kv);
+          if (partitionKVTotalLen > maxPushDataSize) {
+            // limit max size of pushdata to avoid possible memory issue in 
Celeborn worker
+            // data layout
+            // pushdata header (16) + pushDataLen(4) +
+            // [varKeyLen+varValLen+serializedRecord(x)][...]
+            sendSortedBuffersPartition(partition, localKVs, 
partitionKVTotalLen);
+            localKVs.clear();
+            partitionKVTotalLen = 0;
+          }
+        }
+        if (!localKVs.isEmpty()) {
+          sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+        }
+        kvs.clear();
+      }
+    }
+    // all data sent
+    currentSerializedKVs.clear();
+    writePos = 0;
+  }
+
+  private void sendSortedBuffersPartition(
+      int partition, List<SerializedKV> localKVs, int partitionKVTotalLen) 
throws IOException {
+    int extraSize = 0;
+    for (SerializedKV localKV : localKVs) {
+      extraSize += WritableUtils.getVIntSize(localKV.kLen);
+      extraSize += WritableUtils.getVIntSize(localKV.vLen);
+    }
+    // copied from hadoop logic
+    extraSize += WritableUtils.getVIntSize(-1);
+    extraSize += WritableUtils.getVIntSize(-1);
+    // whole buffer's size + 
[(keyLen+valueLen)+(serializedKey+serializedValue)]
+    byte[] pkvs = new byte[4 + extraSize + partitionKVTotalLen];
+    int pkvsPos = 4;
+    Platform.putInt(pkvs, Platform.BYTE_ARRAY_OFFSET, partitionKVTotalLen + 
extraSize);
+    for (SerializedKV kv : localKVs) {
+      int recordLen = kv.kLen + kv.vLen;
+      // write key len
+      pkvsPos = writeVLong(pkvs, pkvsPos, kv.kLen);
+      // write value len
+      pkvsPos = writeVLong(pkvs, pkvsPos, kv.vLen);
+      // write serialized record
+      System.arraycopy(serializedKV, kv.offset, pkvs, pkvsPos, recordLen);
+      pkvsPos += recordLen;
+    }
+    // finally write -1 two times
+    pkvsPos = writeVLong(pkvs, pkvsPos, -1);
+    writeVLong(pkvs, pkvsPos, -1);
+    int compressedSize =
+        shuffleClient.pushData(
+            0,
+            mapId,
+            attempt,
+            partition,
+            pkvs,
+            0,
+            4 + extraSize + partitionKVTotalLen,
+            numMappers,
+            numReducers);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Send sorted buffer mapId:{} attemptId:{} to partition:{} 
uncompressed size:{} compressed size:{}",
+          mapId,
+          attempt,
+          partition,
+          Utils.bytesToString(4 + extraSize + partitionKVTotalLen),
+          Utils.bytesToString(compressedSize));
+    }
+  }
+
+  /**
+   * Write variable length int to array Modified from
+   * org.apache.hadoop.io.WritableUtils#writeVLong(java.io.DataOutput, long)
+   */
+  private int writeVLong(byte[] data, int offset, long dataInt) {
+    if (dataInt >= -112L && dataInt <= 127L) {
+      data[offset++] = (byte) ((int) dataInt);
+      return offset;
+    }
+
+    int len = -112;
+    if (dataInt < 0L) {
+      dataInt ^= -1L;
+      len = -120;
+    }
+
+    long tmp = dataInt;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    data[offset++] = (byte) len;
+
+    len = len < -120 ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; --idx) {
+      int shiftBits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftBits;
+      data[offset++] = ((byte) ((int) ((dataInt & mask) >> shiftBits)));
+    }
+    return offset;
+  }
+
+  private void sortKVs() {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      partitionKVEntry
+          .getValue()
+          .sort(
+              (o1, o2) ->
+                  comparator.compare(
+                      serializedKV, o1.offset, o1.kLen, serializedKV, 
o2.offset, o2.kLen));
+    }
+  }
+
+  private int insertRecordInternal(K key, V value, int partition) throws 
IOException {

Review Comment:
   Should also consider giant records, maybe in later PRs



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -93,8 +93,22 @@ public int read(byte[] b, int off, int len) throws 
IOException {
 
         @Override
         public void setCallback(MetricsCallback callback) {}
+
+        @Override
+        public int totalPartitionsToRead() {
+          return 0;
+        }
+
+        @Override
+        public int readPartitions() {
+          return 0;
+        }
       };
 
+  public abstract int totalPartitionsToRead();

Review Comment:
   Better to move these methods after `logger`



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle.ShuffleError;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornShuffleConsumer<K, V>
+    implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleConsumer.class);
+  private JobConf mrJobConf;
+  private MergeManager<K, V> merger;
+  private Throwable throwable = null;
+  private Progress copyPhase;
+  private TaskStatus taskStatus;
+  private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+  private TaskUmbilicalProtocol umbilical;
+  private Reporter reporter;
+  private ShuffleClientMetrics metrics;
+  private Task reduceTask;
+  private ShuffleClient shuffleClient;
+
+  @Override
+  public void init(Context<K, V> context) {
+
+    reduceId = context.getReduceId();
+    mrJobConf = context.getJobConf();
+    JobConf celebornJobConf = new JobConf(HadoopUtils.MR_CELEBORN_CONF);
+
+    umbilical = context.getUmbilical();
+    reporter = context.getReporter();
+    try {
+      this.metrics = createMetrics(reduceId, mrJobConf);
+    } catch (Exception e) {
+      logger.error("Fatal error occurred, failed to get shuffle client 
metrics.", e);
+      reportException(e);
+    }
+    copyPhase = context.getCopyPhase();
+    taskStatus = context.getStatus();
+    reduceTask = context.getReduceTask();
+
+    String appId = celebornJobConf.get(HadoopUtils.MR_CELEBORN_APPLICATION_ID);
+    String lcHost = celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_HOST);
+    int lcPort = 
Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_PORT));
+    logger.info("Reducer initialized with celeborn {} {} {}", appId, lcHost, 
lcPort);
+    CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf);
+    shuffleClient =
+        ShuffleClient.get(
+            appId,
+            lcHost,
+            lcPort,
+            celebornConf,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), 
celebornConf.quotaUserSpecificUserName()));
+    this.merger = createMergeManager(context);
+  }
+
+  // Merge mapOutput and spill in local disks if necessary
+  protected MergeManager<K, V> 
createMergeManager(ShuffleConsumerPlugin.Context context) {
+    return new MergeManagerImpl<K, V>(
+        reduceId,
+        mrJobConf,
+        context.getLocalFS(),
+        context.getLocalDirAllocator(),
+        reporter,
+        context.getCodec(),
+        context.getCombinerClass(),
+        context.getCombineCollector(),
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(),
+        this,
+        context.getMergePhase(),
+        context.getMapOutputFile());
+  }
+
+  private ShuffleClientMetrics createMetrics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf)
+      throws NoSuchMethodException {
+    // for hadoop 3
+    try {
+      return DynMethods.builder("create")
+          .impl(
+              ShuffleClientMetrics.class,
+              org.apache.hadoop.mapreduce.TaskAttemptID.class,
+              JobConf.class)
+          .buildStaticChecked()
+          .invoke(taskAttemptID, jobConf);
+    } catch (Exception e) {
+      // ignore this exception because the createMetrics might use hadoop2
+    }
+    // for hadoop 2
+    return DynConstructors.builder(ShuffleClientMetrics.class)
+        .hiddenImpl(new Class[] 
{org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class})
+        .buildChecked()
+        .invoke(null, taskAttemptID, jobConf);
+  }
+
+  @Override
+  public RawKeyValueIterator run() throws IOException, InterruptedException {
+    logger.info(
+        "In reduce:{}, Celeborn mr client start to read shuffle data."
+            + " Create inputstream with params: shuffleId 0 reduceId:{} 
attemptId:{}",
+        reduceId,
+        reduceId.getTaskID().getId(),
+        reduceId.getId());
+
+    CelebornInputStream shuffleInputStream =
+        shuffleClient.readPartition(
+            0, reduceId.getTaskID().getId(), reduceId.getId(), 0, 
Integer.MAX_VALUE);
+    CelebornShuffleFetcher<K, V> shuffleReader =
+        new CelebornShuffleFetcher(
+            reduceId, taskStatus, merger, copyPhase, reporter, metrics, 
shuffleInputStream);
+    shuffleReader.fetchAndMerge();
+
+    copyPhase.complete();
+    taskStatus.setPhase(TaskStatus.Phase.SORT);
+    reduceTask.statusUpdate(umbilical);
+
+    // Finish the on-going merges...
+    RawKeyValueIterator kvIter = null;

Review Comment:
   No need to initialize to null



##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -93,8 +93,22 @@ public int read(byte[] b, int off, int len) throws 
IOException {
 
         @Override
         public void setCallback(MetricsCallback callback) {}
+
+        @Override
+        public int totalPartitionsToRead() {
+          return 0;
+        }
+
+        @Override
+        public int readPartitions() {

Review Comment:
   partitionsRead()



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();

Review Comment:
   lmHost, and other places



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_PORT, lcPort + "");
+      lcConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      writeLifecycleManagerConfToTask(jobConf, lcConf);
+    }
+  }
+
+  void writeLifecycleManagerConfToTask(JobConf conf, JobConf lcConf) throws 
CelebornIOException {
+    try {
+      FileSystem fs = new Cluster(conf).getFileSystem();
+      String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      Path celebornExtraConf = new Path(jobDirStr, 
HadoopUtils.MR_CELEBORN_CONF);
+
+      try (FSDataOutputStream out =
+          FileSystem.create(
+              fs, celebornExtraConf, new 
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
+        lcConf.writeXml(out);
+      }
+      FileStatus status = fs.getFileStatus(celebornExtraConf);
+      long currentTs = status.getModificationTime();
+      String uri = fs.getUri() + Path.SEPARATOR + celebornExtraConf.toUri();
+      String files = conf.get(MRJobConfig.CACHE_FILES);
+      conf.set(MRJobConfig.CACHE_FILES, files == null ? uri : uri + "," + 
files);
+      String ts = conf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+      conf.set(
+          MRJobConfig.CACHE_FILE_TIMESTAMPS,
+          ts == null ? String.valueOf(currentTs) : currentTs + "," + ts);
+      String vis = conf.get(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, vis == null ? "false" : 
"false" + "," + vis);
+      long size = status.getLen();
+      String sizes = conf.get(MRJobConfig.CACHE_FILES_SIZES);
+      conf.set(
+          MRJobConfig.CACHE_FILES_SIZES, sizes == null ? String.valueOf(size) 
: size + "," + sizes);
+    } catch (InterruptedException | IOException e) {
+      logger.error("Upload extra conf exception", e);
+      throw new CelebornIOException("Upload extra conf exception ", e);
+    }
+  }
+
+  private static String getSysEnvAndValidateInputParam(String envName) throws 
IOException {

Review Comment:
   better to rename to `ensureGetSysEnv` or `getSysEnvOrThrow`



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;
+  private int writePos;
+  private byte[] serializedKV;
+  private final int maxPushDataSize;
+
+  public CelebornSortBasedPusher(
+      int numMappers,
+      int numReducers,
+      int mapId,
+      int attemptId,
+      Serializer<K> kSer,
+      Serializer<V> vSer,
+      int maxIOBufferSize,
+      int spillIOBufferSize,
+      RawComparator<K> comparator,
+      Counters.Counter mapOutputByteCounter,
+      Counters.Counter mapOutputRecordCounter,
+      ShuffleClient shuffleClient,
+      CelebornConf celebornConf) {
+    this.numMappers = numMappers;
+    this.numReducers = numReducers;
+    this.mapId = mapId;
+    this.attempt = attemptId;
+    this.kSer = kSer;
+    this.vSer = vSer;
+    this.maxIOBufferSize = maxIOBufferSize;
+    this.spillIOBufferSize = spillIOBufferSize;
+    this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
+    this.comparator = comparator;
+    this.shuffleClient = shuffleClient;
+    currentSerializedKVs = new HashMap<>();
+    serializedKV = new byte[maxIOBufferSize];
+    maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+    logger.info(
+        "Sort based push initialized with"
+            + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+            + " maxIOBufferSize:{} spillIOBufferSize:{}",
+        numMappers,
+        numReducers,
+        mapId,
+        attemptId,
+        maxIOBufferSize,
+        spillIOBufferSize);
+    try {
+      kSer.open(this);
+      vSer.open(this);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  public void insert(K key, V value, int partition) {
+    try {
+      if (writePos >= spillIOBufferSize) {
+        // needs to sort and flush data
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Data is large enough {}/{}/{}, trigger sort and flush",
+              Utils.bytesToString(writePos),
+              Utils.bytesToString(spillIOBufferSize),
+              Utils.bytesToString(maxIOBufferSize));
+        }
+        synchronized (this) {
+          sortKVs();
+          sendKVAndUpdateWritePos();
+        }
+      }
+      int dataLen = insertRecordInternal(key, value, partition);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Sort based pusher insert into partition:{} with {} bytes", 
partition, dataLen);
+      }
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(dataLen);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  private void sendKVAndUpdateWritePos() throws IOException {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      List<SerializedKV> kvs = partitionKVEntry.getValue();
+      synchronized (kvs) {

Review Comment:
   `sendKVAndUpdateWritePos` is called inside `synchronized(this)`, so I think 
it unnecessary to sync here



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();

Review Comment:
   lmConf



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();

Review Comment:
   lmPort



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;
+  private int writePos;
+  private byte[] serializedKV;
+  private final int maxPushDataSize;
+
+  public CelebornSortBasedPusher(
+      int numMappers,
+      int numReducers,
+      int mapId,
+      int attemptId,
+      Serializer<K> kSer,
+      Serializer<V> vSer,
+      int maxIOBufferSize,
+      int spillIOBufferSize,
+      RawComparator<K> comparator,
+      Counters.Counter mapOutputByteCounter,
+      Counters.Counter mapOutputRecordCounter,
+      ShuffleClient shuffleClient,
+      CelebornConf celebornConf) {
+    this.numMappers = numMappers;
+    this.numReducers = numReducers;
+    this.mapId = mapId;
+    this.attempt = attemptId;
+    this.kSer = kSer;
+    this.vSer = vSer;
+    this.maxIOBufferSize = maxIOBufferSize;
+    this.spillIOBufferSize = spillIOBufferSize;
+    this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
+    this.comparator = comparator;
+    this.shuffleClient = shuffleClient;
+    currentSerializedKVs = new HashMap<>();
+    serializedKV = new byte[maxIOBufferSize];
+    maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+    logger.info(
+        "Sort based push initialized with"
+            + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+            + " maxIOBufferSize:{} spillIOBufferSize:{}",
+        numMappers,
+        numReducers,
+        mapId,
+        attemptId,
+        maxIOBufferSize,
+        spillIOBufferSize);
+    try {
+      kSer.open(this);
+      vSer.open(this);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  public void insert(K key, V value, int partition) {
+    try {
+      if (writePos >= spillIOBufferSize) {
+        // needs to sort and flush data
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Data is large enough {}/{}/{}, trigger sort and flush",
+              Utils.bytesToString(writePos),
+              Utils.bytesToString(spillIOBufferSize),
+              Utils.bytesToString(maxIOBufferSize));
+        }
+        synchronized (this) {
+          sortKVs();
+          sendKVAndUpdateWritePos();
+        }
+      }
+      int dataLen = insertRecordInternal(key, value, partition);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Sort based pusher insert into partition:{} with {} bytes", 
partition, dataLen);
+      }
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(dataLen);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  private void sendKVAndUpdateWritePos() throws IOException {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      List<SerializedKV> kvs = partitionKVEntry.getValue();
+      synchronized (kvs) {
+        int partition = partitionKVEntry.getKey();
+        List<SerializedKV> localKVs = new ArrayList<>();
+        int partitionKVTotalLen = 0;
+        // process buffers for specific partition
+        for (SerializedKV kv : kvs) {
+          partitionKVTotalLen += kv.kLen + kv.vLen;
+          localKVs.add(kv);
+          if (partitionKVTotalLen > maxPushDataSize) {
+            // limit max size of pushdata to avoid possible memory issue in 
Celeborn worker
+            // data layout
+            // pushdata header (16) + pushDataLen(4) +
+            // [varKeyLen+varValLen+serializedRecord(x)][...]
+            sendSortedBuffersPartition(partition, localKVs, 
partitionKVTotalLen);
+            localKVs.clear();
+            partitionKVTotalLen = 0;
+          }
+        }
+        if (!localKVs.isEmpty()) {
+          sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+        }
+        kvs.clear();
+      }
+    }
+    // all data sent
+    currentSerializedKVs.clear();
+    writePos = 0;
+  }
+
+  private void sendSortedBuffersPartition(
+      int partition, List<SerializedKV> localKVs, int partitionKVTotalLen) 
throws IOException {
+    int extraSize = 0;
+    for (SerializedKV localKV : localKVs) {
+      extraSize += WritableUtils.getVIntSize(localKV.kLen);
+      extraSize += WritableUtils.getVIntSize(localKV.vLen);
+    }
+    // copied from hadoop logic
+    extraSize += WritableUtils.getVIntSize(-1);
+    extraSize += WritableUtils.getVIntSize(-1);
+    // whole buffer's size + 
[(keyLen+valueLen)+(serializedKey+serializedValue)]
+    byte[] pkvs = new byte[4 + extraSize + partitionKVTotalLen];
+    int pkvsPos = 4;
+    Platform.putInt(pkvs, Platform.BYTE_ARRAY_OFFSET, partitionKVTotalLen + 
extraSize);
+    for (SerializedKV kv : localKVs) {
+      int recordLen = kv.kLen + kv.vLen;
+      // write key len
+      pkvsPos = writeVLong(pkvs, pkvsPos, kv.kLen);
+      // write value len
+      pkvsPos = writeVLong(pkvs, pkvsPos, kv.vLen);
+      // write serialized record
+      System.arraycopy(serializedKV, kv.offset, pkvs, pkvsPos, recordLen);
+      pkvsPos += recordLen;
+    }
+    // finally write -1 two times
+    pkvsPos = writeVLong(pkvs, pkvsPos, -1);
+    writeVLong(pkvs, pkvsPos, -1);
+    int compressedSize =
+        shuffleClient.pushData(
+            0,
+            mapId,
+            attempt,
+            partition,
+            pkvs,
+            0,
+            4 + extraSize + partitionKVTotalLen,
+            numMappers,
+            numReducers);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Send sorted buffer mapId:{} attemptId:{} to partition:{} 
uncompressed size:{} compressed size:{}",
+          mapId,
+          attempt,
+          partition,
+          Utils.bytesToString(4 + extraSize + partitionKVTotalLen),
+          Utils.bytesToString(compressedSize));
+    }
+  }
+
+  /**
+   * Write variable length int to array Modified from
+   * org.apache.hadoop.io.WritableUtils#writeVLong(java.io.DataOutput, long)
+   */
+  private int writeVLong(byte[] data, int offset, long dataInt) {
+    if (dataInt >= -112L && dataInt <= 127L) {
+      data[offset++] = (byte) ((int) dataInt);
+      return offset;
+    }
+
+    int len = -112;
+    if (dataInt < 0L) {
+      dataInt ^= -1L;
+      len = -120;
+    }
+
+    long tmp = dataInt;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    data[offset++] = (byte) len;
+
+    len = len < -120 ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; --idx) {
+      int shiftBits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftBits;
+      data[offset++] = ((byte) ((int) ((dataInt & mask) >> shiftBits)));
+    }
+    return offset;
+  }
+
+  private void sortKVs() {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      partitionKVEntry
+          .getValue()
+          .sort(
+              (o1, o2) ->
+                  comparator.compare(
+                      serializedKV, o1.offset, o1.kLen, serializedKV, 
o2.offset, o2.kLen));
+    }
+  }
+
+  private int insertRecordInternal(K key, V value, int partition) throws 
IOException {
+    int offset = writePos;
+    int keyLen = writePos;

Review Comment:
   L261 and L262 is unnecessary



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_PORT, lcPort + "");
+      lcConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      writeLifecycleManagerConfToTask(jobConf, lcConf);
+    }
+  }
+
+  void writeLifecycleManagerConfToTask(JobConf conf, JobConf lcConf) throws 
CelebornIOException {
+    try {
+      FileSystem fs = new Cluster(conf).getFileSystem();
+      String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      Path celebornExtraConf = new Path(jobDirStr, 
HadoopUtils.MR_CELEBORN_CONF);

Review Comment:
   celebornConfPath



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_PORT, lcPort + "");
+      lcConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      writeLifecycleManagerConfToTask(jobConf, lcConf);
+    }
+  }
+
+  void writeLifecycleManagerConfToTask(JobConf conf, JobConf lcConf) throws 
CelebornIOException {
+    try {
+      FileSystem fs = new Cluster(conf).getFileSystem();
+      String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      Path celebornExtraConf = new Path(jobDirStr, 
HadoopUtils.MR_CELEBORN_CONF);
+
+      try (FSDataOutputStream out =
+          FileSystem.create(
+              fs, celebornExtraConf, new 
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
+        lcConf.writeXml(out);
+      }
+      FileStatus status = fs.getFileStatus(celebornExtraConf);
+      long currentTs = status.getModificationTime();
+      String uri = fs.getUri() + Path.SEPARATOR + celebornExtraConf.toUri();
+      String files = conf.get(MRJobConfig.CACHE_FILES);
+      conf.set(MRJobConfig.CACHE_FILES, files == null ? uri : uri + "," + 
files);
+      String ts = conf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+      conf.set(
+          MRJobConfig.CACHE_FILE_TIMESTAMPS,
+          ts == null ? String.valueOf(currentTs) : currentTs + "," + ts);
+      String vis = conf.get(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, vis == null ? "false" : 
"false" + "," + vis);
+      long size = status.getLen();
+      String sizes = conf.get(MRJobConfig.CACHE_FILES_SIZES);
+      conf.set(
+          MRJobConfig.CACHE_FILES_SIZES, sizes == null ? String.valueOf(size) 
: size + "," + sizes);
+    } catch (InterruptedException | IOException e) {
+      logger.error("Upload extra conf exception", e);
+      throw new CelebornIOException("Upload extra conf exception ", e);
+    }
+  }
+
+  private static String getSysEnvAndValidateInputParam(String envName) throws 
IOException {
+    String value = System.getenv(envName);
+    if (value == null) {
+      String msg = envName + " is null";
+      logger.error(msg);
+      throw new CelebornIOException(msg);
+    }
+    return value;
+  }
+
+  public static void main(String[] args) {
+    JobConf rmAppConf = new JobConf(new YarnConfiguration());
+    rmAppConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+    try {
+      Thread.setDefaultUncaughtExceptionHandler(new 
YarnUncaughtExceptionHandler());
+      String containerIdStr =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.CONTAINER_ID.name());
+      String nodeHostString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_HOST.name());
+      String nodePortString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_PORT.name());
+      String nodeHttpPortString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+      String appSubmitTimeStr = System.getenv("APP_SUBMIT_TIME_ENV");
+      getSysEnvAndValidateInputParam("APP_SUBMIT_TIME_ENV");

Review Comment:
   Should be `String appSubmitTimeStr = 
getSysEnvAndValidateInputParam("APP_SUBMIT_TIME_ENV");` here?



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_PORT, lcPort + "");
+      lcConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, 
applicationAttemptId.toString());
+      writeLifecycleManagerConfToTask(jobConf, lcConf);
+    }
+  }
+
+  void writeLifecycleManagerConfToTask(JobConf conf, JobConf lcConf) throws 
CelebornIOException {
+    try {
+      FileSystem fs = new Cluster(conf).getFileSystem();
+      String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      Path celebornExtraConf = new Path(jobDirStr, 
HadoopUtils.MR_CELEBORN_CONF);
+
+      try (FSDataOutputStream out =
+          FileSystem.create(
+              fs, celebornExtraConf, new 
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION))) {
+        lcConf.writeXml(out);
+      }
+      FileStatus status = fs.getFileStatus(celebornExtraConf);
+      long currentTs = status.getModificationTime();
+      String uri = fs.getUri() + Path.SEPARATOR + celebornExtraConf.toUri();
+      String files = conf.get(MRJobConfig.CACHE_FILES);
+      conf.set(MRJobConfig.CACHE_FILES, files == null ? uri : uri + "," + 
files);
+      String ts = conf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+      conf.set(
+          MRJobConfig.CACHE_FILE_TIMESTAMPS,
+          ts == null ? String.valueOf(currentTs) : currentTs + "," + ts);
+      String vis = conf.get(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, vis == null ? "false" : 
"false" + "," + vis);
+      long size = status.getLen();
+      String sizes = conf.get(MRJobConfig.CACHE_FILES_SIZES);
+      conf.set(
+          MRJobConfig.CACHE_FILES_SIZES, sizes == null ? String.valueOf(size) 
: size + "," + sizes);
+    } catch (InterruptedException | IOException e) {
+      logger.error("Upload extra conf exception", e);
+      throw new CelebornIOException("Upload extra conf exception ", e);
+    }
+  }
+
+  private static String getSysEnvAndValidateInputParam(String envName) throws 
IOException {
+    String value = System.getenv(envName);
+    if (value == null) {
+      String msg = envName + " is null";
+      logger.error(msg);
+      throw new CelebornIOException(msg);
+    }
+    return value;
+  }
+
+  public static void main(String[] args) {
+    JobConf rmAppConf = new JobConf(new YarnConfiguration());
+    rmAppConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+    try {
+      Thread.setDefaultUncaughtExceptionHandler(new 
YarnUncaughtExceptionHandler());
+      String containerIdStr =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.CONTAINER_ID.name());
+      String nodeHostString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_HOST.name());
+      String nodePortString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_PORT.name());
+      String nodeHttpPortString =
+          
getSysEnvAndValidateInputParam(ApplicationConstants.Environment.NM_HTTP_PORT.name());
+      String appSubmitTimeStr = System.getenv("APP_SUBMIT_TIME_ENV");
+      getSysEnvAndValidateInputParam("APP_SUBMIT_TIME_ENV");
+      ContainerId containerId = ContainerId.fromString(containerIdStr);
+      ApplicationAttemptId applicationAttemptId = 
containerId.getApplicationAttemptId();
+      if (applicationAttemptId != null) {
+        CallerContext.setCurrent(
+            (new CallerContext.Builder(
+                    "mr_app_master_with_celeborn_" + 
applicationAttemptId.toString()))

Review Comment:
   `.toString()` is unnecessary



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;

Review Comment:
   better to rename to `partitionedKVs`



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle.ShuffleError;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornShuffleConsumer<K, V>
+    implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleConsumer.class);
+  private JobConf mrJobConf;
+  private MergeManager<K, V> merger;
+  private Throwable throwable = null;
+  private Progress copyPhase;
+  private TaskStatus taskStatus;
+  private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+  private TaskUmbilicalProtocol umbilical;
+  private Reporter reporter;
+  private ShuffleClientMetrics metrics;
+  private Task reduceTask;
+  private ShuffleClient shuffleClient;
+
+  @Override
+  public void init(Context<K, V> context) {
+
+    reduceId = context.getReduceId();
+    mrJobConf = context.getJobConf();
+    JobConf celebornJobConf = new JobConf(HadoopUtils.MR_CELEBORN_CONF);
+
+    umbilical = context.getUmbilical();
+    reporter = context.getReporter();
+    try {
+      this.metrics = createMetrics(reduceId, mrJobConf);
+    } catch (Exception e) {
+      logger.error("Fatal error occurred, failed to get shuffle client 
metrics.", e);
+      reportException(e);
+    }
+    copyPhase = context.getCopyPhase();
+    taskStatus = context.getStatus();
+    reduceTask = context.getReduceTask();
+
+    String appId = celebornJobConf.get(HadoopUtils.MR_CELEBORN_APPLICATION_ID);
+    String lcHost = celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_HOST);
+    int lcPort = 
Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_PORT));
+    logger.info("Reducer initialized with celeborn {} {} {}", appId, lcHost, 
lcPort);
+    CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf);
+    shuffleClient =
+        ShuffleClient.get(
+            appId,
+            lcHost,
+            lcPort,
+            celebornConf,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), 
celebornConf.quotaUserSpecificUserName()));
+    this.merger = createMergeManager(context);
+  }
+
+  // Merge mapOutput and spill in local disks if necessary
+  protected MergeManager<K, V> 
createMergeManager(ShuffleConsumerPlugin.Context context) {
+    return new MergeManagerImpl<K, V>(
+        reduceId,
+        mrJobConf,
+        context.getLocalFS(),
+        context.getLocalDirAllocator(),
+        reporter,
+        context.getCodec(),
+        context.getCombinerClass(),
+        context.getCombineCollector(),
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(),
+        this,
+        context.getMergePhase(),
+        context.getMapOutputFile());
+  }
+
+  private ShuffleClientMetrics createMetrics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf)
+      throws NoSuchMethodException {
+    // for hadoop 3
+    try {
+      return DynMethods.builder("create")
+          .impl(
+              ShuffleClientMetrics.class,
+              org.apache.hadoop.mapreduce.TaskAttemptID.class,
+              JobConf.class)
+          .buildStaticChecked()
+          .invoke(taskAttemptID, jobConf);
+    } catch (Exception e) {
+      // ignore this exception because the createMetrics might use hadoop2
+    }
+    // for hadoop 2
+    return DynConstructors.builder(ShuffleClientMetrics.class)
+        .hiddenImpl(new Class[] 
{org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class})
+        .buildChecked()
+        .invoke(null, taskAttemptID, jobConf);
+  }
+
+  @Override
+  public RawKeyValueIterator run() throws IOException, InterruptedException {

Review Comment:
   InterruptedException is not thrown



##########
client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class MRAppMasterWithCeleborn extends MRAppMaster {
+  private static final Logger logger = 
LoggerFactory.getLogger(MRAppMasterWithCeleborn.class);
+
+  public MRAppMasterWithCeleborn(
+      ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId,
+      String nmHost,
+      int nmPort,
+      int nmHttpPort,
+      long appSubmitTime,
+      JobConf jobConf)
+      throws CelebornIOException {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
appSubmitTime);
+
+    int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if (numReducers > 0) {
+      CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
+      LifecycleManager lifecycleManager =
+          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String lcHost = lifecycleManager.getHost();
+      int lcPort = lifecycleManager.getPort();
+      logger.info("RMAppMaster initialized with {} {} {}", lcHost, lcPort, 
applicationAttemptId);
+      JobConf lcConf = new JobConf();
+      lcConf.clear();
+      lcConf.set(HadoopUtils.MR_CELEBORN_LC_HOST, lcHost);

Review Comment:
   MR_CELEBORN_LM_HOST/MR_CELEBORN_LM_PORT



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornMapOutputCollector<K extends Object, V extends Object>
+    implements MapOutputCollector<K, V> {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornMapOutputCollector.class);
+  private Class<K> keyClass;
+  private Class<V> valClass;
+  private Task.TaskReporter reporter;
+  private CelebornSortBasedPusher<K, V> celebornSortBasedPusher;
+  private int numReducers;
+
+  @Override
+  public void init(Context context) throws IOException {
+    JobConf jobConf = context.getJobConf();
+    reporter = context.getReporter();
+    keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
+    valClass = (Class<V>) jobConf.getMapOutputValueClass();
+    context.getMapTask().getTaskID().getId();
+    numReducers = jobConf.getNumReduceTasks();
+
+    int IOBufferSize = jobConf.getInt(JobContext.IO_SORT_MB, 100);
+    // Java bytebuffer cannot be larger than Integer.MAX_VALUE
+    if ((IOBufferSize & 0x7FF) != IOBufferSize) {
+      throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + 
IOBufferSize);
+    }
+    jobConf.getNumReduceTasks();

Review Comment:
   This line seems unnecessary



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle.ShuffleError;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
+import org.apache.celeborn.util.HadoopUtils;
+
+public class CelebornShuffleConsumer<K, V>
+    implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleConsumer.class);
+  private JobConf mrJobConf;
+  private MergeManager<K, V> merger;
+  private Throwable throwable = null;
+  private Progress copyPhase;
+  private TaskStatus taskStatus;
+  private org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+  private TaskUmbilicalProtocol umbilical;
+  private Reporter reporter;
+  private ShuffleClientMetrics metrics;
+  private Task reduceTask;
+  private ShuffleClient shuffleClient;
+
+  @Override
+  public void init(Context<K, V> context) {
+
+    reduceId = context.getReduceId();
+    mrJobConf = context.getJobConf();
+    JobConf celebornJobConf = new JobConf(HadoopUtils.MR_CELEBORN_CONF);
+
+    umbilical = context.getUmbilical();
+    reporter = context.getReporter();
+    try {
+      this.metrics = createMetrics(reduceId, mrJobConf);
+    } catch (Exception e) {
+      logger.error("Fatal error occurred, failed to get shuffle client 
metrics.", e);
+      reportException(e);
+    }
+    copyPhase = context.getCopyPhase();
+    taskStatus = context.getStatus();
+    reduceTask = context.getReduceTask();
+
+    String appId = celebornJobConf.get(HadoopUtils.MR_CELEBORN_APPLICATION_ID);
+    String lcHost = celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_HOST);
+    int lcPort = 
Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LC_PORT));
+    logger.info("Reducer initialized with celeborn {} {} {}", appId, lcHost, 
lcPort);
+    CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf);
+    shuffleClient =
+        ShuffleClient.get(
+            appId,
+            lcHost,
+            lcPort,
+            celebornConf,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), 
celebornConf.quotaUserSpecificUserName()));
+    this.merger = createMergeManager(context);
+  }
+
+  // Merge mapOutput and spill in local disks if necessary
+  protected MergeManager<K, V> 
createMergeManager(ShuffleConsumerPlugin.Context context) {

Review Comment:
   Seems this method is unnecessary since it's only used in one place



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.Utils;
+
+public class CelebornSortBasedPusher<K, V> extends OutputStream {
+  final Logger logger = LoggerFactory.getLogger(CelebornSortBasedPusher.class);
+  private final int mapId;
+  private final int attempt;
+  private final int numMappers;
+  private final int numReducers;
+  private final ShuffleClient shuffleClient;
+  private final int maxIOBufferSize;
+  private final int spillIOBufferSize;
+  private final Serializer<K> kSer;
+  private final Serializer<V> vSer;
+  private final RawComparator<K> comparator;
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private final Map<Integer, List<SerializedKV>> currentSerializedKVs;
+  private int writePos;
+  private byte[] serializedKV;
+  private final int maxPushDataSize;
+
+  public CelebornSortBasedPusher(
+      int numMappers,
+      int numReducers,
+      int mapId,
+      int attemptId,
+      Serializer<K> kSer,
+      Serializer<V> vSer,
+      int maxIOBufferSize,
+      int spillIOBufferSize,
+      RawComparator<K> comparator,
+      Counters.Counter mapOutputByteCounter,
+      Counters.Counter mapOutputRecordCounter,
+      ShuffleClient shuffleClient,
+      CelebornConf celebornConf) {
+    this.numMappers = numMappers;
+    this.numReducers = numReducers;
+    this.mapId = mapId;
+    this.attempt = attemptId;
+    this.kSer = kSer;
+    this.vSer = vSer;
+    this.maxIOBufferSize = maxIOBufferSize;
+    this.spillIOBufferSize = spillIOBufferSize;
+    this.mapOutputByteCounter = mapOutputByteCounter;
+    this.mapOutputRecordCounter = mapOutputRecordCounter;
+    this.comparator = comparator;
+    this.shuffleClient = shuffleClient;
+    currentSerializedKVs = new HashMap<>();
+    serializedKV = new byte[maxIOBufferSize];
+    maxPushDataSize = (int) celebornConf.clientMrMaxPushData();
+    logger.info(
+        "Sort based push initialized with"
+            + " numMappers:{} numReducers:{} mapId:{} attemptId:{}"
+            + " maxIOBufferSize:{} spillIOBufferSize:{}",
+        numMappers,
+        numReducers,
+        mapId,
+        attemptId,
+        maxIOBufferSize,
+        spillIOBufferSize);
+    try {
+      kSer.open(this);
+      vSer.open(this);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  public void insert(K key, V value, int partition) {
+    try {
+      if (writePos >= spillIOBufferSize) {
+        // needs to sort and flush data
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Data is large enough {}/{}/{}, trigger sort and flush",
+              Utils.bytesToString(writePos),
+              Utils.bytesToString(spillIOBufferSize),
+              Utils.bytesToString(maxIOBufferSize));
+        }
+        synchronized (this) {
+          sortKVs();
+          sendKVAndUpdateWritePos();
+        }
+      }
+      int dataLen = insertRecordInternal(key, value, partition);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Sort based pusher insert into partition:{} with {} bytes", 
partition, dataLen);
+      }
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(dataLen);
+    } catch (IOException e) {
+      exception.compareAndSet(null, e);
+    }
+  }
+
+  private void sendKVAndUpdateWritePos() throws IOException {
+    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry :
+        currentSerializedKVs.entrySet()) {
+      List<SerializedKV> kvs = partitionKVEntry.getValue();
+      synchronized (kvs) {
+        int partition = partitionKVEntry.getKey();
+        List<SerializedKV> localKVs = new ArrayList<>();
+        int partitionKVTotalLen = 0;
+        // process buffers for specific partition
+        for (SerializedKV kv : kvs) {
+          partitionKVTotalLen += kv.kLen + kv.vLen;
+          localKVs.add(kv);
+          if (partitionKVTotalLen > maxPushDataSize) {
+            // limit max size of pushdata to avoid possible memory issue in 
Celeborn worker
+            // data layout
+            // pushdata header (16) + pushDataLen(4) +
+            // [varKeyLen+varValLen+serializedRecord(x)][...]
+            sendSortedBuffersPartition(partition, localKVs, 
partitionKVTotalLen);
+            localKVs.clear();
+            partitionKVTotalLen = 0;
+          }
+        }
+        if (!localKVs.isEmpty()) {
+          sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
+        }
+        kvs.clear();
+      }
+    }
+    // all data sent
+    currentSerializedKVs.clear();
+    writePos = 0;
+  }
+
+  private void sendSortedBuffersPartition(
+      int partition, List<SerializedKV> localKVs, int partitionKVTotalLen) 
throws IOException {

Review Comment:
   We'd better add metrics showing `partitionKVTotalLen`. We can do this in 
later PRs.



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.unsafe.Platform;
+
+public class CelebornShuffleFetcher<K, V> {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleFetcher.class);
+  private final TaskAttemptID reduceId;
+  private final Reporter reporter;
+  private final TaskStatus status;
+  private final MergeManager<K, V> merger;
+  private final Progress progress;
+  private final ShuffleClientMetrics metrics;
+  private final CelebornInputStream celebornInputStream;
+  private volatile boolean stopped = false;
+  private int uniqueMapId = 0;
+  private final Counters.Counter ioErrs;
+  private boolean hasPendingData = false;
+  private long inputShuffleSize;
+  private byte[] shuffleData;
+
+  public CelebornShuffleFetcher(
+      TaskAttemptID reduceId,
+      TaskStatus status,
+      MergeManager<K, V> merger,
+      Progress progress,
+      Reporter reporter,
+      ShuffleClientMetrics metrics,
+      CelebornInputStream input) {
+    this.reduceId = reduceId;
+    this.reporter = reporter;
+    this.status = status;
+    this.merger = merger;
+    this.progress = progress;
+    this.metrics = metrics;
+    this.celebornInputStream = input;
+
+    ioErrs = reporter.getCounter("Shuffle Errors", "IO_ERROR");
+  }
+
+  // fetch all push data and merge
+  public void fetchAndMerge() {
+    while (!stopped) {
+      try {
+        // If merge is on, block
+        merger.waitForResource();
+        // Do shuffle
+        metrics.threadBusy();
+        // read blocks
+        fetchToLocalAndMerge();
+      } catch (Exception e) {
+        logger.error("Celeborn shuffle fetcher fetch data failed.", e);
+      } finally {
+        metrics.threadFree();
+      }
+    }
+  }
+
+  private byte[] getShuffleBlock() throws IOException {
+    // get len
+    byte[] header = new byte[4];
+    int count = celebornInputStream.read(header);
+    if (count == -1) {
+      stopped = true;
+      return null;
+    }
+    while (count != header.length) {
+      count += celebornInputStream.read(header, count, 4 - count);
+    }
+
+    // get data
+    int blockLen = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
+    inputShuffleSize += blockLen;
+    byte[] shuffleData = new byte[blockLen];
+    count = 0;

Review Comment:
   this line is unnecessary



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.read.CelebornInputStream;
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.unsafe.Platform;
+
+public class CelebornShuffleFetcher<K, V> {
+  private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleFetcher.class);
+  private final TaskAttemptID reduceId;
+  private final Reporter reporter;
+  private final TaskStatus status;
+  private final MergeManager<K, V> merger;
+  private final Progress progress;
+  private final ShuffleClientMetrics metrics;
+  private final CelebornInputStream celebornInputStream;
+  private volatile boolean stopped = false;
+  private int uniqueMapId = 0;
+  private final Counters.Counter ioErrs;
+  private boolean hasPendingData = false;
+  private long inputShuffleSize;
+  private byte[] shuffleData;
+
+  public CelebornShuffleFetcher(
+      TaskAttemptID reduceId,
+      TaskStatus status,
+      MergeManager<K, V> merger,
+      Progress progress,
+      Reporter reporter,
+      ShuffleClientMetrics metrics,
+      CelebornInputStream input) {
+    this.reduceId = reduceId;
+    this.reporter = reporter;
+    this.status = status;
+    this.merger = merger;
+    this.progress = progress;
+    this.metrics = metrics;
+    this.celebornInputStream = input;
+
+    ioErrs = reporter.getCounter("Shuffle Errors", "IO_ERROR");
+  }
+
+  // fetch all push data and merge
+  public void fetchAndMerge() {
+    while (!stopped) {
+      try {
+        // If merge is on, block
+        merger.waitForResource();
+        // Do shuffle
+        metrics.threadBusy();
+        // read blocks
+        fetchToLocalAndMerge();
+      } catch (Exception e) {
+        logger.error("Celeborn shuffle fetcher fetch data failed.", e);
+      } finally {
+        metrics.threadFree();
+      }
+    }
+  }
+
+  private byte[] getShuffleBlock() throws IOException {
+    // get len
+    byte[] header = new byte[4];
+    int count = celebornInputStream.read(header);
+    if (count == -1) {
+      stopped = true;
+      return null;
+    }
+    while (count != header.length) {
+      count += celebornInputStream.read(header, count, 4 - count);
+    }
+
+    // get data
+    int blockLen = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
+    inputShuffleSize += blockLen;
+    byte[] shuffleData = new byte[blockLen];
+    count = 0;
+    count = celebornInputStream.read(shuffleData);
+    while (count != shuffleData.length) {
+      count += celebornInputStream.read(shuffleData, count, blockLen - count);
+      if (count == -1) {
+        // read shuffle is done.
+        stopped = true;
+        throw new CelebornIOException("Read mr shuffle failed.");
+      }
+    }
+    return shuffleData;
+  }
+
+  private void fetchToLocalAndMerge() throws IOException {
+    if (!hasPendingData) {
+      shuffleData = getShuffleBlock();
+    }
+
+    if (shuffleData != null) {
+      // start to merge
+      if (wrapMapOutput(shuffleData)) {
+        hasPendingData = false;
+      } else {
+        return;
+      }
+
+      updateStatus();
+      reporter.progress();
+    } else {
+      celebornInputStream.close();
+      metrics.inputBytes(inputShuffleSize);
+      logger.info("reduce task {} read {} bytes", reduceId, inputShuffleSize);
+      stopped = true;
+    }
+  }
+
+  private boolean wrapMapOutput(byte[] shuffleData) throws IOException {
+    // treat push data as mapoutput
+    TaskAttemptID mapId =
+        new TaskAttemptID(new TaskID(reduceId.getJobID(), TaskType.MAP, 
uniqueMapId++), 0);
+    MapOutput<K, V> mapOutput = null;
+    try {
+      mapOutput = merger.reserve(mapId, shuffleData.length, 0);
+    } catch (IOException ioe) {
+      ioErrs.increment(1);
+      throw ioe;
+    }
+    if (mapOutput == null) {
+      logger.info(
+          "Celeborn fetcher returned status wait because reserve buffer for 
shuffle get null");
+      hasPendingData = true;
+      return false;
+    }
+
+    // write data to mapOutput
+    try {
+      writeShuffle(mapOutput, shuffleData);
+      // let the merger knows this block is ready for merging
+      mapOutput.commit();
+    } catch (Throwable t) {
+      ioErrs.increment(1);
+      mapOutput.abort();
+      throw new CelebornIOException(
+          "Reduce: {} "
+              + reduceId
+              + " fetch failed to {} "
+              + mapOutput.getClass().getSimpleName()
+              + " due to: {} "
+              + t.getClass().getName());
+    }
+    return true;
+  }
+
+  private Decompressor getDecompressor(InMemoryMapOutput inMemoryMapOutput)
+      throws CelebornIOException {
+    try {
+      Class clazz = Class.forName(InMemoryMapOutput.class.getName());
+      Field deCompressorField = clazz.getDeclaredField("decompressor");
+      deCompressorField.setAccessible(true);
+      return (Decompressor) deCompressorField.get(inMemoryMapOutput);
+    } catch (Exception e) {
+      throw new CelebornIOException("Get Decompressor fail " + e.getMessage());
+    }
+  }
+
+  private void writeShuffle(MapOutput mapOutput, byte[] shuffle) throws 
CelebornIOException {
+    if (mapOutput instanceof InMemoryMapOutput) {
+      InMemoryMapOutput inMemoryMapOutput = (InMemoryMapOutput) mapOutput;
+      CodecPool.returnDecompressor(getDecompressor(inMemoryMapOutput));
+      byte[] memory = inMemoryMapOutput.getMemory();
+      System.arraycopy(shuffle, 0, memory, 0, shuffle.length);
+    } else if (mapOutput instanceof OnDiskMapOutput) {

Review Comment:
   What will happen when memory is not enough to hold all shuffle data, will it 
be spilled to disk or throw Exception? I think we need to verify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to