This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8948df17f [CELEBORN-1733] Support ordered grouped kv input for Tez
8948df17f is described below

commit 8948df17f9d52a5b4b61cb0a5ab8c69ae020d09a
Author: hongguangwei <[email protected]>
AuthorDate: Fri Dec 6 10:54:44 2024 +0800

    [CELEBORN-1733] Support ordered grouped kv input for Tez
    
    ### What changes were proposed in this pull request?
    1. Add CelebornOrderedGroupedKVInput and CelebornOrderedGroupedInputLegacy
    2. CelebornScheduler extends ShuffleScheduler to reduce redundant code
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2972 from GH-Gloway/1733.
    
    Authored-by: hongguangwei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/tez/plugin/util/ChecksumUtils.java    |  41 +++
 .../shuffle/orderedgrouped/CelebornScheduler.java  | 364 +++++++++++++++++++++
 .../shuffle/orderedgrouped/CelebornShuffle.java    |  87 +++++
 .../orderedgrouped/CelebornTezBypassWriter.java    |  85 +++++
 .../CelebornTezShuffleDataFetcher.java             | 248 ++++++++++++++
 .../input/CelebornOrderedGroupedInputLegacy.java   |  84 +++++
 .../input/CelebornOrderedGroupedKVInput.java       | 354 ++++++++++++++++++++
 7 files changed, 1263 insertions(+)

diff --git a/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java
new file mode 100644
index 000000000..d1a9dff6a
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tez.plugin.util;
+
+import java.util.zip.CRC32;
+
/** Helpers for computing {@link CRC32} checksums over byte arrays. */
public class ChecksumUtils {

  /** Maximum number of bytes handed to the CRC engine in a single update call. */
  private static final int LENGTH_PER_CRC = 4 * 1024;

  /**
   * Computes the CRC32 of the whole array.
   *
   * @param buf bytes to checksum
   * @return the CRC32 value in the low 32 bits of the result
   */
  public static long getCrc32(byte[] buf) {
    return getCrc32(buf, 0, buf.length);
  }

