This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 60d9b21f [#854][FOLLOWUP] feat(tez): Add Rss Input Class to begin Tez 
input task (#949)
60d9b21f is described below

commit 60d9b21fe189bf3970186babbb10c883562dbc18
Author: Qing <[email protected]>
AuthorDate: Sun Jun 25 16:52:37 2023 +0800

    [#854][FOLLOWUP] feat(tez): Add Rss Input Class to begin Tez input task 
(#949)
    
    ### What changes were proposed in this pull request?
    
    Add Rss Input Class to begin Tez input task
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/854
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    unit test
---
 .../input/RssConcatenatedMergedKeyValueInput.java  | 121 ++++++
 .../input/RssConcatenatedMergedKeyValuesInput.java | 123 ++++++
 .../input/RssOrderedGroupedInputLegacy.java        |  96 +++++
 .../library/input/RssOrderedGroupedKVInput.java    | 414 +++++++++++++++++++++
 .../input/RssOrderedGroupedMergedKVInput.java      | 265 +++++++++++++
 .../runtime/library/input/RssUnorderedKVInput.java | 318 ++++++++++++++++
 .../input/RssOrderedGroupedKVInputTest.java        | 133 +++++++
 .../input/RssSortedGroupedMergedInputTest.java     | 270 ++++++++++++++
 8 files changed, 1740 insertions(+)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
new file mode 100644
index 00000000..4a165088
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs
+ * (e.g. from a {@link GroupInputEdge} and provide a unified view of the 
+ * input. It concatenates all the inputs to provide a unified view
+ */
+@Public
+public class RssConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
+
+  public RssConcatenatedMergedKeyValueInput(MergedInputContext context,
+                                            List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
+    private int currentReaderIndex = 0;
+    private KeyValueReader currentReader;
+
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValueReader)) {
+            throw new TezUncheckedException("Expected KeyValueReader. "
+                + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValueReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off 
of
+          // underlying readers which take care of throwing 
IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return currentReader.getCurrentValue();
+    }
+
+    public float getProgress() throws IOException, InterruptedException {
+      return (1.0f) * (currentReaderIndex + 1) / getInputs().size();
+    }
+  }
+
+  /**
+   * Provides a {@link KeyValueReader} that iterates over the 
+   * concatenated input data
+   */
+  @Override
+  public KeyValueReader getReader() throws Exception {
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
+  }
+
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  @Override
+  public float getProgress() throws ProgressFailedException, 
InterruptedException {
+    try {
+      return concatenatedMergedKeyValueReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException 
", e);
+    }
+  }
+}
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
new file mode 100644
index 00000000..b9ab19c8
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs
+ * (e.g. from a {@link GroupInputEdge} and provide a unified view of the 
+ * input. It concatenates all the inputs to provide a unified view
+ */
+
+@Public
+public class RssConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
+  public RssConcatenatedMergedKeyValuesInput(MergedInputContext context,
+                                             List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
+    private int currentReaderIndex = 0;
+    private KeyValuesReader currentReader;
+
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValuesReader)) {
+            throw new TezUncheckedException("Expected KeyValuesReader. "
+                + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValuesReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off 
of
+          // underlying readers which take care of throwing 
IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException)e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentReader.getCurrentValues();
+    }
+
+    public float getProgress() throws IOException, InterruptedException {
+      return (1.0f) * (currentReaderIndex + 1) / getInputs().size();
+    }
+  }
+   
+  /**
+   * Provides a {@link KeyValuesReader} that iterates over the 
+   * concatenated input data
+   */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    concatenatedMergedKeyValuesReader = new 
ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
+  }
+
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  @Override
+  public float getProgress() throws ProgressFailedException, 
InterruptedException {
+    try {
+      return concatenatedMergedKeyValuesReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException 
", e);
+    }
+  }
+}
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
new file mode 100644
index 00000000..40d599fe
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+import org.apache.uniffle.common.exception.RssException;
+
+@Private
+public class RssOrderedGroupedInputLegacy extends RssOrderedGroupedKVInput {
+  private final Progress progress = new Progress();
+
+  public RssOrderedGroupedInputLegacy(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  /**
+   * Returns the raw key/value iterator of this input, waiting for the input
+   * to become ready first. When the input has no physical inputs, an empty
+   * iterator is returned immediately without waiting.
+   *
+   * @return the raw iterator, or an empty iterator if there are no physical inputs
+   * @throws IOException propagated from waiting for the input
+   * @throws InterruptedException propagated from waiting for the input
+   * @throws TezException propagated from waiting for the input
+   */
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException, TezException {
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        return new EmptyRawIterator();
+      }
+    }
+
+    // Block outside the monitor so other synchronized members stay usable.
+    waitForInputReady();
+
+    synchronized (this) {
+      return rawIter;
+    }
+  }
+
+  /**
+   * Iterator handed out when there is nothing to read: it never advances,
+   * reports the enclosing input's progress as complete and rejects key/value access.
+   */
+  private final class EmptyRawIterator implements TezRawKeyValueIterator {
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      throw new RssException("No data available in Input");
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      throw new RssException("No data available in Input");
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to release.
+    }
+
+    @Override
+    public Progress getProgress() {
+      progress.complete();
+      return progress;
+    }
+
+    @Override
+    public boolean isSameKey() throws IOException {
+      throw new UnsupportedOperationException("isSameKey is not supported");
+    }
+  }
+}
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
new file mode 100644
index 00000000..d18c6c48
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link RssOrderedGroupedKVInput} is an {@link AbstractLogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer. This is typically used to bring one partition of a set of partitioned
+ * distributed data to one consumer. The shuffle operation brings all partitions
+ * to one place. These partitions are assumed to be sorted and are merged sorted to
+ * merge them into a single input view.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ */
+@Public
+public class RssOrderedGroupedKVInput extends AbstractLogicalInput {
+
+  static final Logger LOG = LoggerFactory.getLogger(RssOrderedGroupedKVInput.class);
+
+  // Raw key/value iterator; assigned in waitForInputReady() from the shuffle's waitForInput().
+  protected TezRawKeyValueIterator rawIter = null;
+  // Configuration decoded from the context's user payload in initialize().
+  protected Configuration conf;
+  // Created in start() through createRssShuffle(); stays null when start() skips it.
+  protected RssShuffle shuffle;
+  // Passed to requestInitialMemory() in initialize(); start() reads the assigned memory from it.
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  // Events that arrive before the input is started; start() drains them into the shuffle.
+  private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<>();
+  // System.currentTimeMillis() of the first event buffered before start; -1 until one arrives.
+  private long firstEventReceivedTime = -1;
+  // Values iterator built over rawIter by createValuesIterator().
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
+
+  // Counters looked up in initialize(); left null when there are no physical inputs.
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+  private TezCounter shuffledInputs;
+
+  // Set by start(), or directly by initialize() when there are no physical inputs.
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  /**
+   * Creates the input; both arguments are passed straight to the superclass constructor.
+   */
+  public RssOrderedGroupedKVInput(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  /**
+   * Decodes the configuration from the user payload and registers the memory
+   * request for the shuffle. With zero physical inputs no memory is requested
+   * and the input is marked started and ready straight away.
+   *
+   * @return always an empty event list
+   */
+  @Override
+  public synchronized List<Event> initialize() throws IOException {
+    final InputContext ctx = getContext();
+    this.conf = TezUtils.createConfFromUserPayload(ctx.getUserPayload());
+
+    if (this.getNumPhysicalInputs() != 0) {
+      long initialMemoryRequest =
+          RssShuffle.getInitialMemoryRequirement(conf, ctx.getTotalMemoryAvailableToTask());
+      this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+      ctx.requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
+
+      this.inputKeyCounter = ctx.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+      this.inputValueCounter = ctx.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+      this.shuffledInputs = ctx.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+      this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, ctx.getWorkDirs());
+    } else {
+      // Nothing to fetch: request no memory and report readiness immediately.
+      ctx.requestInitialMemory(0L, null);
+      isStarted.set(true);
+      ctx.inputIsReady();
+      String sourceVertex = ctx.getSourceVertexName();
+      LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+          + sourceVertex);
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      // Start the shuffle - copy and merge
+      shuffle = createRssShuffle();
+      shuffle.run();
+      List<Event> pending = new LinkedList<>();
+      pendingEvents.drainTo(pending);
+      if (pending.size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NoAutoStart delay in processing first event: "
+                  + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
+        shuffle.handleEvents(pending);
+      }
+      isStarted.set(true);
+    }
+  }
+
+  @VisibleForTesting
+  RssShuffle createRssShuffle() throws IOException {
+    return new RssShuffle(getContext(), conf, getNumPhysicalInputs(), 
memoryUpdateCallbackHandler.getMemoryAssigned());
+  }
+
+  /**
+   * Reports whether the input can be consumed. The input must have been started.
+   *
+   * @return true when there are no physical inputs; otherwise whatever the
+   *         shuffle reports through its own isInputReady()
+   * @throws IllegalStateException if the input has not been started
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public synchronized boolean isInputReady() throws IOException, InterruptedException, TezException {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+    return getNumPhysicalInputs() == 0 || shuffle.isInputReady();
+  }
+
+  /**
+   * Blocks until the shuffle has produced its raw iterator, then stores that
+   * iterator and builds the values iterator on top of it. Returns immediately
+   * when there are no physical inputs.
+   *
+   * @throws IllegalStateException if the input has not been started
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void waitForInputReady() throws IOException, InterruptedException, TezException {
+    // The method as a whole is not synchronized: it is called from user code
+    // and the wait below can block, so only the state accesses take the lock.
+    final RssShuffle shuffleSnapshot;
+    synchronized (this) {
+      Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+      if (getNumPhysicalInputs() == 0) {
+        return;
+      }
+      shuffleSnapshot = shuffle;
+    }
+
+    final TezRawKeyValueIterator fetchedIter = shuffleSnapshot.waitForInput();
+    synchronized (this) {
+      rawIter = fetchedIter;
+      createValuesIterator();
+    }
+  }
+
+  /**
+   * Closes the raw iterator (if one was obtained), shuts down the shuffle (if
+   * one was created) and reports the decompressed shuffle bytes and the input
+   * record count to the statistics reporter.
+   *
+   * @return always an empty event list
+   */
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    if (rawIter != null && this.getNumPhysicalInputs() != 0) {
+      rawIter.close();
+    }
+    if (shuffle != null) {
+      shuffle.shutdown();
+    }
+
+    final InputContext ctx = getContext();
+    long dataSize =
+        ctx.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    ctx.getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords =
+        ctx.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+    ctx.getStatisticsReporter().reportItemsProcessed(inputRecords);
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Get a KVReader for the Input. This method blocks until the input is
+   * ready if no raw iterator has been obtained yet. Users can call
+   * isInputReady to find out whether this method is going to block.
+   *
+   * <p>With zero physical inputs a reader is returned whose next() always
+   * yields false and whose key/value accessors throw.
+   *
+   * NOTE: All values for the current K-V pair must be read prior to invoking
+   * moveToNext. Once moveToNext() is called, the valueIterator from the
+   * previous K-V pair will throw an Exception
+   *
+   * @return a KVReader over the sorted input.
+   * @throws IOInterruptedException if the wait for the input was interrupted
+   */
+  @Override
+  public KeyValuesReader getReader() throws IOException, TezException {
+    // The method as a whole is not synchronized: it is called from user code
+    // and waiting for the input can block.
+    final boolean mustWait;
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        return new KeyValuesReader() {
+          @Override
+          public boolean next() throws IOException {
+            getContext().notifyProgress();
+            // Inherited from the reader base class. NOTE(review): presumably
+            // rejects next() after exhaustion - confirm against Tez.
+            hasCompletedProcessing();
+            completedProcessing = true;
+            return false;
+          }
+
+          @Override
+          public Object getCurrentKey() throws IOException {
+            throw new RssException("No data available in Input");
+          }
+
+          @Override
+          public Iterable<Object> getCurrentValues() throws IOException {
+            throw new RssException("No data available in Input");
+          }
+        };
+      }
+      mustWait = (rawIter == null);
+    }
+
+    if (mustWait) {
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before translating the exception.
+        Thread.currentThread().interrupt();
+        throw new IOInterruptedException("Interrupted while waiting for input ready", e);
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    final ValuesIterator groupedValues;
+    synchronized (this) {
+      groupedValues = vIter;
+    }
+    return new OrderedGroupedKeyValuesReader(groupedValues, getContext());
+  }
+
+  /**
+   * Combines two halves into one progress figure: shuffled inputs over total
+   * inputs, and the raw iterator's own progress once that iterator exists.
+   * With zero physical inputs the result is 0.0.
+   *
+   * <p>The figure is informational only; it does not influence task execution.
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    final int totalInputs = getNumPhysicalInputs();
+    if (totalInputs == 0) {
+      return 0.0f;
+    }
+    synchronized (this) {
+      float shuffleShare = (0.5f) * this.shuffledInputs.getValue() / totalInputs;
+      float mergeShare = 0.0f;
+      if (rawIter != null) {
+        mergeShare = (0.5f) * rawIter.getProgress().getProgress();
+      }
+      return shuffleShare + mergeShare;
+    }
+  }
+
+  /**
+   * Routes incoming events to the shuffle. Events that arrive before the
+   * input is started are buffered instead, and the arrival time of the first
+   * such batch is recorded.
+   *
+   * @throws RssException if events arrive although there are no physical inputs
+   */
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    final RssShuffle activeShuffle;
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        throw new RssException("No input events expected as numInputs is 0");
+      }
+      if (isStarted.get()) {
+        activeShuffle = shuffle;
+      } else {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        pendingEvents.addAll(inputEvents);
+        return;
+      }
+    }
+    // Delivered outside the monitor, using the reference captured under it.
+    activeShuffle.handleEvents(inputEvents);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected synchronized void createValuesIterator()
+          throws IOException {
+    // Not used by ReduceProcessor
+    RawComparator rawComparator = 
ConfigUtils.getIntermediateInputKeyComparator(conf);
+    Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    LOG.info(getContext().getSourceVertexName() + ": " + "creating 
ValuesIterator with "
+            + "comparator=" + rawComparator.getClass().getName()
+            + ", keyClass=" + keyClass.getName()
+            + ", valClass=" + valClass.getName());
+    vIter = new ValuesIterator(rawIter, rawComparator, keyClass, valClass,
+            conf, inputKeyCounter, inputValueCounter);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public RawComparator getInputKeyComparator() {
+    return (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+  }
+
+  /**
+   * {@link KeyValuesReader} backed by a {@link ValuesIterator}; every call to
+   * next() first notifies the input context of progress.
+   */
+  @SuppressWarnings("rawtypes")
+  private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
+
+    private final ValuesIterator groupedValues;
+    private final InputContext inputContext;
+
+    OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext context) {
+      this.groupedValues = valuesIter;
+      this.inputContext = context;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      inputContext.notifyProgress();
+      return groupedValues.moveToNext();
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return groupedValues.getKey();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return groupedValues.getValues();
+    }
+  }
+
+
+  // Configuration keys exposed through getConfigurationKeySet().
+  private static final Set<String> CONF_KEYS = new HashSet<String>();
+
+  static {
+    Collections.addAll(CONF_KEYS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
+        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezConfiguration.TEZ_COUNTERS_MAX,
+        TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH,
+        TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH,
+        TezConfiguration.TEZ_COUNTERS_MAX_GROUPS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+        Constants.TEZ_RUNTIME_TASK_MEMORY,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+  }
+
+  // Maybe add helper methods to extract keys
+  // Maybe add constants or an Enum to access the keys
+
+  /**
+   * Returns a read-only view of the configuration keys registered in CONF_KEYS.
+   */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    final Set<String> readOnlyKeys = Collections.unmodifiableSet(CONF_KEYS);
+    return readOnlyKeys;
+  }
+}
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
new file mode 100644
index 00000000..d03a8cdb
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * A {@link MergedLogicalInput} which merges multiple
+ * {@link OrderedGroupedKVInput}s and returns a single view of these by merging
+ * values which belong to the same key.
+ *
+ * Combiners and Secondary Sort are not implemented, so there is no guarantee on
+ * the order of values.
+ *
+ * The merge itself is done by the nested reader, which keeps the constituent
+ * readers in a {@link PriorityQueue} ordered by their current key and chains
+ * the value iterables of all readers positioned on an equal key.
+ */
+@Public
+public class RssOrderedGroupedMergedKVInput extends MergedLogicalInput {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RssOrderedGroupedMergedKVInput.class);
+  private final Set<Input> completedInputs = Collections
+      .newSetFromMap(new IdentityHashMap<Input, Boolean>()); // identity-based set: inputs are tracked by reference
+
+  public RssOrderedGroupedMergedKVInput(MergedInputContext context, 
List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /**
+   * Provides an ordered {@link KeyValuesReader} over all constituent inputs.
+   * A new reader object is constructed on every call.
+   *
+   * @return a reader that merges the readers of {@code getInputs()} by key
+   * @throws Exception if constructing the merged reader fails
+   */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
+  }
+  /**
+   * Records {@code input} as ready and calls {@code informInputReady()} whenever the
+   * number of distinct recorded inputs equals the number of constituent inputs.
+   */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    synchronized (completedInputs) {
+      completedInputs.add(input);
+      if (completedInputs.size() == getInputs().size()) {
+        informInputReady();
+      }
+    }
+  }
+  /**
+   * Reader that merges the {@link KeyValuesReader}s of all inputs. Readers wait in
+   * {@code pQueue} ordered by current key; readers whose values for the current key
+   * were consumed or skipped are parked in {@code finishedReaders} and advanced on
+   * the following {@code next()} call.
+   */
+  private static class OrderedGroupedMergedKeyValuesReader extends 
KeyValuesReader {
+    private final PriorityQueue<KeyValuesReader> pQueue; // readers positioned on a key, ordered by that key
+    @SuppressWarnings("rawtypes")
+    private final RawComparator keyComparator;
+    private final List<KeyValuesReader> finishedReaders; // readers done with the current key; advanced in next()
+    private final ValuesIterable currentValues;
+    private KeyValuesReader nextKVReader; // reader most recently polled from pQueue
+    private Object currentKey;
+    private final MergedInputContext context;
+    /**
+     * Obtains a reader from every input and queues those that have at least one key.
+     * The key comparator is taken from the first input.
+     */
+    OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext 
context)
+        throws Exception {
+      keyComparator = ((OrderedGroupedKVInput) inputs.get(0))
+          .getInputKeyComparator(); // NOTE(review): needs a non-empty list whose first element is an OrderedGroupedKVInput — confirm for RSS inputs
+      pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(),
+          new KVReaderComparator(keyComparator));
+      finishedReaders = new ArrayList<KeyValuesReader>(inputs.size());
+      for (Input input : inputs) {
+        KeyValuesReader reader = (KeyValuesReader) input.getReader();
+        if (reader.next()) { // readers without any key never enter the queue
+          pQueue.add(reader);
+        }
+      }
+      currentValues = new ValuesIterable();
+      this.context = context;
+    }
+    /** Advances {@code kvsReadr} to its next key and re-queues it only if such a key exists. */
+    private void advanceAndAddToQueue(KeyValuesReader kvsReadr)
+        throws IOException {
+      if (kvsReadr.next()) {
+        pQueue.add(kvsReadr);
+      }
+    }
+    /** Null-tolerant {@code pQueue.add}; puts back a reader that was polled but not consumed. */
+    private void addToQueue(KeyValuesReader kvsReadr) throws IOException {
+      if (kvsReadr != null) {
+        pQueue.add(kvsReadr);
+      }
+    }
+    /**
+     * Moves to the next distinct key. Unconsumed values of the previous key are
+     * discarded, and the readers that served it are advanced and re-queued first.
+     *
+     * @return {@code true} if a key is available, {@code false} once the queue is empty
+     */
+    @Override
+    public boolean next() throws IOException {
+      // Skip values of current key if not consumed by the user
+      currentValues.discardCurrent();
+
+      for (KeyValuesReader reader : finishedReaders) {
+        // advance each parked reader; it re-enters the queue only if it has another key
+        advanceAndAddToQueue(reader);
+      }
+      finishedReaders.clear();
+
+      nextKVReader = pQueue.poll(); // reader with the smallest current key, or null if none remain
+      context.notifyProgress();
+      if (nextKVReader != null) {
+        currentKey = nextKVReader.getCurrentKey();
+        currentValues.moveToNext();
+        return true;
+      } else {
+        hasCompletedProcessing(); // not defined in this file; presumably inherited from the Tez reader base class
+        completedProcessing = true;
+      }
+      return false;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentKey;
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentValues; // same ValuesIterable instance on every call
+    }
+    /** Iterable facade over one shared {@link ValuesIterator}; {@code iterator()} always returns that same instance. */
+    private class ValuesIterable implements Iterable<Object> {
+      private ValuesIterator iterator = new ValuesIterator();
+
+      @Override
+      public Iterator<Object> iterator() {
+        return iterator;
+      }
+
+      public void discardCurrent() throws IOException {
+        iterator.discardCurrent();
+      }
+
+      public void moveToNext() throws IOException {
+        iterator.moveToNext();
+      }
+
+    }
+    /**
+     * Iterator over the values of the current key. When the active reader's values run
+     * out it continues with the next queued reader if that reader's key compares equal.
+     */
+    @SuppressWarnings("unchecked")
+    private class ValuesIterator implements Iterator<Object> {
+
+      private Iterator<Object> currentValuesIter; // null while there is no current key
+      /** Points this iterator at the current values of {@code nextKVReader}. */
+      public void moveToNext() throws IOException {
+        currentValuesIter = nextKVReader.getCurrentValues().iterator();
+      }
+      /**
+       * Returns whether another value exists for the current key, switching to the next
+       * queued reader with an equal key when the active one is exhausted.
+       *
+       * @throws RssException wrapping an {@link IOException} raised while reading keys or values
+       */
+      @Override
+      public boolean hasNext() {
+        if (currentValuesIter != null) { // No current key. next needs to be 
called.
+          if (currentValuesIter.hasNext()) {
+            return true;
+          } else {
+            finishedReaders.add(nextKVReader); // active reader is exhausted for this key
+            nextKVReader = pQueue.poll();
+            try {
+              if (nextKVReader != null
+                  && keyComparator.compare(currentKey, 
nextKVReader.getCurrentKey()) == 0) {
+                currentValuesIter = nextKVReader.getCurrentValues().iterator();
+                return true;
+              } else { // key changed or no more data.
+                // Add the reader back to queue
+                addToQueue(nextKVReader);
+                currentValuesIter = null;
+                return false;
+              }
+            } catch (IOException e) {
+              throw new RssException(e);
+            }
+          }
+        } else {
+          return false;
+        }
+      }
+      /**
+       * Skips whatever is left of the current key: every reader positioned on
+       * {@code currentKey} is moved to {@code finishedReaders}, and the first reader
+       * with a different key is put back into the queue. No-op without a current key.
+       */
+      public void discardCurrent() throws IOException {
+        if (currentValuesIter != null) {
+          do {
+            finishedReaders.add(nextKVReader);
+            nextKVReader = pQueue.poll();
+          } while (nextKVReader != null
+              && keyComparator.compare(currentKey, 
nextKVReader.getCurrentKey()) == 0);
+          addToQueue(nextKVReader);
+          currentValuesIter = null;
+        }
+      }
+
+      @Override
+      public Object next() {
+        return currentValuesIter.next(); // NOTE(review): dereferences null when there is no current key
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    /**
+     * Comparator that compares KeyValuesReader on their current key by delegating
+     * to the supplied {@link RawComparator}. An {@link IOException} raised while
+     * fetching a key is logged and rethrown as {@link RssException}.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static class KVReaderComparator implements
+        Comparator<KeyValuesReader> {
+
+      private RawComparator keyComparator;
+
+      KVReaderComparator(RawComparator keyComparator) {
+        this.keyComparator = keyComparator;
+      }
+
+      @Override
+      public int compare(KeyValuesReader o1, KeyValuesReader o2) {
+        try {
+          return keyComparator.compare(o1.getCurrentKey(), o2.getCurrentKey());
+        } catch (IOException e) {
+          LOG.error("Caught exception while comparing keys in shuffle input", 
e);
+          throw new RssException(e);
+        }
+      }
+    }
+  }
+  /**
+   * Returns the arithmetic mean of the progress reported by the constituent inputs.
+   * NOTE(review): each input is cast to OrderedGroupedKVInput, and an empty input
+   * list would make the division yield NaN — confirm both cannot occur.
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, 
InterruptedException {
+    float totalProgress = 0.0f;
+    for (Input input : getInputs()) {
+      totalProgress += ((OrderedGroupedKVInput)input).getProgress();
+    }
+    return (1.0f) * totalProgress / getInputs().size();
+  }
+}
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
new file mode 100644
index 00000000..c1ac3119
--- /dev/null
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.RssShuffleManager;
+import 
org.apache.tez.runtime.library.common.shuffle.impl.RssSimpleFetchedInputAllocator;
+import 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
+import 
org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link RssUnorderedKVInput} provides unordered key value input by
+ * bringing together (shuffling) a set of distributed data and providing a
+ * unified view to that data. There are no ordering constraints applied by
+ * this input.
+ *
+ * Lifecycle as implemented below: {@link #initialize()} reads the configuration
+ * and requests memory, {@link #start()} wires up the {@link RssShuffleManager},
+ * the event handler and the reader, and events received before start are
+ * buffered and replayed once start has finished its setup.
+ */
+@Public
+public class RssUnorderedKVInput extends AbstractLogicalInput {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RssUnorderedKVInput.class);
+
+  private Configuration conf;
+  private RssShuffleManager rssShuffleManager;
+  private final BlockingQueue<Event> pendingEvents = new 
LinkedBlockingQueue<>(); // events received before start(); drained in start()
+  private long firstEventReceivedTime = -1; // -1 until the first pre-start event arrives; only read for debug logging
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  @SuppressWarnings("rawtypes")
+  private UnorderedKVReader kvReader; // assigned in start(); stays null when there are 0 physical inputs
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private TezCounter inputRecordCounter;
+
+  private SimpleFetchedInputAllocator inputManager;
+  private ShuffleEventHandler inputEventHandler;
+
+  public RssUnorderedKVInput(InputContext inputContext, int numPhysicalInputs) 
{
+    super(inputContext, numPhysicalInputs);
+  }
+  /**
+   * Builds the configuration from the user payload and requests initial memory.
+   * With 0 physical inputs the input is marked started and ready right away and
+   * no memory callback handler is created.
+   *
+   * @return always an empty list
+   */
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    LOG.info("RssUnorderedKVInput initialize, num of physicalInputs:{}", 
getNumPhysicalInputs());
+    Preconditions.checkArgument(getNumPhysicalInputs() != -1, "Number of 
Inputs has not been set");
+    this.conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+
+    if (getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0L, null);
+      isStarted.set(true); // makes a later start() a no-op
+      getContext().inputIsReady();
+      LOG.info("input fetch not required since there are 0 physical inputs for 
input vertex: "
+          + getContext().getSourceVertexName());
+      return Collections.emptyList();
+    } else {
+      long initialMemReq = getInitialMemoryReq();
+      memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+      this.getContext().requestInitialMemory(initialMemReq, 
memoryUpdateCallbackHandler);
+    }
+
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
getContext().getWorkDirs());
+    this.inputRecordCounter = getContext().getCounters().findCounter(
+        TaskCounter.INPUT_RECORDS_PROCESSED);
+    return Collections.emptyList();
+  }
+  /**
+   * Creates the codec, input allocator, shuffle manager, event handler and reader,
+   * runs the shuffle manager, then hands any events buffered before start to the
+   * event handler. Does nothing if the input is already marked started.
+   */
+  @Override
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      ////// Initial configuration
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      CompressionCodec codec;
+      if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+        Class<? extends CompressionCodec> codecClass = ConfigUtils
+            .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+        codec = ReflectionUtils.newInstance(codecClass, conf);
+      } else {
+        codec = null; // no codec when intermediate input is not compressed
+      }
+
+      boolean ifileReadAhead = 
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+      int ifileReadAheadLength = 0; // stays 0 unless read-ahead is enabled
+      int ifileBufferSize = 0;
+
+      if (ifileReadAhead) {
+        ifileReadAheadLength = 
conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+      }
+      ifileBufferSize = conf.getInt("io.file.buffer.size",
+          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT); // read regardless of the read-ahead setting
+
+      LOG.info("RssUnorderedKVInput, totalMemoryAvailable:{}, available 
memory:{}",
+          getContext().getTotalMemoryAvailableToTask(), 
memoryUpdateCallbackHandler.getMemoryAssigned());
+
+      boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+
+      this.inputManager = new RssSimpleFetchedInputAllocator(
+          TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
+          getContext().getUniqueIdentifier(),
+          getContext().getDagIdentifier(), conf,
+          getContext().getTotalMemoryAvailableToTask(),
+          memoryUpdateCallbackHandler.getMemoryAssigned());
+
+      this.rssShuffleManager = new RssShuffleManager(getContext(), conf, 
getNumPhysicalInputs(), ifileBufferSize,
+          ifileReadAhead, ifileReadAheadLength, codec, inputManager);
+
+      this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), 
rssShuffleManager,
+          inputManager, codec, ifileReadAhead, ifileReadAheadLength, 
compositeFetch);
+
+      ////// End of Initial configuration
+
+      this.rssShuffleManager.run();
+      this.kvReader = createReader(inputRecordCounter, codec,
+          ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+      List<Event> pending = new LinkedList<>();
+      pendingEvents.drainTo(pending); // replay events that arrived before start()
+      if (pending.size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart 
delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
+        inputEventHandler.handleEvents(pending);
+      }
+      isStarted.set(true);
+    }
+  }
+  /**
+   * Returns the reader created by {@link #start()}, or, when there are 0 physical
+   * inputs, a stub reader whose {@code next()} always returns {@code false}.
+   *
+   * @throws IllegalStateException (via Preconditions) if the input has not been started
+   */
+  @Override
+  public synchronized KeyValueReader getReader() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Must start input before 
invoking this method");
+    if (getNumPhysicalInputs() == 0) {
+      return new KeyValueReader() {
+        @Override
+        public boolean next() throws IOException {
+          getContext().notifyProgress();
+          hasCompletedProcessing(); // not defined in this file; presumably inherited from the Tez reader base class
+          completedProcessing = true;
+          return false;
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException {
+          throw new RssException("No data available in Input");
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException {
+          throw new RssException("No data available in Input");
+        }
+      };
+    }
+    return this.kvReader;
+  }
+  /**
+   * Buffers {@code inputEvents} while the input is not started; otherwise forwards
+   * them to the event handler outside the monitor. Throws {@link RssException} when
+   * the input has 0 physical inputs.
+   */
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    ShuffleEventHandler inputEventHandlerLocalRef;
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        throw new RssException("No input events expected as numInputs is 0");
+      }
+      if (!isStarted.get()) {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        // This queue will keep growing if the Processor decides never to
+        // start the event. The Input, however has no idea, on whether start
+        // will be invoked or not.
+        pendingEvents.addAll(inputEvents);
+        return;
+      }
+      inputEventHandlerLocalRef = inputEventHandler; // copy the reference under the lock, invoke it after releasing
+    }
+    inputEventHandlerLocalRef.handleEvents(inputEvents);
+  }
+  /**
+   * Logs final progress, shuts down the shuffle manager if one was created, and reports
+   * the SHUFFLE_BYTES_DECOMPRESSED and INPUT_RECORDS_PROCESSED counter values.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    if (this.inputEventHandler != null) {
+      this.inputEventHandler.logProgress(true);
+    }
+
+    if (this.rssShuffleManager != null) {
+      this.rssShuffleManager.shutdown();
+    }
+
+    long dataSize = getContext().getCounters()
+        .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    getContext().getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+    return null;
+  }
+  /** Delegates to {@code SimpleFetchedInputAllocator.getInitialMemoryReq} with this task's total available memory. */
+  private long getInitialMemoryReq() {
+    return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
+        getContext().getTotalMemoryAvailableToTask());
+  }
+
+  /** Creates the {@link UnorderedKVReader} that reads from {@code rssShuffleManager}. */
+  @SuppressWarnings("rawtypes")
+  private UnorderedKVReader createReader(TezCounter inputRecordCounter, 
CompressionCodec codec,
+                                         int ifileBufferSize, boolean 
ifileReadAheadEnabled, int ifileReadAheadLength)
+      throws IOException {
+    return new UnorderedKVReader(rssShuffleManager, conf, null, 
ifileReadAheadEnabled,
+        0, ifileBufferSize, inputRecordCounter, getContext()); // NOTE(review): codec and ifileReadAheadLength are not forwarded (null / 0 passed) — confirm intentional
+  }
+  /** Configuration keys returned by {@link #getConfigurationKeySet()}; filled by the static block below. */
+  private static final Set<String> CONF_KEYS = new HashSet<>();
+
+  static {
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+    CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_MAX);
+    CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
+    CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
+    CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    CONF_KEYS.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
+    CONF_KEYS.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+  }
+
+  // TODO: maybe add helper methods to extract keys
+  // TODO: maybe add constants or an Enum to access the keys
+  /** Returns a read-only view of the configuration keys collected in {@code CONF_KEYS}. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(CONF_KEYS);
+  }
+  /**
+   * Returns the progress reported by the underlying reader; an {@link IOException}
+   * is rethrown as {@link ProgressFailedException}.
+   * NOTE(review): {@code kvReader} is only assigned in start(), which is skipped for
+   * 0 physical inputs — calling this in that state would dereference null; confirm.
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, 
InterruptedException {
+    try {
+      return kvReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException 
", e);
+    }
+  }
+}
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
new file mode 100644
index 00000000..1502c95d
--- /dev/null
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssOrderedGroupedKVInputTest { // drives RssOrderedGroupedKVInput against mocked Tez collaborators
+  /**
+   * Initializes and starts the input against a mocked context, then expects
+   * {@code getReader()} to throw {@code RssShuffle.RssShuffleError} instead of returning.
+   */
+  @Test
+  @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+  public void testInterruptWhileAwaitingInput() throws IOException {
+    try (MockedStatic<IdUtils> idUtils = Mockito.mockStatic(IdUtils.class)) {
+      ApplicationId appId = ApplicationId.newInstance(9999, 72);
+      ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
+      idUtils.when(IdUtils::getApplicationAttemptId).thenReturn(appAttemptId);
+      try (MockedStatic<ShuffleUtils> shuffleUtils = 
Mockito.mockStatic(ShuffleUtils.class)) {
+        shuffleUtils.when(() -> 
ShuffleUtils.deserializeShuffleProviderMetaData(any())).thenReturn(4);
+        InputContext inputContext = createMockInputContext();
+        RssOrderedGroupedKVInput kvInput = new 
OrderedGroupedKVInputForTest(inputContext, 10);
+        kvInput.initialize();
+        kvInput.start();
+        try {
+          kvInput.getReader();
+          fail("getReader should not return since underlying inputs are not 
ready");
+        } catch (Exception e) {
+          assertTrue(e instanceof RssShuffle.RssShuffleError); // any other exception type fails the test
+        }
+      }
+    }
+  }
+  /**
+   * Builds a Mockito {@link InputContext} stubbed with vertex names, identifiers, an
+   * empty-configuration payload, one work dir, 200 MiB of task memory and fresh counters.
+   * Its {@code requestInitialMemory} stub immediately assigns 200 MiB to the handler.
+   */
+  private InputContext createMockInputContext() throws IOException {
+    InputContext inputContext = mock(InputContext.class);
+    doReturn("Map 1").when(inputContext).getSourceVertexName();
+    doReturn("Reducer 1").when(inputContext).getTaskVertexName();
+    
when(inputContext.getUniqueIdentifier()).thenReturn("attempt_1685094627632_0157_1_01_000000_0_10006");
+
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+
+    ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+    doReturn(executionContext).when(inputContext).getExecutionContext();
+
+    Configuration conf = new TezConfiguration();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    String[] workingDirs = new String[]{"workDir1"};
+    TezCounters counters = new TezCounters();
+
+    doReturn(payLoad).when(inputContext).getUserPayload();
+    doReturn(workingDirs).when(inputContext).getWorkDirs();
+    doReturn(200 * 1024 * 
1024L).when(inputContext).getTotalMemoryAvailableToTask();
+    doReturn(counters).when(inputContext).getCounters();
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+
+        if (args[1] instanceof MemoryUpdateCallbackHandler) { // second argument is the memory callback
+          MemoryUpdateCallbackHandler memUpdateCallbackHandler = 
(MemoryUpdateCallbackHandler) args[1];
+          memUpdateCallbackHandler.memoryAssigned(200 * 1024 * 1024);
+        } else {
+          fail();
+        }
+        return null;
+      }
+    }).when(inputContext).requestInitialMemory(any(long.class),
+        any(MemoryUpdateCallbackHandler.class));
+    return inputContext;
+  }
+  /**
+   * Test subclass whose {@code createShuffle()} returns a mocked {@link Shuffle} that
+   * throws {@link InterruptedException} from {@code waitForInput()}.
+   * NOTE(review): there is no {@code @Override} on createShuffle; presumably it replaces
+   * a hook in RssOrderedGroupedKVInput — confirm that it is actually invoked.
+   */
+  static class OrderedGroupedKVInputForTest extends RssOrderedGroupedKVInput {
+    OrderedGroupedKVInputForTest(InputContext inputContext, int 
numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+    }
+
+    Shuffle createShuffle() throws IOException {
+      Shuffle shuffle = mock(Shuffle.class);
+      try {
+        doThrow(new InterruptedException()).when(shuffle).waitForInput();
+      } catch (InterruptedException e) {
+        fail(); // stubbing itself must not throw
+      } catch (Exception e) {
+        fail();
+      }
+      return shuffle;
+    }
+  }
+}
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
new file mode 100644
index 00000000..365166d3
--- /dev/null
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class RssSortedGroupedMergedInputTest {
+  /**
+   * Creates the Mockito mock of {@link MergedInputContext} that the tests pass to the
+   * merged inputs and later verify against.
+   */
+  MergedInputContext createMergedInputContext() {
+    MergedInputContext mockedContext = mock(MergedInputContext.class);
+    return mockedContext;
+  }
+
+  /**
+   * Concatenates three dummy inputs of 10 records each and checks that all 30 records are
+   * read, that the context is notified of progress four times, and that calling
+   * {@code next()} on the exhausted reader fails with an {@link IOException}.
+   */
+  @Test
+  @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+  public void testSimpleConcatenatedMergedKeyValueInput() throws Exception {
+
+    DummyInput sInput1 = new DummyInput(10);
+    DummyInput sInput2 = new DummyInput(10);
+    DummyInput sInput3 = new DummyInput(10);
+
+    List<Input> sInputs = new LinkedList<Input>();
+    sInputs.add(sInput1);
+    sInputs.add(sInput2);
+    sInputs.add(sInput3);
+    MergedInputContext mockContext = createMergedInputContext();
+    // NOTE(review): this instantiates ConcatenatedMergedKeyValueInput rather than the
+    // RssConcatenatedMergedKeyValueInput introduced by this change - confirm that is intended.
+    ConcatenatedMergedKeyValueInput input =
+        new ConcatenatedMergedKeyValueInput(mockContext, sInputs);
+
+    KeyValueReader kvReader = input.getReader();
+    int keyCount = 0;
+    while (kvReader.next()) {
+      keyCount++;
+      // DummyKeyValueReader returns boxed ints for both key and value.
+      assertTrue(kvReader.getCurrentKey() instanceof Integer);
+      assertTrue(kvReader.getCurrentValue() instanceof Integer);
+    }
+    // assertEquals reports expected vs. actual on failure, unlike assertTrue(a == b).
+    assertEquals(30, keyCount);
+    // one for each reader change and one to exit
+    verify(mockContext, times(4)).notifyProgress();
+
+    getNextFromFinishedReader(kvReader);
+  }
+
+  /**
+   * Concatenates three sorted inputs, each holding keys 1..3 with two values per key, and
+   * checks that 9 keys with 2 values each are read, that the context is notified of progress
+   * four times, and that calling {@code next()} on the exhausted reader fails with an
+   * {@link IOException}.
+   */
+  @Test
+  @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+  public void testSimpleConcatenatedMergedKeyValuesInput() throws Exception {
+    List<Input> sInputs = new LinkedList<Input>();
+    for (int i = 0; i < 3; i++) {
+      SortedTestKeyValuesReader sortedReader = new SortedTestKeyValuesReader(
+          new int[] { 1, 2, 3 },
+          new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+      sInputs.add(new SortedTestInput(sortedReader));
+    }
+
+    MergedInputContext mockContext = createMergedInputContext();
+    // NOTE(review): this instantiates ConcatenatedMergedKeyValuesInput rather than the
+    // RssConcatenatedMergedKeyValuesInput introduced by this change - confirm that is intended.
+    ConcatenatedMergedKeyValuesInput input =
+        new ConcatenatedMergedKeyValuesInput(mockContext, sInputs);
+
+    KeyValuesReader kvsReader = input.getReader();
+    int keyCount = 0;
+    while (kvsReader.next()) {
+      keyCount++;
+      // The casts double as a check that keys and values are Integers.
+      Integer key = (Integer) kvsReader.getCurrentKey();
+      int valCount = 0;
+      for (Object rawValue : kvsReader.getCurrentValues()) {
+        Integer val = (Integer) rawValue;
+        valCount++;
+      }
+      assertEquals(2, valCount);
+    }
+    assertEquals(9, keyCount);
+    // one for each reader change and one to exit
+    verify(mockContext, times(4)).notifyProgress();
+
+    getNextFromFinishedReader(kvsReader);
+  }
+
+  /**
+   * Calls {@code next()} once more on a reader that has already been drained and expects an
+   * {@link IOException} whose message contains "For usage, please refer to".
+   *
+   * @param kvsReader the already exhausted key-values reader
+   */
+  private void getNextFromFinishedReader(KeyValuesReader kvsReader) {
+    try {
+      kvsReader.next();
+      // Reaching this line means no exception was raised.
+      fail();
+    } catch (IOException expected) {
+      String message = expected.getMessage();
+      assertTrue(message.contains("For usage, please refer to"));
+    }
+  }
+
+  /**
+   * Calls {@code next()} once more on a reader that has already been drained and expects an
+   * {@link IOException} whose message contains "For usage, please refer to".
+   *
+   * @param kvReader the already exhausted key-value reader
+   */
+  private void getNextFromFinishedReader(KeyValueReader kvReader) {
+    try {
+      kvReader.next();
+      // Reaching this line means no exception was raised.
+      fail();
+    } catch (IOException expected) {
+      String message = expected.getMessage();
+      assertTrue(message.contains("For usage, please refer to"));
+    }
+  }
+
+  /**
+   * {@link RssOrderedGroupedKVInput} stub that serves a pre-built reader; its lifecycle
+   * methods do nothing.
+   */
+  private static class SortedTestInput extends RssOrderedGroupedKVInput {
+
+    final SortedTestKeyValuesReader reader;
+
+    SortedTestInput(SortedTestKeyValuesReader kvsReader) {
+      // The stub passes a null first argument and 0 as the second one to the parent.
+      super(null, 0);
+      reader = kvsReader;
+    }
+
+    /** No-op; returns {@code null} instead of an event list. */
+    @Override
+    public List<Event> initialize() throws IOException {
+      return null;
+    }
+
+    /** No-op. */
+    @Override
+    public void start() throws IOException {
+    }
+
+    /** No-op; incoming events are ignored. */
+    @Override
+    public void handleEvents(List<Event> inputEvents) {
+    }
+
+    /** Returns the reader supplied at construction time. */
+    @Override
+    public KeyValuesReader getReader() throws IOException {
+      return reader;
+    }
+
+    /** No-op; returns {@code null} instead of an event list. */
+    @Override
+    public List<Event> close() throws IOException {
+      return null;
+    }
+
+    /** Returns a fresh {@link RawComparatorForTest} on every call. */
+    @SuppressWarnings("rawtypes")
+    public RawComparator getInputKeyComparator() {
+      return new RawComparatorForTest();
+    }
+  }
+
+  private static class SortedTestKeyValuesReader extends KeyValuesReader {
+
+    final int[] keys;
+    final int[][] values;
+    int currentIndex = -1;
+
+    SortedTestKeyValuesReader(int[] keys, int[][] vals) {
+      this.keys = keys;
+      this.values = vals;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      hasCompletedProcessing();
+      currentIndex++;
+      if (keys == null || currentIndex >= keys.length) {
+        completedProcessing = true;
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return keys[currentIndex];
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      List<Object> ints = new LinkedList<Object>();
+      for (int i = 0; i < values[currentIndex].length; i++) {
+        ints.add(Integer.valueOf(values[currentIndex][i]));
+      }
+      return ints;
+    }
+  }
+
+  /**
+   * Minimal {@link Input} that serves a {@link DummyKeyValueReader} created with the given
+   * number of records.
+   */
+  private static class DummyInput implements Input {
+    DummyKeyValueReader reader;
+
+    DummyInput(int recordCount) {
+      this.reader = new DummyKeyValueReader(recordCount);
+    }
+
+    /** No-op. */
+    @Override
+    public void start() throws Exception {
+    }
+
+    /** Returns the reader built in the constructor. */
+    @Override
+    public Reader getReader() throws Exception {
+      return this.reader;
+    }
+  }
+
+  private static class DummyKeyValueReader extends KeyValueReader {
+    private int records;
+
+    DummyKeyValueReader(int records) {
+      this.records = records;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return (records-- > 0);
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return records;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return records;
+    }
+  }
+
+
+  /**
+   * {@link RawComparator} for {@link Integer} keys that only supports object comparison; the
+   * raw byte-range comparison is not implemented.
+   */
+  private static class RawComparatorForTest implements RawComparator<Integer> {
+
+    /**
+     * Orders two integers numerically.
+     *
+     * @return a negative value, zero, or a positive value when {@code o1} is less than, equal
+     *     to, or greater than {@code o2}
+     */
+    @Override
+    public int compare(Integer o1, Integer o2) {
+      // Integer.compare avoids the overflow of "o1 - o2", which yields the wrong sign for
+      // operands that are far apart (e.g. Integer.MIN_VALUE and 1).
+      return Integer.compare(o1, o2);
+    }
+
+    /**
+     * Not supported by this test comparator.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
+

Reply via email to