  /**
   * Computes the CRC32 of {@code length} bytes of {@code buf} starting at {@code offset}, feeding
   * the data in chunks of at most {@link #LENGTH_PER_CRC} bytes. A non-positive {@code length}
   * yields the checksum of no data.
   *
   * @param buf bytes to checksum
   * @param offset index of the first byte to include
   * @param length number of bytes to include
   * @return the CRC32 value in the low 32 bits of the result
   */
  public static long getCrc32(byte[] buf, int offset, int length) {
    final CRC32 checksum = new CRC32();
    int consumed = 0;
    while (consumed < length) {
      final int chunk = Math.min(LENGTH_PER_CRC, length - consumed);
      checksum.update(buf, offset + consumed, chunk);
      consumed += chunk;
    }
    return checksum.getValue();
  }
}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java
new file mode 100644
index 000000000..d2dd3fa90
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static 
org.apache.celeborn.tez.plugin.util.CelebornTezUtils.getParentPrivateField;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezReader;
+import org.apache.celeborn.client.ShuffleClient;
+
+class CelebornScheduler extends ShuffleScheduler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleScheduler.class);
+
+  private final AtomicBoolean isShutdown;
+  private final Random random = new Random(System.currentTimeMillis());
+
+  private final String srcNameTrimmed;
+
+  private final int numFetchers;
+  private final ListeningExecutorService fetcherExecutor;
+  private final Set<CelebornTezShuffleDataFetcher> celebornRunningFetchers =
+      Collections.newSetFromMap(new 
ConcurrentHashMap<CelebornTezShuffleDataFetcher, Boolean>());
+  private volatile Thread shuffleSchedulerThread = null;
+  private final MergeManager mergeManager;
+  private final InputContext inputContext;
+  private final ExceptionReporter exceptionReporter;
+  private final int numInputs;
+
+  // celeborn
+  private final int shuffleId;
+  private final ApplicationAttemptId applicationAttemptId;
+  private final ShuffleClient shuffleClient;
+  private final Map<Integer, MapHost> runningRssPartitionMap = new HashMap<>();
+
+  private final Set<Integer> successRssPartitionSet = 
Sets.newConcurrentHashSet();
+  private final Set<Integer> allRssPartition = Sets.newConcurrentHashSet();
+
+  private final Map<Integer, Set<InputAttemptIdentifier>> 
partitionIdToSuccessMapTaskAttempts =
+      new HashMap<>();
+  final Map<Integer, Set<TezTaskID>> partitionIdToSuccessTezTasks = new 
HashMap<>();
+  private final TezCounter skippedInputCounter;
+
+  public CelebornScheduler(
+      InputContext inputContext,
+      Configuration conf,
+      int numberOfInputs,
+      ExceptionReporter exceptionReporter,
+      ShuffleClient shuffleClient,
+      MergeManager mergeManager,
+      FetchedInputAllocatorOrderedGrouped allocator,
+      long startTime,
+      CompressionCodec codec,
+      boolean ifileReadAhead,
+      int ifileReadAheadLength,
+      String srcNameTrimmed,
+      int shuffleId,
+      ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    super(
+        inputContext,
+        conf,
+        numberOfInputs,
+        exceptionReporter,
+        mergeManager,
+        allocator,
+        startTime,
+        codec,
+        ifileReadAhead,
+        ifileReadAheadLength,
+        srcNameTrimmed);
+    this.inputContext = inputContext;
+    this.exceptionReporter = exceptionReporter;
+    this.srcNameTrimmed = srcNameTrimmed;
+    this.shuffleClient = shuffleClient;
+    this.shuffleId = shuffleId;
+    this.applicationAttemptId = applicationAttemptId;
+    this.mergeManager = mergeManager;
+    this.numInputs = numberOfInputs;
+    this.numFetchers = (int) getParentPrivateField(this, "numFetchers");
+    this.fetcherExecutor =
+        (ListeningExecutorService) getParentPrivateField(this, 
"fetcherExecutor");
+    this.isShutdown = (AtomicBoolean) getParentPrivateField(this, 
"isShutdown");
+    this.skippedInputCounter = (TezCounter) getParentPrivateField(this, 
"skippedInputCounter");
+  }
+
+  public void start() throws Exception {
+    shuffleSchedulerThread = Thread.currentThread();
+    mergeManager.setupParentThread(shuffleSchedulerThread);
+    CelebornShuffleSchedulerCallable schedulerCallable = new 
CelebornShuffleSchedulerCallable();
+    schedulerCallable.call();
+  }
+
+  private boolean allInputTaskAttemptDone() {
+    return (this.partitionIdToSuccessTezTasks.values().stream().mapToInt(s -> 
s.size()).sum()
+            + skippedInputCounter.getValue())
+        == numInputs;
+  }
+
+  private boolean isAllInputFetched() {
+    return allInputTaskAttemptDone() && (successRssPartitionSet.size() >= 
allRssPartition.size());
+  }
+
+  public synchronized void addKnownMapOutput(
+      String inputHostName, int port, int partitionId, 
CompositeInputAttemptIdentifier srcAttempt) {
+
+    allRssPartition.add(partitionId);
+    Set<InputAttemptIdentifier> inputAttemptIdentifiers =
+        partitionIdToSuccessMapTaskAttempts.computeIfAbsent(partitionId, id -> 
new HashSet<>());
+    String pathComponent = srcAttempt.getPathComponent();
+    TezTaskAttemptID tezTaskAttemptId =
+        TezTaskAttemptID.fromString(pathComponent.substring(0, 
pathComponent.length() - 6));
+    partitionIdToSuccessTezTasks.putIfAbsent(partitionId, new HashSet<>());
+    
partitionIdToSuccessTezTasks.get(partitionId).add(tezTaskAttemptId.getTaskID());
+
+    inputAttemptIdentifiers.add(srcAttempt);
+    super.addKnownMapOutput(inputHostName, port, partitionId, srcAttempt);
+  }
+
+  public synchronized MapHost getHost() throws InterruptedException {
+    while (pendingHosts.isEmpty() && !isAllInputFetched()) {
+      LOG.debug("PendingHosts={}", pendingHosts);
+      waitAndNotifyProgress();
+    }
+
+    if (!pendingHosts.isEmpty()) {
+
+      MapHost host = null;
+      Iterator<MapHost> iter = pendingHosts.iterator();
+      int numToPick = random.nextInt(pendingHosts.size());
+      for (int i = 0; i <= numToPick; ++i) {
+        host = iter.next();
+      }
+
+      pendingHosts.remove(host);
+      host.markBusy();
+      return host;
+    } else {
+      return null;
+    }
+  }
+
+  private class CelebornShuffleSchedulerCallable extends CallableWithNdc<Void> 
{
+
+    @Override
+    protected Void callInternal() throws InterruptedException {
+      while (!isShutdown.get() && !isAllInputFetched()) {
+        synchronized (CelebornScheduler.this) {
+          while (!allInputTaskAttemptDone()
+              || ((celebornRunningFetchers.size() >= numFetchers || 
pendingHosts.isEmpty())
+                  && !isAllInputFetched())) {
+            try {
+              waitAndNotifyProgress();
+            } catch (InterruptedException e) {
+              if (isShutdown.get()) {
+                LOG.info(
+                    srcNameTrimmed
+                        + ": "
+                        + "Interrupted while waiting for fetchers to complete "
+                        + "and hasBeenShutdown. Breaking out of 
ShuffleSchedulerCallable loop");
+                Thread.currentThread().interrupt();
+                break;
+              } else {
+                throw e;
+              }
+            }
+          }
+        }
+
+        // Ensure there's memory available before scheduling the next Fetcher.
+        try {
+          // If merge is on, block
+          mergeManager.waitForInMemoryMerge();
+          // In case usedMemory > memorylimit, wait until some memory is 
released
+          mergeManager.waitForShuffleToMergeMemory();
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info(
+                srcNameTrimmed
+                    + ": "
+                    + "Interrupted while waiting for merge to complete and 
hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+            Thread.currentThread().interrupt();
+            break;
+          } else {
+            throw e;
+          }
+        }
+
+        if (!isShutdown.get() && !isAllInputFetched()) {
+          synchronized (CelebornScheduler.this) {
+            int numFetchersToRun = numFetchers - 
celebornRunningFetchers.size();
+            int count = 0;
+            while (count < numFetchersToRun && !isShutdown.get() && 
!isAllInputFetched()) {
+              MapHost mapHost;
+              try {
+                mapHost = getHost(); // Leads to a wait.
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info(
+                      srcNameTrimmed
+                          + ": "
+                          + "Interrupted while waiting for host and 
hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+                  Thread.currentThread().interrupt();
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+              if (mapHost == null) {
+                break; // Check for the exit condition.
+              }
+              LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, 
mapHost);
+              if (!isShutdown.get()) {
+                count++;
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(
+                      srcNameTrimmed + ": " + "Scheduling fetch for inputHost: 
{}",
+                      mapHost.getHostIdentifier() + ":" + 
mapHost.getPartitionId());
+                }
+
+                if (isFirstRssPartitionFetch(mapHost)) {
+                  CelebornTezShuffleDataFetcher celebornTezShuffleDataFetcher =
+                      constructCelebornFetcherForPartition(mapHost);
+
+                  celebornRunningFetchers.add(celebornTezShuffleDataFetcher);
+                  ListenableFuture<Void> future =
+                      fetcherExecutor.submit(celebornTezShuffleDataFetcher);
+                  Futures.addCallback(
+                      future,
+                      new FetchFutureCallback(celebornTezShuffleDataFetcher),
+                      MoreExecutors.directExecutor());
+                } else {
+                  for (int i = 0; i < mapHost.getAndClearKnownMaps().size(); 
i++) {
+                    remainingMaps.decrementAndGet();
+                  }
+                  LOG.info(
+                      "Partition was fetched, remainingMaps desc, now 
value:{}",
+                      remainingMaps.get());
+                }
+              }
+            }
+          }
+        }
+      }
+      LOG.info(
+          "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
+          srcNameTrimmed,
+          Thread.currentThread().isInterrupted());
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+
+    private synchronized boolean isFirstRssPartitionFetch(MapHost mapHost) {
+      Integer partitionId = mapHost.getPartitionId();
+      LOG.info(
+          "Check isFirstCelebornPartitionFetch, mapHost:{},partitionId:{}", 
mapHost, partitionId);
+
+      if (runningRssPartitionMap.containsKey(partitionId)
+          || successRssPartitionSet.contains(partitionId)) {
+        return false;
+      }
+      runningRssPartitionMap.put(partitionId, mapHost);
+      return true;
+    }
+
+    private CelebornTezShuffleDataFetcher 
constructCelebornFetcherForPartition(MapHost mapHost) {
+      int partitionId = mapHost.getPartitionId();
+      CelebornTezReader reader =
+          new CelebornTezReader(
+              shuffleClient, shuffleId, partitionId, 
applicationAttemptId.getAttemptId());
+      return new CelebornTezShuffleDataFetcher(
+          
partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),
+          mapHost.getPartitionId(),
+          mergeManager,
+          inputContext.getCounters(),
+          reader,
+          exceptionReporter);
+    }
+  }
+
+  private synchronized void waitAndNotifyProgress() throws 
InterruptedException {
+    inputContext.notifyProgress();
+    wait(1000);
+  }
+
+  private class FetchFutureCallback implements FutureCallback<Void> {
+
+    private final CelebornTezShuffleDataFetcher fetcherOrderedGrouped;
+    private final Integer partitionId;
+
+    public FetchFutureCallback(CelebornTezShuffleDataFetcher 
fetcherOrderedGrouped) {
+      this.fetcherOrderedGrouped = fetcherOrderedGrouped;
+      this.partitionId = fetcherOrderedGrouped.getPartitionId();
+    }
+
+    private void doBookKeepingForFetcherComplete() {
+      synchronized (CelebornScheduler.this) {
+        celebornRunningFetchers.remove(fetcherOrderedGrouped);
+        CelebornScheduler.this.notifyAll();
+      }
+    }
+
+    @Override
+    public void onSuccess(Void result) {
+      fetcherOrderedGrouped.shutDown();
+      if (isShutdown.get()) {
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch 
complete");
+      } else {
+        successRssPartitionSet.add(partitionId);
+        MapHost mapHost = runningRssPartitionMap.remove(partitionId);
+        if (mapHost != null) {
+          for (int i = 0; i < mapHost.getAndClearKnownMaps().size(); i++) {
+            remainingMaps.decrementAndGet();
+          }
+        }
+        doBookKeepingForFetcherComplete();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      fetcherOrderedGrouped.shutDown();
+      if (isShutdown.get()) {
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch 
complete");
+      } else {
+        LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error", t);
+        exceptionReporter.reportException(t);
+        doBookKeepingForFetcherComplete();
+      }
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java
new file mode 100644
index 000000000..a1a15bf0f
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+public class CelebornShuffle extends Shuffle {
+  public CelebornShuffle(
+      InputContext inputContext,
+      Configuration conf,
+      int numInputs,
+      long initialMemoryAvailable,
+      ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    super(inputContext, conf, numInputs, initialMemoryAvailable);
+
+    String host = conf.get(TEZ_CELEBORN_LM_HOST);
+    int port = conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    int shuffleId = conf.getInt(TEZ_SHUFFLE_ID, -1);
+    String appId = conf.get(TEZ_CELEBORN_APPLICATION_ID);
+    CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    ShuffleClient shuffleClient =
+        ShuffleClient.get(
+            appId,
+            host,
+            port,
+            celebornConf,
+            new UserIdentifier(
+                celebornConf.quotaUserSpecificTenant(), 
celebornConf.quotaUserSpecificUserName()),
+            null);
+
+    long startTime = (long) getParentPrivateField(this, "startTime");
+    CompressionCodec codec = (CompressionCodec) getParentPrivateField(this, 
"codec");
+    boolean ifileReadAhead = (boolean) getParentPrivateField(this, 
"ifileReadAhead");
+    int ifileReadAheadLength = (int) getParentPrivateField(this, 
"ifileReadAheadLength");
+    String sourceDestNameTrimmed = (String) getParentPrivateField(this, 
"sourceDestNameTrimmed");
+    CelebornScheduler scheduler =
+        new CelebornScheduler(
+            inputContext,
+            conf,
+            numInputs,
+            this,
+            shuffleClient,
+            merger,
+            merger,
+            startTime,
+            codec,
+            ifileReadAhead,
+            ifileReadAheadLength,
+            sourceDestNameTrimmed,
+            shuffleId,
+            applicationAttemptId);
+    ShuffleInputEventHandlerOrderedGrouped eventHandler =
+        new ShuffleInputEventHandlerOrderedGrouped(
+            inputContext, scheduler, ShuffleUtils.isTezShuffleHandler(conf));
+    setParentPrivateField(this, "scheduler", scheduler);
+    setParentPrivateField(this, "eventHandler", eventHandler);
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java
new file mode 100644
index 000000000..70dd77241
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.tez.plugin.util.ChecksumUtils;
+
+// In Tez shuffle, MapOutput encapsulates the logic to fetch map task's output 
data via http.
+// So, in Celeborn, we should bypass this logic, and directly write data to 
MapOutput.
+public class CelebornTezBypassWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CelebornTezBypassWriter.class);
+  private static final byte[] HEADER = new byte[] {(byte) 'T', (byte) 'I', 
(byte) 'F', (byte) 0};
+
+  public static void write(MapOutput mapOutput, byte[] buffer) {
+    LOG.debug(
+        "CelebornTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+        mapOutput.getType(),
+        buffer.length);
+    // Write and commit uncompressed data to MapOutput.
+    // In the majority of cases, merger allocates memory to accept data,
+    // but when data size exceeds the threshold, merger can also allocate disk.
+    // So, we should consider the two situations, respectively.
+    if (mapOutput.getType() == MapOutput.Type.MEMORY) {
+      byte[] memory = mapOutput.getMemory();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (mapOutput.getType() == MapOutput.Type.DISK) {
+      throw new IllegalStateException(
+          "Celeborn map reduce client do not support OnDiskMapOutput. Try to 
increase mapreduce.reduce.shuffle.memory.limit.percent");
+    } else {
+      throw new IllegalStateException(
+          "Merger reserve unknown type of MapOutput: " + 
mapOutput.getClass().getCanonicalName());
+    }
+  }
+
+  public static void write(final FetchedInput fetchedInput, byte[] buffer) 
throws IOException {
+    LOG.debug(
+        "CelebornTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+        fetchedInput.getType(),
+        buffer.length);
+    // Write and commit uncompressed data to MapOutput.
+    // In the majority of cases, merger allocates memory to accept data,
+    // but when data size exceeds the threshold, merger can also allocate disk.
+    // So, we should consider the two situations, respectively.
+    if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
+      byte[] memory = ((MemoryFetchedInput) fetchedInput).getBytes();
+      System.arraycopy(buffer, 0, memory, 0, buffer.length);
+    } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
+      OutputStream output = fetchedInput.getOutputStream();
+      output.write(HEADER);
+      output.write(buffer);
+      output.write(Ints.toByteArray((int) ChecksumUtils.getCrc32(buffer)));
+      output.flush();
+      output.close();
+    } else {
+      throw new CelebornIOException(
+          "Merger reserve unknown type of MapOutput: "
+              + fetchedInput.getClass().getCanonicalName());
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java
new file mode 100644
index 000000000..bf8310c46
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezReader;
+import org.apache.celeborn.common.exception.CelebornIOException;
+
+public class CelebornTezShuffleDataFetcher extends CallableWithNdc<Void> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CelebornTezShuffleDataFetcher.class);
+
+  // Counter names registered under SHUFFLE_ERR_GRP_NAME. Only IO_ERROR is referenced in the
+  // visible part of this class (by the constructor). NOTE(review): confirm the others are used.
+  private enum ShuffleErrors {
+    IO_ERROR,
+    WRONG_LENGTH,
+    BAD_ID,
+    WRONG_MAP,
+    CONNECTION,
+    WRONG_REDUCE
+  }
+
+  private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  private final TezCounter ioErrs;
+  private final MergeManager merger;
+
+  // Number of blocks successfully handed to the merger by copyFromRssServer.
+  private long copyBlockCount = 0;
+  // Loop flag for fetchAllRssBlocks. NOTE(review): presumably set by stopFetch()/shutDown().
+  private volatile boolean stopped = false;
+
+  private final CelebornTezReader celebornTezReader;
+  // Accumulated durations in milliseconds, logged once the reader is exhausted.
+  // decompressTime is not updated anywhere in the visible part of this class.
+  private long readTime = 0;
+  private long decompressTime = 0;
+  private long serializeTime = 0;
+  private long waitTime = 0;
+  private long copyTime = 0; // the sum of readTime + decompressTime + serializeTime + waitTime
+  private final InputAttemptIdentifier inputAttemptIdentifier;
+  // Not referenced in the visible part of this class; presumably used by issueMapOutputMerge.
+  private static int uniqueMapId = 0;
+
+  // When true, copyFromRssServer retries shuffleData instead of fetching a new block; it is reset
+  // after a successful merge. NOTE(review): presumably set by issueMapOutputMerge on failure.
+  private boolean hasPendingData = false;
+  // Time at which issueMapOutputMerge last failed; used to accumulate waitTime.
+  private long startWait;
+  private int waitCount = 0;
+  // Most recent block from the reader; null once the reader is exhausted.
+  private byte[] shuffleData = null;
+  private Integer partitionId;
+  private final ExceptionReporter exceptionReporter;
+
+  private final AtomicInteger issuedCnt = new AtomicInteger(0);
+
+  public CelebornTezShuffleDataFetcher(
+      InputAttemptIdentifier inputAttemptIdentifier,
+      Integer partitionId,
+      MergeManager merger,
+      TezCounters tezCounters,
+      CelebornTezReader celebornTezReader,
+      ExceptionReporter exceptionReporter) {
+    this.merger = merger;
+    this.partitionId = partitionId;
+    this.inputAttemptIdentifier = inputAttemptIdentifier;
+    this.exceptionReporter = exceptionReporter;
+    ioErrs = tezCounters.findCounter(SHUFFLE_ERR_GRP_NAME, 
ShuffleErrors.IO_ERROR.toString());
+    this.celebornTezReader = celebornTezReader;
+
+    LOG.info(
+        "CelebornTezShuffleDataFetcher, partitionId:{}, 
inputAttemptIdentifier:{}.",
+        this.partitionId,
+        this.inputAttemptIdentifier);
+  }
+
+  @Override
+  public Void callInternal() {
+    try {
+      celebornTezReader.init();
+      fetchAllRssBlocks();
+    } catch (InterruptedException ie) {
+      // might not be respected when fetcher is in progress / server is busy.  
TEZ-711
+      // Set the status back
+      LOG.warn(ie.getMessage(), ie);
+      Thread.currentThread().interrupt();
+      return null;
+    } catch (Throwable t) {
+      LOG.warn(t.getMessage(), t);
+      exceptionReporter.reportException(t);
+      // Shuffle knows how to deal with failures post shutdown via the 
onFailure hook
+    }
+    return null;
+  }
+
+  public void fetchAllRssBlocks() throws IOException, InterruptedException {
+    while (!stopped) {
+      try {
+        // If merge is on, block
+        merger.waitForInMemoryMerge();
+        // Do shuffle
+        copyFromRssServer();
+      } catch (Exception e) {
+        LOG.warn(e.getMessage(), e);
+        throw e;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void copyFromRssServer() throws IOException {
+    // fetch a block
+    if (!hasPendingData) {
+      final long startFetch = System.currentTimeMillis();
+      shuffleData = celebornTezReader.getShuffleBlock();
+      long fetchDuration = System.currentTimeMillis() - startFetch;
+      readTime += fetchDuration;
+    }
+
+    if (shuffleData != null) {
+      // start to merge
+      final long startSerialization = System.currentTimeMillis();
+      if (issueMapOutputMerge()) {
+        long serializationDuration = System.currentTimeMillis() - 
startSerialization;
+        serializeTime += serializationDuration;
+        // if reserve successes, reset status for next fetch
+        if (hasPendingData) {
+          waitTime += System.currentTimeMillis() - startWait;
+        }
+        hasPendingData = false;
+      } else {
+        // if reserve fail, return and wait
+        startWait = System.currentTimeMillis();
+        return;
+      }
+
+      // update some status
+      copyBlockCount++;
+      copyTime = readTime + decompressTime + serializeTime + waitTime;
+      updateStatus();
+    } else {
+      // finish reading data, close related reader and check data consistent
+      celebornTezReader.close();
+      LOG.info(
+          "Reduce task "
+              + inputAttemptIdentifier
+              + " read block cnt: "
+              + copyBlockCount
+              + " cost "
+              + readTime
+              + " ms to fetch and "
+              + decompressTime
+              + " ms to decompress "
+              + serializeTime
+              + " ms to serialize and "
+              + waitTime
+              + " ms to wait resource"
+              + ", copy time:"
+              + copyTime);
+      stopFetch();
+    }
+  }
+
+  public Integer getPartitionId() {
+    return partitionId;
+  }
+
+  private boolean issueMapOutputMerge() throws IOException {
+    // Allocate a MapOutput (either in-memory or on-disk) to put uncompressed 
block
+    // In Rss, a MapOutput is sent as multiple blocks, so the reducer needs to
+    // treat each "block" as a faked "mapout".
+    // To avoid name conflicts, we use getNextUniqueTaskAttemptID instead.
+    // It will generate a unique TaskAttemptID(increased_seq++, 0).
+    InputAttemptIdentifier uniqueInputAttemptIdentifier = 
getNextUniqueInputAttemptIdentifier();
+    MapOutput mapOutput = null;
+    try {
+      issuedCnt.incrementAndGet();
+      mapOutput = merger.reserve(uniqueInputAttemptIdentifier, 
shuffleData.length, 0, 1);
+    } catch (IOException ioe) {
+      // kill this reduce attempt
+      ioErrs.increment(1);
+      throw ioe;
+    }
+    // Check if we can shuffle *now* ...
+    if (mapOutput == null || mapOutput.getType() == MapOutput.Type.WAIT) {
+      LOG.info("RssMRFetcher - MergeManager returned status WAIT ...");
+      // Not an error but wait to process data.
+      // Use a retry flag to avoid re-fetch and re-uncompress.
+      hasPendingData = true;
+      waitCount++;
+      return false;
+    }
+
+    // write data to mapOutput
+    try {
+      CelebornTezBypassWriter.write(mapOutput, shuffleData);
+      // let the merger knows this block is ready for merging
+      mapOutput.commit();
+    } catch (Throwable t) {
+      ioErrs.increment(1);
+      mapOutput.abort();
+      throw new CelebornIOException(
+          "Reduce: "
+              + inputAttemptIdentifier
+              + " cannot write block to "
+              + mapOutput.getClass().getSimpleName()
+              + " due to: "
+              + t.getClass().getName());
+    }
+    return true;
+  }
+
+  private InputAttemptIdentifier getNextUniqueInputAttemptIdentifier() {
+    return new InputAttemptIdentifier(uniqueMapId++, 0);
+  }
+
+  private void updateStatus() {}
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return waitCount;
+  }
+
+  private void stopFetch() {
+    LOG.info("CelebornTezShuffleDataFetcher stop fetch");
+    stopped = true;
+  }
+
+  public void shutDown() {
+    stopFetch();
+    LOG.info("CelebornTezShuffleDataFetcher shutdown");
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
new file mode 100644
index 000000000..f098a4284
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+@Private
+public class CelebornOrderedGroupedInputLegacy extends 
CelebornOrderedGroupedKVInput {
+
+  private final Progress progress = new Progress();
+
+  public CelebornOrderedGroupedInputLegacy(InputContext inputContext, int 
numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  @Private
+  public TezRawKeyValueIterator getIterator()
+      throws IOException, InterruptedException, TezException {
+    // wait for input so that iterator is available
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        return new TezRawKeyValueIterator() {
+          @Override
+          public DataInputBuffer getKey() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+
+          @Override
+          public DataInputBuffer getValue() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+
+          @Override
+          public boolean next() throws IOException {
+            return false;
+          }
+
+          @Override
+          public boolean hasNext() throws IOException {
+            return false;
+          }
+
+          @Override
+          public void close() throws IOException {}
+
+          @Override
+          public Progress getProgress() {
+            progress.complete();
+            return progress;
+          }
+
+          @Override
+          public boolean isSameKey() throws IOException {
+            throw new UnsupportedOperationException("isSameKey is not 
supported");
+          }
+        };
+      }
+    }
+
+    waitForInputReady();
+    synchronized (this) {
+      return rawIter;
+    }
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
new file mode 100644
index 000000000..627d5f7a7
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import 
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.CelebornShuffle;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CelebornOrderedGroupedKVInput} in a {@link AbstractLogicalInput} 
which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the 
consumer. This is
+ * typically used to bring one partition of a set of partitioned distributed 
data to one consumer.
+ * The shuffle operation brings all partitions to one place. These partitions 
are assumed to be
+ * sorted and are merged sorted to merge them into a single input view.
+ *
+ * <p>The Copy and Merge will be triggered by the initialization - which is 
handled by the Tez
+ * framework. Input is not consumable until the Copy and Merge are complete. 
Methods are provided to
+ * check for this, as well as to wait for completion. Attempting to get a 
reader on a non-complete
+ * input will block.
+ */
+@Public
+public class CelebornOrderedGroupedKVInput extends AbstractLogicalInput {
+
+  static final Logger LOG = 
LoggerFactory.getLogger(CelebornOrderedGroupedKVInput.class);
+
+  protected TezRawKeyValueIterator rawIter = null;
+  protected Configuration conf;
+  protected Shuffle shuffle;
+  private ApplicationAttemptId applicationAttemptId;
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private final BlockingQueue<Event> pendingEvents = new 
LinkedBlockingQueue<Event>();
+  private long firstEventReceivedTime = -1;
+
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
+
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+  private TezCounter shuffledInputs;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  public CelebornOrderedGroupedKVInput(InputContext inputContext, int 
numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  @Override
+  public synchronized List<Event> initialize() throws IOException {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+
+    if (this.getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0l, null);
+      isStarted.set(true);
+      getContext().inputIsReady();
+      LOG.info(
+          "input fetch not required since there are 0 physical inputs for 
input vertex: "
+              + getContext().getInputOutputVertexNames());
+      return Collections.emptyList();
+    }
+
+    long initialMemoryRequest =
+        Shuffle.getInitialMemoryRequirement(conf, 
getContext().getTotalMemoryAvailableToTask());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext().requestInitialMemory(initialMemoryRequest, 
memoryUpdateCallbackHandler);
+
+    this.inputKeyCounter = 
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter =
+        
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+    this.shuffledInputs = 
getContext().getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
getContext().getWorkDirs());
+    this.applicationAttemptId =
+        ApplicationAttemptId.newInstance(
+            getContext().getApplicationId(), 
getContext().getDAGAttemptNumber());
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      // Start the shuffle - copy and merge
+      shuffle = createShuffle();
+      shuffle.run();
+      LOG.debug("Initialized the handlers in shuffle..Safe to start 
processing..");
+      List<Event> pending = new LinkedList<Event>();
+      pendingEvents.drainTo(pending);
+      if (pending.size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "NoAutoStart delay in processing first event: "
+                  + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
+        shuffle.handleEvents(pending);
+      }
+      isStarted.set(true);
+    }
+  }
+
+  @VisibleForTesting
+  Shuffle createShuffle() throws IOException {
+    return new CelebornShuffle(
+        getContext(),
+        conf,
+        getNumPhysicalInputs(),
+        memoryUpdateCallbackHandler.getMemoryAssigned(),
+        applicationAttemptId);
+  }
+
+  /**
+   * Check if the input is ready for consumption
+   *
+   * @return true if the input is ready for consumption, or if an error 
occurred processing fetching
+   *     the input. false if the shuffle and merge are still in progress
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public synchronized boolean isInputReady()
+      throws IOException, InterruptedException, TezException {
+    Preconditions.checkState(isStarted.get(), "Must start input before 
invoking this method");
+    if (getNumPhysicalInputs() == 0) {
+      return true;
+    }
+    return shuffle.isInputReady();
+  }
+
+  /**
+   * Waits for the input to become ready for consumption
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void waitForInputReady() throws IOException, InterruptedException, 
TezException {
+    // Cannot synchronize entire method since this is called form user code 
and can block.
+    Shuffle localShuffleCopy = null;
+    synchronized (this) {
+      Preconditions.checkState(isStarted.get(), "Must start input before 
invoking this method");
+      if (getNumPhysicalInputs() == 0) {
+        return;
+      }
+      localShuffleCopy = shuffle;
+    }
+
+    TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
+    synchronized (this) {
+      rawIter = localRawIter;
+      createValuesIterator();
+    }
+  }
+
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
+      rawIter.close();
+    }
+    if (shuffle != null) {
+      shuffle.shutdown();
+    }
+
+    long dataSize =
+        
getContext().getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    getContext().getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords =
+        
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Get a KVReader for the Input. This method will block until the input is 
ready - i.e. the copy
+   * and merge stages are complete. Users can use the isInputReady method to 
check if the input is
+   * ready, which gives an indication of whether this method will block or not.
+   *
+   * <p>NOTE: All values for the current K-V pair must be read prior to 
invoking moveToNext. Once
+   * moveToNext() is called, the valueIterator from the previous K-V pair will 
throw an Exception
+   *
+   * @return a KVReader over the sorted input.
+   * @throws {@link IOInterruptedException} if IO was performing a blocking 
operation and was
+   *     interrupted
+   */
+  @Override
+  public KeyValuesReader getReader() throws IOException, TezException {
+    // Cannot synchronize entire method since this is called form user code 
and can block.
+    TezRawKeyValueIterator rawIterLocal;
+    synchronized (this) {
+      rawIterLocal = rawIter;
+      if (getNumPhysicalInputs() == 0) {
+        return new KeyValuesReader() {
+          @Override
+          public boolean next() throws IOException {
+            getContext().notifyProgress();
+            hasCompletedProcessing();
+            completedProcessing = true;
+            return false;
+          }
+
+          @Override
+          public Object getCurrentKey() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+
+          @Override
+          public Iterable<Object> getCurrentValues() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+        };
+      }
+    }
+    if (rawIterLocal == null) {
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOInterruptedException("Interrupted while waiting for input 
ready", e);
+      }
+    }
+    @SuppressWarnings("rawtypes")
+    ValuesIterator valuesIter = null;
+    synchronized (this) {
+      valuesIter = vIter;
+    }
+    return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
+  }
+
+  @Override
+  public float getProgress() throws ProgressFailedException, 
InterruptedException {
+    int totalInputs = getNumPhysicalInputs();
+    if (totalInputs != 0) {
+      synchronized (this) {
+        return ((0.5f) * this.shuffledInputs.getValue() / totalInputs)
+            + ((rawIter != null) ? ((0.5f) * 
rawIter.getProgress().getProgress()) : 0.0f);
+      }
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    Shuffle shuffleLocalRef;
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        throw new RuntimeException("No input events expected as numInputs is 
0");
+      }
+      if (!isStarted.get()) {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        pendingEvents.addAll(inputEvents);
+        return;
+      }
+      shuffleLocalRef = shuffle;
+    }
+    shuffleLocalRef.handleEvents(inputEvents);
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected synchronized void createValuesIterator() throws IOException {
+    // Not used by ReduceProcessor
+    RawComparator rawComparator = 
ConfigUtils.getIntermediateInputKeyComparator(conf);
+    Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    LOG.info(
+        getContext().getInputOutputVertexNames()
+            + ": "
+            + "creating ValuesIterator with "
+            + "comparator="
+            + rawComparator.getClass().getName()
+            + ", keyClass="
+            + keyClass.getName()
+            + ", valClass="
+            + valClass.getName());
+
+    vIter =
+        new ValuesIterator(
+            rawIter, rawComparator, keyClass, valClass, conf, inputKeyCounter, 
inputValueCounter);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public RawComparator getInputKeyComparator() {
+    return (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
+
+    private final ValuesIterator valuesIter;
+    private final InputContext context;
+
+    OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext 
context) {
+      this.valuesIter = valuesIter;
+      this.context = context;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      context.notifyProgress();
+      return valuesIter.moveToNext();
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return valuesIter.getKey();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return valuesIter.getValues();
+    }
+  }
+}

Reply via email to