This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 60d9b21f [#854][FOLLOWUP] feat(tez): Add Rss Input Class to begin Tez
input task (#949)
60d9b21f is described below
commit 60d9b21fe189bf3970186babbb10c883562dbc18
Author: Qing <[email protected]>
AuthorDate: Sun Jun 25 16:52:37 2023 +0800
[#854][FOLLOWUP] feat(tez): Add Rss Input Class to begin Tez input task
(#949)
### What changes were proposed in this pull request?
Add Rss Input Class to begin Tez input task
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/854
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../input/RssConcatenatedMergedKeyValueInput.java | 121 ++++++
.../input/RssConcatenatedMergedKeyValuesInput.java | 123 ++++++
.../input/RssOrderedGroupedInputLegacy.java | 96 +++++
.../library/input/RssOrderedGroupedKVInput.java | 414 +++++++++++++++++++++
.../input/RssOrderedGroupedMergedKVInput.java | 265 +++++++++++++
.../runtime/library/input/RssUnorderedKVInput.java | 318 ++++++++++++++++
.../input/RssOrderedGroupedKVInputTest.java | 133 +++++++
.../input/RssSortedGroupedMergedInputTest.java | 270 ++++++++++++++
8 files changed, 1740 insertions(+)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
new file mode 100644
index 00000000..4a165088
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValueInput.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs
+ * (e.g. from a {@link GroupInputEdge}) and provides a unified view of the
+ * input. It concatenates all the inputs, in the order returned by
+ * {@code getInputs()}, behind a single {@link KeyValueReader}.
+ */
+@Public
+public class RssConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  // Created lazily by getReader(); stays null until a reader has been requested.
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
+
+  public RssConcatenatedMergedKeyValueInput(MergedInputContext context,
+      List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /**
+   * Reader that drains the {@link KeyValueReader} of each constituent input in turn.
+   */
+  public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
+    // Index of the next constituent input whose reader has not been opened yet.
+    private int currentReaderIndex = 0;
+    private KeyValueReader currentReader;
+
+    /**
+     * Advances to the next key/value pair, moving on to the next constituent
+     * input whenever the current reader is exhausted.
+     *
+     * @return true if a pair is available, false once every input has been drained
+     * @throws IOException if an underlying reader fails or cannot be obtained
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValueReader)) {
+            throw new TezUncheckedException("Expected KeyValueReader. "
+                + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValueReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return currentReader.getCurrentValue();
+    }
+
+    /**
+     * Reports coarse progress based on how many constituent readers have been opened.
+     *
+     * @return the opened-reader fraction, capped at 1.0f
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      // The "+ 1" numerator reaches size + 1 once the last reader is open, and an
+      // empty input list divides by zero (infinity); cap so callers never see > 1.
+      return Math.min(1.0f, (1.0f) * (currentReaderIndex + 1) / getInputs().size());
+    }
+  }
+
+  /**
+   * Provides a {@link KeyValueReader} that iterates over the
+   * concatenated input data. Each call creates a fresh reader that starts
+   * again from the first constituent input.
+   */
+  @Override
+  public KeyValueReader getReader() throws Exception {
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
+  }
+
+  /**
+   * Signals readiness of the merged input whenever any constituent input reports ready.
+   */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Returns the progress of the most recently created reader.
+   *
+   * @return 0.0f if {@link #getReader()} has not been called yet, otherwise the reader's progress
+   * @throws ProgressFailedException if the reader fails with an {@link IOException}
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    ConcatenatedMergedKeyValueReader reader = concatenatedMergedKeyValueReader;
+    if (reader == null) {
+      // No reader has been handed out yet, so nothing has been consumed.
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
new file mode 100644
index 00000000..b9ab19c8
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssConcatenatedMergedKeyValuesInput.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs
+ * (e.g. from a {@link GroupInputEdge}) and provides a unified view of the
+ * input. It concatenates all the inputs, in the order returned by
+ * {@code getInputs()}, behind a single {@link KeyValuesReader}.
+ */
+@Public
+public class RssConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+
+  // Created lazily by getReader(); stays null until a reader has been requested.
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
+  public RssConcatenatedMergedKeyValuesInput(MergedInputContext context,
+      List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /**
+   * Reader that drains the {@link KeyValuesReader} of each constituent input in turn.
+   */
+  public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
+    // Index of the next constituent input whose reader has not been opened yet.
+    private int currentReaderIndex = 0;
+    private KeyValuesReader currentReader;
+
+    /**
+     * Advances to the next key and its values, moving on to the next constituent
+     * input whenever the current reader is exhausted.
+     *
+     * @return true if a key is available, false once every input has been drained
+     * @throws IOException if an underlying reader fails or cannot be obtained
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValuesReader)) {
+            throw new TezUncheckedException("Expected KeyValuesReader. "
+                + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValuesReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentReader.getCurrentValues();
+    }
+
+    /**
+     * Reports coarse progress based on how many constituent readers have been opened.
+     *
+     * @return the opened-reader fraction, capped at 1.0f
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      // The "+ 1" numerator reaches size + 1 once the last reader is open, and an
+      // empty input list divides by zero (infinity); cap so callers never see > 1.
+      return Math.min(1.0f, (1.0f) * (currentReaderIndex + 1) / getInputs().size());
+    }
+  }
+
+  /**
+   * Provides a {@link KeyValuesReader} that iterates over the
+   * concatenated input data. Each call creates a fresh reader that starts
+   * again from the first constituent input.
+   */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
+  }
+
+  /**
+   * Signals readiness of the merged input whenever any constituent input reports ready.
+   */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Returns the progress of the most recently created reader.
+   *
+   * @return 0.0f if {@link #getReader()} has not been called yet, otherwise the reader's progress
+   * @throws ProgressFailedException if the reader fails with an {@link IOException}
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    ConcatenatedMergedKeyValuesReader reader = concatenatedMergedKeyValuesReader;
+    if (reader == null) {
+      // No reader has been handed out yet, so nothing has been consumed.
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
new file mode 100644
index 00000000..40d599fe
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedInputLegacy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+import org.apache.uniffle.common.exception.RssException;
+
+@Private
+public class RssOrderedGroupedInputLegacy extends RssOrderedGroupedKVInput {
+  // Shared by the empty iterator returned when there are no physical inputs.
+  private final Progress progress = new Progress();
+
+  public RssOrderedGroupedInputLegacy(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  /**
+   * Returns the raw key/value iterator over this input.
+   *
+   * <p>When the input has zero physical inputs, an empty iterator is returned
+   * immediately. Otherwise this call goes through {@code waitForInputReady()}
+   * before handing out the iterator.
+   *
+   * @return an empty iterator when there are no physical inputs, otherwise {@code rawIter}
+   * @throws IOException if waiting for the input fails with an I/O error
+   * @throws InterruptedException if the wait for the input is interrupted
+   * @throws TezException if waiting for the input fails with a Tez error
+   */
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException, TezException {
+    // wait for input so that iterator is available
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        return new TezRawKeyValueIterator() {
+          @Override
+          public DataInputBuffer getKey() throws IOException {
+            throw new RssException("No data available in Input");
+          }
+
+          @Override
+          public DataInputBuffer getValue() throws IOException {
+            throw new RssException("No data available in Input");
+          }
+
+          @Override
+          public boolean next() throws IOException {
+            return false;
+          }
+
+          @Override
+          public boolean hasNext() throws IOException {
+            return false;
+          }
+
+          @Override
+          public void close() throws IOException {
+            // Nothing to release: this iterator holds no resources.
+          }
+
+          @Override
+          public Progress getProgress() {
+            // There is nothing to read, so the iterator is always complete.
+            progress.complete();
+            return progress;
+          }
+
+          @Override
+          public boolean isSameKey() throws IOException {
+            throw new UnsupportedOperationException("isSameKey is not supported");
+          }
+        };
+      }
+    }
+
+    // Called outside the lock, mirroring the parent class, because it can block.
+    waitForInputReady();
+    synchronized (this) {
+      return rawIter;
+    }
+  }
+}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
new file mode 100644
index 00000000..d18c6c48
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link RssOrderedGroupedKVInput} in a {@link AbstractLogicalInput} which
shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer. This is typically used to bring one partition of a set of
partitioned
+ * distributed data to one consumer. The shuffle operation brings all
partitions
+ * to one place. These partitions are assumed to be sorted and are merged
sorted to
+ * merge them into a single input view.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is
handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ */
+@Public
+public class RssOrderedGroupedKVInput extends AbstractLogicalInput {
+
+ static final Logger LOG =
LoggerFactory.getLogger(RssOrderedGroupedKVInput.class);
+
+ protected TezRawKeyValueIterator rawIter = null;
+ protected Configuration conf;
+ protected RssShuffle shuffle;
+ protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+ private final BlockingQueue<Event> pendingEvents = new
LinkedBlockingQueue<>();
+ private long firstEventReceivedTime = -1;
+ @SuppressWarnings("rawtypes")
+ protected ValuesIterator vIter;
+
+ private TezCounter inputKeyCounter;
+ private TezCounter inputValueCounter;
+ private TezCounter shuffledInputs;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public RssOrderedGroupedKVInput(InputContext inputContext, int
numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ @Override
+ public synchronized List<Event> initialize() throws IOException {
+ this.conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+
+ if (this.getNumPhysicalInputs() == 0) {
+ getContext().requestInitialMemory(0L, null);
+ isStarted.set(true);
+ getContext().inputIsReady();
+ LOG.info("input fetch not required since there are 0 physical inputs for
input vertex: "
+ + getContext().getSourceVertexName());
+ return Collections.emptyList();
+ }
+
+ long initialMemoryRequest = RssShuffle.getInitialMemoryRequirement(conf,
+ getContext().getTotalMemoryAvailableToTask());
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ getContext().requestInitialMemory(initialMemoryRequest,
memoryUpdateCallbackHandler);
+
+ this.inputKeyCounter =
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+ this.inputValueCounter =
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+ this.shuffledInputs =
getContext().getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+ this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
getContext().getWorkDirs());
+ return Collections.emptyList();
+ }
+
+ @Override
+ public synchronized void start() throws IOException {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ // Start the shuffle - copy and merge
+ shuffle = createRssShuffle();
+ shuffle.run();
+ List<Event> pending = new LinkedList<>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ }
+ shuffle.handleEvents(pending);
+ }
+ isStarted.set(true);
+ }
+ }
+
+ @VisibleForTesting
+ RssShuffle createRssShuffle() throws IOException {
+ return new RssShuffle(getContext(), conf, getNumPhysicalInputs(),
memoryUpdateCallbackHandler.getMemoryAssigned());
+ }
+
+ /**
+ * Check if the input is ready for consumption
+ *
+ * @return true if the input is ready for consumption, or if an error
occurred
+ * processing fetching the input. false if the shuffle and merge are
+ * still in progress
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public synchronized boolean isInputReady() throws IOException,
InterruptedException, TezException {
+ Preconditions.checkState(isStarted.get(), "Must start input before
invoking this method");
+ if (getNumPhysicalInputs() == 0) {
+ return true;
+ }
+ return shuffle.isInputReady();
+ }
+
+ /**
+ * Waits for the input to become ready for consumption
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void waitForInputReady() throws IOException, InterruptedException,
TezException {
+ // Cannot synchronize entire method since this is called form user code
and can block.
+ RssShuffle localShuffleCopy = null;
+ synchronized (this) {
+ Preconditions.checkState(isStarted.get(), "Must start input before
invoking this method");
+ if (getNumPhysicalInputs() == 0) {
+ return;
+ }
+ localShuffleCopy = shuffle;
+ }
+
+ TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
+ synchronized (this) {
+ rawIter = localRawIter;
+ createValuesIterator();
+ }
+ }
+
+ @Override
+ public synchronized List<Event> close() throws IOException {
+ if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
+ rawIter.close();
+ }
+ if (shuffle != null) {
+ shuffle.shutdown();
+ }
+
+ long dataSize = getContext().getCounters()
+ .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+ getContext().getStatisticsReporter().reportDataSize(dataSize);
+ long inputRecords = getContext().getCounters()
+ .findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+ getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Get a KVReader for the Input.</p> This method will block until the input
is
+ * ready - i.e. the copy and merge stages are complete. Users can use the
+ * isInputReady method to check if the input is ready, which gives an
+ * indication of whether this method will block or not.
+ *
+ * NOTE: All values for the current K-V pair must be read prior to invoking
+ * moveToNext. Once moveToNext() is called, the valueIterator from the
+ * previous K-V pair will throw an Exception
+ *
+ * @return a KVReader over the sorted input.
+ * @throws {@link IOInterruptedException} if IO was performing a blocking
operation and was interrupted
+ */
+ @Override
+ public KeyValuesReader getReader() throws IOException, TezException {
+ // Cannot synchronize entire method since this is called form user code
and can block.
+ TezRawKeyValueIterator rawIterLocal;
+ synchronized (this) {
+ rawIterLocal = rawIter;
+ if (getNumPhysicalInputs() == 0) {
+ return new KeyValuesReader() {
+ @Override
+ public boolean next() throws IOException {
+ getContext().notifyProgress();
+ hasCompletedProcessing();
+ completedProcessing = true;
+ return false;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ throw new RssException("No data available in Input");
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ throw new RssException("No data available in Input");
+ }
+ };
+ }
+ }
+ if (rawIterLocal == null) {
+ try {
+ waitForInputReady();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOInterruptedException("Interrupted while waiting for input
ready", e);
+ }
+ }
+ @SuppressWarnings("rawtypes")
+ ValuesIterator valuesIter = null;
+ synchronized (this) {
+ valuesIter = vIter;
+ }
+ return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
+ }
+
+ // This could modify to progress of partition, but does not influence the
task execution.
+ @Override
+ public float getProgress() throws ProgressFailedException,
InterruptedException {
+ int totalInputs = getNumPhysicalInputs();
+ if (totalInputs != 0) {
+ synchronized (this) {
+ return ((0.5f) * this.shuffledInputs.getValue() / totalInputs)
+ + ((rawIter != null) ? ((0.5f) *
rawIter.getProgress().getProgress()) : 0.0f);
+ }
+ } else {
+ return 0.0f;
+ }
+ }
+
+ @Override
+ public void handleEvents(List<Event> inputEvents) throws IOException {
+ RssShuffle shuffleLocalRef;
+ synchronized (this) {
+ if (getNumPhysicalInputs() == 0) {
+ throw new RssException("No input events expected as numInputs is 0");
+ }
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ pendingEvents.addAll(inputEvents);
+ return;
+ }
+ shuffleLocalRef = shuffle;
+ }
+ shuffleLocalRef.handleEvents(inputEvents);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected synchronized void createValuesIterator()
+ throws IOException {
+ // Not used by ReduceProcessor
+ RawComparator rawComparator =
ConfigUtils.getIntermediateInputKeyComparator(conf);
+ Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+ LOG.info(getContext().getSourceVertexName() + ": " + "creating
ValuesIterator with "
+ + "comparator=" + rawComparator.getClass().getName()
+ + ", keyClass=" + keyClass.getName()
+ + ", valClass=" + valClass.getName());
+ vIter = new ValuesIterator(rawIter, rawComparator, keyClass, valClass,
+ conf, inputKeyCounter, inputValueCounter);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public RawComparator getInputKeyComparator() {
+ return (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
+
+ private final ValuesIterator valuesIter;
+ private final InputContext context;
+
+ OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext
context) {
+ this.valuesIter = valuesIter;
+ this.context = context;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ context.notifyProgress();
+ return valuesIter.moveToNext();
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ Object key = valuesIter.getKey();
+ return key;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Iterable<Object> getCurrentValues() throws IOException {
+ return valuesIter.getValues();
+ }
+ }
+
+
+ private static final Set<String> CONF_KEYS = new HashSet<String>();
+
+ static {
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+ CONF_KEYS.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
+ CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+ CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_MAX);
+ CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
+ CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
+ CONF_KEYS.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+
CONF_KEYS.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+ CONF_KEYS.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
+ CONF_KEYS.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+ }
+
+ // Maybe add helper methods to extract keys
+ // Maybe add constants or an Enum to access the keys
+
+ @InterfaceAudience.Private
+ public static Set<String> getConfigurationKeySet() {
+ return Collections.unmodifiableSet(CONF_KEYS);
+ }
+}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
new file mode 100644
index 00000000..d03a8cdb
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedMergedKVInput.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * A {@link MergedLogicalInput} which merges multiple
+ * {@link OrderedGroupedKVInput}s and returns a single view of these by merging
+ * values which belong to the same key.
+ *
+ * Combiners and Secondary Sort are not implemented, so there is no guarantee on
+ * the order of values.
+ */
+@Public
+public class RssOrderedGroupedMergedKVInput extends MergedLogicalInput {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RssOrderedGroupedMergedKVInput.class);
+ private final Set<Input> completedInputs = Collections
+ .newSetFromMap(new IdentityHashMap<Input, Boolean>());
+
+  /**
+   * Creates a merged input over the given constituent inputs.
+   *
+   * @param context the merged input context, handed to the superclass
+   * @param inputs the constituent inputs, handed to the superclass
+   */
+  public RssOrderedGroupedMergedKVInput(
+      MergedInputContext context,
+      List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /**
+   * Provides an ordered {@link KeyValuesReader} that merges the readers of
+   * all constituent inputs.
+   *
+   * @return a reader over the constituent inputs
+   * @throws Exception if the merging reader cannot be constructed
+   */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    final List<Input> constituents = getInputs();
+    final MergedInputContext mergedContext = getContext();
+    return new OrderedGroupedMergedKeyValuesReader(constituents, mergedContext);
+  }
+
+  /**
+   * Records that one constituent input is ready and calls
+   * {@code informInputReady()} once the number of recorded inputs equals the
+   * number of constituent inputs. Inputs are tracked by identity, so reporting
+   * the same instance twice counts once.
+   *
+   * @param input the constituent input that became ready
+   */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    synchronized (completedInputs) {
+      completedInputs.add(input);
+      final int readyCount = completedInputs.size();
+      final boolean allReady = readyCount == getInputs().size();
+      if (allReady) {
+        informInputReady();
+      }
+    }
+  }
+
+ private static class OrderedGroupedMergedKeyValuesReader extends
KeyValuesReader {
+ private final PriorityQueue<KeyValuesReader> pQueue;
+ @SuppressWarnings("rawtypes")
+ private final RawComparator keyComparator;
+ private final List<KeyValuesReader> finishedReaders;
+ private final ValuesIterable currentValues;
+ private KeyValuesReader nextKVReader;
+ private Object currentKey;
+ private final MergedInputContext context;
+
+ OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext
context)
+ throws Exception {
+ keyComparator = ((OrderedGroupedKVInput) inputs.get(0))
+ .getInputKeyComparator();
+ pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(),
+ new KVReaderComparator(keyComparator));
+ finishedReaders = new ArrayList<KeyValuesReader>(inputs.size());
+ for (Input input : inputs) {
+ KeyValuesReader reader = (KeyValuesReader) input.getReader();
+ if (reader.next()) {
+ pQueue.add(reader);
+ }
+ }
+ currentValues = new ValuesIterable();
+ this.context = context;
+ }
+
+ private void advanceAndAddToQueue(KeyValuesReader kvsReadr)
+ throws IOException {
+ if (kvsReadr.next()) {
+ pQueue.add(kvsReadr);
+ }
+ }
+
+    /**
+     * Puts the reader back into the priority queue without advancing it.
+     *
+     * @param kvsReadr the reader to queue; {@code null} is ignored
+     */
+    private void addToQueue(KeyValuesReader kvsReadr) throws IOException {
+      if (kvsReadr == null) {
+        return;
+      }
+      pQueue.add(kvsReadr);
+    }
+
+    /**
+     * Moves to the next distinct key across all constituent readers.
+     *
+     * @return {@code true} if a key is available, {@code false} once every
+     *     reader is exhausted
+     * @throws IOException if a constituent reader fails
+     */
+    @Override
+    public boolean next() throws IOException {
+      // Drop whatever the caller left unread for the previous key.
+      currentValues.discardCurrent();
+
+      // Readers that were drained for the previous key are advanced and re-queued.
+      for (KeyValuesReader drained : finishedReaders) {
+        advanceAndAddToQueue(drained);
+      }
+      finishedReaders.clear();
+
+      nextKVReader = pQueue.poll();
+      context.notifyProgress();
+      if (nextKVReader == null) {
+        hasCompletedProcessing();
+        completedProcessing = true;
+        return false;
+      }
+      currentKey = nextKVReader.getCurrentKey();
+      currentValues.moveToNext();
+      return true;
+    }
+
+    /**
+     * @return the key captured by the most recent successful {@code next()} call
+     */
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return this.currentKey;
+    }
+
+    /**
+     * @return the shared values view; the same {@link Iterable} instance is
+     *     returned for every key
+     */
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return this.currentValues;
+    }
+
+    /**
+     * {@link Iterable} facade over a single {@link ValuesIterator}. Every call
+     * to {@link #iterator()} hands back that same iterator instance.
+     */
+    private class ValuesIterable implements Iterable<Object> {
+      private ValuesIterator valuesIterator = new ValuesIterator();
+
+      @Override
+      public Iterator<Object> iterator() {
+        return this.valuesIterator;
+      }
+
+      /** Delegates to {@link ValuesIterator#discardCurrent()}. */
+      public void discardCurrent() throws IOException {
+        this.valuesIterator.discardCurrent();
+      }
+
+      /** Delegates to {@link ValuesIterator#moveToNext()}. */
+      public void moveToNext() throws IOException {
+        this.valuesIterator.moveToNext();
+      }
+
+    }
+
+    /**
+     * Iterates the values of the current key, moving from one constituent
+     * reader to the next as long as the next queued reader is positioned on an
+     * equal key (per {@code keyComparator}).
+     */
+    @SuppressWarnings("unchecked")
+    private class ValuesIterator implements Iterator<Object> {
+
+      // Values of the reader currently being drained; null while no key is active.
+      private Iterator<Object> currentValuesIter;
+
+      /** Starts iterating the values of the reader just polled by the enclosing reader. */
+      public void moveToNext() throws IOException {
+        currentValuesIter = nextKVReader.getCurrentValues().iterator();
+      }
+
+      /**
+       * @return {@code true} if another value exists for the current key
+       * @throws RssException wrapping an {@link IOException} from a constituent reader
+       */
+      @Override
+      public boolean hasNext() {
+        if (currentValuesIter == null) {
+          // No current key. next needs to be called on the enclosing reader.
+          return false;
+        }
+        if (currentValuesIter.hasNext()) {
+          return true;
+        }
+        // The current reader is drained for this key: park it and inspect the next one.
+        finishedReaders.add(nextKVReader);
+        nextKVReader = pQueue.poll();
+        try {
+          if (nextKVReader != null
+              && keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) == 0) {
+            currentValuesIter = nextKVReader.getCurrentValues().iterator();
+            return true;
+          }
+          // Key changed or no more data: put the reader back into the queue.
+          addToQueue(nextKVReader);
+          currentValuesIter = null;
+          return false;
+        } catch (IOException e) {
+          throw new RssException(e);
+        }
+      }
+
+      /**
+       * Skips every remaining value of the current key by parking all readers
+       * positioned on it. Does nothing when no key is active.
+       */
+      public void discardCurrent() throws IOException {
+        if (currentValuesIter != null) {
+          do {
+            finishedReaders.add(nextKVReader);
+            nextKVReader = pQueue.poll();
+          } while (nextKVReader != null
+              && keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) == 0);
+          addToQueue(nextKVReader);
+          currentValuesIter = null;
+        }
+      }
+
+      /**
+       * @return the next value of the current key
+       * @throws java.util.NoSuchElementException if the current key has no more values
+       */
+      @Override
+      public Object next() {
+        // Go through hasNext() so that a drained reader is swapped for the next
+        // reader on the same key even when the caller did not call hasNext(),
+        // and so that exhaustion surfaces as NoSuchElementException rather
+        // than a NullPointerException on a null currentValuesIter.
+        if (!hasNext()) {
+          throw new java.util.NoSuchElementException("No more values for the current key");
+        }
+        return currentValuesIter.next();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+ /**
+ * Comparator that compares KeyValuesReader on their current key
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static class KVReaderComparator implements
+ Comparator<KeyValuesReader> {
+
+ private RawComparator keyComparator;
+
+ KVReaderComparator(RawComparator keyComparator) {
+ this.keyComparator = keyComparator;
+ }
+
+ @Override
+ public int compare(KeyValuesReader o1, KeyValuesReader o2) {
+ try {
+ return keyComparator.compare(o1.getCurrentKey(), o2.getCurrentKey());
+ } catch (IOException e) {
+ LOG.error("Caught exception while comparing keys in shuffle input",
e);
+ throw new RssException(e);
+ }
+ }
+ }
+ }
+
+  /**
+   * Reports the arithmetic mean of the constituent inputs' progress.
+   *
+   * @return the mean progress, or {@code 0.0f} when there are no constituent
+   *     inputs (the unguarded division would yield NaN)
+   * @throws ProgressFailedException if a constituent input fails to report progress
+   * @throws InterruptedException if a constituent input is interrupted while reporting
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    final List<Input> inputs = getInputs();
+    if (inputs.isEmpty()) {
+      // 0.0f / 0 is NaN; report "no progress" instead of an invalid value.
+      return 0.0f;
+    }
+    float totalProgress = 0.0f;
+    for (Input input : inputs) {
+      // NOTE(review): casts to Tez's OrderedGroupedKVInput; confirm the
+      // constituent inputs are of that type and not RssOrderedGroupedKVInput.
+      totalProgress += ((OrderedGroupedKVInput) input).getProgress();
+    }
+    return totalProgress / inputs.size();
+  }
+}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
new file mode 100644
index 00000000..c1ac3119
--- /dev/null
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.RssShuffleManager;
+import
org.apache.tez.runtime.library.common.shuffle.impl.RssSimpleFetchedInputAllocator;
+import
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
+import
org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link RssUnorderedKVInput} provides unordered key value input by
+ * bringing together (shuffling) a set of distributed data and providing a
+ * unified view to that data. There are no ordering constraints applied by
+ * this input.
+ */
+@Public
+public class RssUnorderedKVInput extends AbstractLogicalInput {
+ private static final Logger LOG =
LoggerFactory.getLogger(RssUnorderedKVInput.class);
+
+ private Configuration conf;
+ private RssShuffleManager rssShuffleManager;
+ private final BlockingQueue<Event> pendingEvents = new
LinkedBlockingQueue<>();
+ private long firstEventReceivedTime = -1;
+ private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+ @SuppressWarnings("rawtypes")
+ private UnorderedKVReader kvReader;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ private TezCounter inputRecordCounter;
+
+ private SimpleFetchedInputAllocator inputManager;
+ private ShuffleEventHandler inputEventHandler;
+
+  /**
+   * Creates the input; both arguments are handed to the superclass unchanged.
+   *
+   * @param inputContext the input context
+   * @param numPhysicalInputs the number of physical inputs
+   */
+  public RssUnorderedKVInput(
+      InputContext inputContext,
+      int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  /**
+   * Loads the configuration from the user payload and requests the initial
+   * memory for this input.
+   *
+   * <p>With zero physical inputs no memory is requested, the input is marked
+   * started and ready immediately, and the remaining setup is skipped.
+   *
+   * @return always an empty list
+   * @throws IllegalArgumentException if the number of physical inputs is {@code -1}
+   * @throws Exception if the configuration cannot be built from the user payload
+   */
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    // The separator in this message was a mis-encoded non-ASCII comma; use plain ASCII.
+    LOG.info("RssUnorderedKVInput initialize, num of physicalInputs:{}", getNumPhysicalInputs());
+    Preconditions.checkArgument(getNumPhysicalInputs() != -1, "Number of Inputs has not been set");
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+
+    if (getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0L, null);
+      isStarted.set(true);
+      getContext().inputIsReady();
+      LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+          + getContext().getSourceVertexName());
+      return Collections.emptyList();
+    } else {
+      long initialMemReq = getInitialMemoryReq();
+      memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+      this.getContext().requestInitialMemory(initialMemReq, memoryUpdateCallbackHandler);
+    }
+
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
+    this.inputRecordCounter = getContext().getCounters().findCounter(
+        TaskCounter.INPUT_RECORDS_PROCESSED);
+    return Collections.emptyList();
+  }
+
+  /**
+   * Starts the input on first invocation: builds the codec, the input
+   * allocator, the RSS shuffle manager and the event handler, runs the
+   * shuffle manager, creates the reader and replays any events queued before
+   * start. Later invocations are no-ops.
+   *
+   * @throws IOException if the shuffle manager, the reader or the replay of
+   *     pending events fails
+   */
+  @Override
+  public synchronized void start() throws IOException {
+    if (isStarted.get()) {
+      return;
+    }
+
+    ////// Initial configuration
+    memoryUpdateCallbackHandler.validateUpdateReceived();
+    CompressionCodec codec = null;
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    }
+
+    final boolean ifileReadAhead = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    // The read-ahead length is only looked up when read-ahead is enabled.
+    final int ifileReadAheadLength = ifileReadAhead
+        ? conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+            TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
+        : 0;
+    final int ifileBufferSize = conf.getInt(
+        "io.file.buffer.size",
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+    LOG.info("RssUnorderedKVInput, totalMemoryAvailable:{}, available memory:{}",
+        getContext().getTotalMemoryAvailableToTask(),
+        memoryUpdateCallbackHandler.getMemoryAssigned());
+
+    final boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+
+    this.inputManager = new RssSimpleFetchedInputAllocator(
+        TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
+        getContext().getUniqueIdentifier(),
+        getContext().getDagIdentifier(),
+        conf,
+        getContext().getTotalMemoryAvailableToTask(),
+        memoryUpdateCallbackHandler.getMemoryAssigned());
+
+    this.rssShuffleManager = new RssShuffleManager(
+        getContext(), conf, getNumPhysicalInputs(), ifileBufferSize,
+        ifileReadAhead, ifileReadAheadLength, codec, inputManager);
+
+    this.inputEventHandler = new ShuffleInputEventHandlerImpl(
+        getContext(), rssShuffleManager, inputManager, codec,
+        ifileReadAhead, ifileReadAheadLength, compositeFetch);
+    ////// End of Initial configuration
+
+    this.rssShuffleManager.run();
+    this.kvReader = createReader(
+        inputRecordCounter, codec, ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+
+    // Replay the events that arrived before the input was started.
+    final List<Event> queued = new LinkedList<>();
+    pendingEvents.drainTo(queued);
+    if (!queued.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart delay in processing first event: "
+            + (System.currentTimeMillis() - firstEventReceivedTime));
+      }
+      inputEventHandler.handleEvents(queued);
+    }
+    isStarted.set(true);
+  }
+
+  /**
+   * Returns the key/value reader of this input.
+   *
+   * @return the reader created by {@code start()}, or, when there are zero
+   *     physical inputs, a reader whose {@code next()} always returns {@code false}
+   * @throws IllegalStateException if the input has not been started
+   */
+  @Override
+  public synchronized KeyValueReader getReader() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+    if (getNumPhysicalInputs() != 0) {
+      return this.kvReader;
+    }
+    // Nothing to fetch: hand out a reader that is exhausted from the start.
+    return new KeyValueReader() {
+      @Override
+      public boolean next() throws IOException {
+        getContext().notifyProgress();
+        hasCompletedProcessing();
+        completedProcessing = true;
+        return false;
+      }
+
+      @Override
+      public Object getCurrentKey() throws IOException {
+        throw new RssException("No data available in Input");
+      }
+
+      @Override
+      public Object getCurrentValue() throws IOException {
+        throw new RssException("No data available in Input");
+      }
+    };
+  }
+
+  /**
+   * Dispatches input events to the shuffle event handler, or queues them when
+   * the input has not been started yet.
+   *
+   * @param inputEvents the events to handle
+   * @throws RssException if this input has zero physical inputs
+   * @throws IOException if the event handler fails
+   */
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    final ShuffleEventHandler handler;
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        throw new RssException("No input events expected as numInputs is 0");
+      }
+      if (isStarted.get()) {
+        handler = inputEventHandler;
+      } else {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        // This queue will keep growing if the Processor decides never to
+        // start the event. The Input, however has no idea, on whether start
+        // will be invoked or not.
+        pendingEvents.addAll(inputEvents);
+        return;
+      }
+    }
+    // The handler is invoked outside the monitor; only the reference is read under it.
+    handler.handleEvents(inputEvents);
+  }
+
+  /**
+   * Logs final progress, shuts the shuffle manager down and reports the
+   * decompressed shuffle bytes and processed record count to the statistics
+   * reporter.
+   *
+   * @return an empty list (never {@code null}), matching what
+   *     {@code initialize()} returns
+   * @throws Exception if shutting down or reporting fails
+   */
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    // Both collaborators are only created in start(); they stay null if the
+    // input was never started or has zero physical inputs.
+    if (this.inputEventHandler != null) {
+      this.inputEventHandler.logProgress(true);
+    }
+
+    if (this.rssShuffleManager != null) {
+      this.rssShuffleManager.shutdown();
+    }
+
+    long dataSize = getContext().getCounters()
+        .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    getContext().getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+    return Collections.emptyList();
+  }
+
+ private long getInitialMemoryReq() {
+ return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
+ getContext().getTotalMemoryAvailableToTask());
+ }
+
+
+  /**
+   * Builds the {@link UnorderedKVReader} that reads from the RSS shuffle manager.
+   *
+   * <p>The reader is constructed with a {@code null} codec and a read-ahead
+   * length of {@code 0}; the {@code codec} and {@code ifileReadAheadLength}
+   * arguments are accepted but not forwarded.
+   * NOTE(review): presumably intentional for RSS-fetched data; confirm.
+   *
+   * @param inputRecordCounter counter handed to the reader
+   * @param codec not forwarded to the reader
+   * @param ifileBufferSize buffer size handed to the reader
+   * @param ifileReadAheadEnabled read-ahead flag handed to the reader
+   * @param ifileReadAheadLength not forwarded to the reader
+   * @return the newly created reader
+   * @throws IOException if the reader cannot be created
+   */
+  @SuppressWarnings("rawtypes")
+  private UnorderedKVReader createReader(
+      TezCounter inputRecordCounter,
+      CompressionCodec codec,
+      int ifileBufferSize,
+      boolean ifileReadAheadEnabled,
+      int ifileReadAheadLength) throws IOException {
+    return new UnorderedKVReader(
+        rssShuffleManager,
+        conf,
+        null, // codec
+        ifileReadAheadEnabled,
+        0, // read-ahead length
+        ifileBufferSize,
+        inputRecordCounter,
+        getContext());
+  }
+
+  /** Configuration keys exposed through {@code getConfigurationKeySet()}. */
+  private static final Set<String> CONF_KEYS = new HashSet<>();
+
+  static {
+    Collections.addAll(CONF_KEYS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezConfiguration.TEZ_COUNTERS_MAX,
+        TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH,
+        TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH,
+        TezConfiguration.TEZ_COUNTERS_MAX_GROUPS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+        Constants.TEZ_RUNTIME_TASK_MEMORY,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
+  }
+
+ // Maybe add helper methods to extract keys
+ // Maybe add constants or an Enum to access the keys
+
+  /**
+   * Returns the configuration keys collected in {@code CONF_KEYS}.
+   *
+   * @return an unmodifiable view of the key set
+   */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    final Set<String> readOnlyKeys = Collections.unmodifiableSet(CONF_KEYS);
+    return readOnlyKeys;
+  }
+
+  /**
+   * Reports the progress of the underlying reader.
+   *
+   * @return the reader's progress, or {@code 0.0f} while no reader exists
+   * @throws ProgressFailedException if the reader fails with an {@link IOException}
+   * @throws InterruptedException declared by the overridden method
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    // kvReader is only assigned inside start(). With zero physical inputs
+    // initialize() marks the input as started, so start() never creates it;
+    // without this guard the call below would throw NullPointerException.
+    if (kvReader == null) {
+      return 0.0f;
+    }
+    try {
+      return kvReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
new file mode 100644
index 00000000..1502c95d
--- /dev/null
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInputTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.IdUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssShuffle;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssOrderedGroupedKVInputTest {
+
+  /**
+   * With a shuffle whose {@code waitForInput()} throws
+   * {@link InterruptedException}, {@code getReader()} must fail with
+   * {@code RssShuffle.RssShuffleError} instead of returning.
+   */
+  @Test
+  @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+  public void testInterruptWhileAwaitingInput() throws IOException {
+    try (MockedStatic<IdUtils> mockedIdUtils = Mockito.mockStatic(IdUtils.class)) {
+      ApplicationAttemptId attemptId =
+          ApplicationAttemptId.newInstance(ApplicationId.newInstance(9999, 72), 1);
+      mockedIdUtils.when(IdUtils::getApplicationAttemptId).thenReturn(attemptId);
+
+      try (MockedStatic<ShuffleUtils> mockedShuffleUtils = Mockito.mockStatic(ShuffleUtils.class)) {
+        mockedShuffleUtils
+            .when(() -> ShuffleUtils.deserializeShuffleProviderMetaData(any()))
+            .thenReturn(4);
+
+        InputContext mockContext = createMockInputContext();
+        RssOrderedGroupedKVInput kvInput = new OrderedGroupedKVInputForTest(mockContext, 10);
+        kvInput.initialize();
+        kvInput.start();
+
+        try {
+          kvInput.getReader();
+          fail("getReader should not return since underlying inputs are not ready");
+        } catch (Exception e) {
+          assertTrue(e instanceof RssShuffle.RssShuffleError);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a mocked {@link InputContext} with vertex names, identifiers, an
+   * empty Tez configuration payload, one work directory, 200 MiB of task
+   * memory and a memory request stub that immediately assigns 200 MiB to the
+   * supplied {@link MemoryUpdateCallbackHandler}.
+   *
+   * @return the configured mock
+   * @throws IOException if the user payload cannot be created
+   */
+  private InputContext createMockInputContext() throws IOException {
+    final long taskMemoryBytes = 200 * 1024 * 1024L;
+    InputContext mockContext = mock(InputContext.class);
+
+    // Identity of the task and its surroundings.
+    doReturn("Map 1").when(mockContext).getSourceVertexName();
+    doReturn("Reducer 1").when(mockContext).getTaskVertexName();
+    when(mockContext.getUniqueIdentifier())
+        .thenReturn("attempt_1685094627632_0157_1_01_000000_0_10006");
+    doReturn(ApplicationId.newInstance(1, 1)).when(mockContext).getApplicationId();
+    ExecutionContext executionContext = new ExecutionContextImpl("localhost");
+    doReturn(executionContext).when(mockContext).getExecutionContext();
+
+    // Configuration payload, work directories, memory and counters.
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(new TezConfiguration());
+    doReturn(payLoad).when(mockContext).getUserPayload();
+    doReturn(new String[]{"workDir1"}).when(mockContext).getWorkDirs();
+    doReturn(taskMemoryBytes).when(mockContext).getTotalMemoryAvailableToTask();
+    doReturn(new TezCounters()).when(mockContext).getCounters();
+
+    // Grant the requested memory synchronously through the callback handler.
+    doAnswer(invocation -> {
+      Object callback = invocation.getArguments()[1];
+      if (callback instanceof MemoryUpdateCallbackHandler) {
+        ((MemoryUpdateCallbackHandler) callback).memoryAssigned(taskMemoryBytes);
+      } else {
+        fail();
+      }
+      return null;
+    }).when(mockContext).requestInitialMemory(any(long.class),
+        any(MemoryUpdateCallbackHandler.class));
+    return mockContext;
+  }
+
+  /**
+   * Test double whose {@code createShuffle()} supplies a mocked {@link Shuffle}
+   * that throws {@link InterruptedException} from {@code waitForInput()}.
+   */
+  static class OrderedGroupedKVInputForTest extends RssOrderedGroupedKVInput {
+    OrderedGroupedKVInputForTest(InputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+    }
+
+    Shuffle createShuffle() throws IOException {
+      Shuffle interruptingShuffle = mock(Shuffle.class);
+      try {
+        doThrow(new InterruptedException()).when(interruptingShuffle).waitForInput();
+      } catch (Exception e) {
+        // Stubbing must not throw; this also covers InterruptedException.
+        fail();
+      }
+      return interruptingShuffle;
+    }
+  }
+}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
new file mode 100644
index 00000000..365166d3
--- /dev/null
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/input/RssSortedGroupedMergedInputTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class RssSortedGroupedMergedInputTest {
+ /**
+  * Creates the context handed to the merged inputs under test.
+  *
+  * @return a Mockito mock of {@link MergedInputContext} with no stubbing applied
+  */
+ MergedInputContext createMergedInputContext() {
+   MergedInputContext mergedContext = mock(MergedInputContext.class);
+   return mergedContext;
+ }
+
+ /**
+  * Concatenates three {@code DummyInput}s of 10 records each and checks that the merged reader
+  * yields 30 records, that progress is notified four times, and that calling {@code next()}
+  * on the exhausted reader raises an {@link IOException}.
+  *
+  * <p>NOTE(review): the class under test here is {@code ConcatenatedMergedKeyValueInput}, not
+  * {@code RssConcatenatedMergedKeyValueInput} - confirm this is the intended target.
+  */
+ @Test
+ @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+ public void testSimpleConcatenatedMergedKeyValueInput() throws Exception {
+
+   DummyInput sInput1 = new DummyInput(10);
+   DummyInput sInput2 = new DummyInput(10);
+   DummyInput sInput3 = new DummyInput(10);
+
+   List<Input> sInputs = new LinkedList<Input>();
+   sInputs.add(sInput1);
+   sInputs.add(sInput2);
+   sInputs.add(sInput3);
+   MergedInputContext mockContext = createMergedInputContext();
+   ConcatenatedMergedKeyValueInput input =
+       new ConcatenatedMergedKeyValueInput(mockContext, sInputs);
+
+   KeyValueReader kvReader = input.getReader();
+   int keyCount = 0;
+   while (kvReader.next()) {
+     keyCount++;
+     Integer key = (Integer) kvReader.getCurrentKey();
+     Integer value = (Integer) kvReader.getCurrentValue();
+     // DummyKeyValueReader reports the same counter as both key and value.
+     assertEquals(key, value);
+   }
+   // assertEquals (rather than assertTrue on ==) reports the actual count on failure.
+   assertEquals(30, keyCount);
+   // one for each reader change and one to exit
+   verify(mockContext, times(4)).notifyProgress();
+
+   getNextFromFinishedReader(kvReader);
+ }
+
+ /**
+  * Concatenates three {@code SortedTestInput}s, each exposing keys 1, 2, 3 with two values
+  * per key, and checks that the merged reader yields 9 keys with 2 values each, that progress
+  * is notified four times, and that calling {@code next()} on the exhausted reader raises an
+  * {@link IOException}.
+  *
+  * <p>NOTE(review): the class under test here is {@code ConcatenatedMergedKeyValuesInput}, not
+  * {@code RssConcatenatedMergedKeyValuesInput} - confirm this is the intended target.
+  */
+ @Test
+ @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+ public void testSimpleConcatenatedMergedKeyValuesInput() throws Exception {
+   SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+       new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+   SortedTestKeyValuesReader kvsReader2 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+       new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+   SortedTestKeyValuesReader kvsReader3 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+       new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+   SortedTestInput sInput1 = new SortedTestInput(kvsReader1);
+   SortedTestInput sInput2 = new SortedTestInput(kvsReader2);
+   SortedTestInput sInput3 = new SortedTestInput(kvsReader3);
+
+   List<Input> sInputs = new LinkedList<Input>();
+   sInputs.add(sInput1);
+   sInputs.add(sInput2);
+   sInputs.add(sInput3);
+   MergedInputContext mockContext = createMergedInputContext();
+   ConcatenatedMergedKeyValuesInput input =
+       new ConcatenatedMergedKeyValuesInput(mockContext, sInputs);
+
+   KeyValuesReader kvsReader = input.getReader();
+   int keyCount = 0;
+   while (kvsReader.next()) {
+     keyCount++;
+     Integer key = (Integer) kvsReader.getCurrentKey();
+     Iterator<Object> valuesIter = kvsReader.getCurrentValues().iterator();
+     int valCount = 0;
+     while (valuesIter.hasNext()) {
+       valCount++;
+       Integer val = (Integer) valuesIter.next();
+       // Every fixture row above pairs key k with the values { k, k }.
+       assertEquals(key, val);
+     }
+     assertEquals(2, valCount);
+   }
+   assertEquals(9, keyCount);
+   // one for each reader change and one to exit
+   verify(mockContext, times(4)).notifyProgress();
+
+   getNextFromFinishedReader(kvsReader);
+ }
+
+ /**
+  * Calls {@code next()} once more on a reader the caller has already drained and expects an
+  * {@link IOException} whose message contains "For usage, please refer to". The test fails
+  * if the call returns normally.
+  *
+  * @param kvsReader reader that has already returned {@code false} from {@code next()}
+  */
+ private void getNextFromFinishedReader(KeyValuesReader kvsReader) {
+   try {
+     kvsReader.next();
+     // Reaching this line means no exception was raised.
+     fail();
+   } catch (IOException expected) {
+     String message = expected.getMessage();
+     assertTrue(message.contains("For usage, please refer to"));
+   }
+ }
+
+ /**
+  * Calls {@code next()} once more on a reader the caller has already drained and expects an
+  * {@link IOException} whose message contains "For usage, please refer to". The test fails
+  * if the call returns normally.
+  *
+  * @param kvReader reader that has already returned {@code false} from {@code next()}
+  */
+ private void getNextFromFinishedReader(KeyValueReader kvReader) {
+   try {
+     kvReader.next();
+     // Reaching this line means no exception was raised.
+     fail();
+   } catch (IOException expected) {
+     String message = expected.getMessage();
+     assertTrue(message.contains("For usage, please refer to"));
+   }
+ }
+
+ /**
+  * {@link RssOrderedGroupedKVInput} stub that serves a pre-built reader. The lifecycle
+  * methods are no-ops and {@code getReader()} returns the reader supplied at construction.
+  */
+ private static class SortedTestInput extends RssOrderedGroupedKVInput {
+
+   final SortedTestKeyValuesReader reader;
+
+   SortedTestInput(SortedTestKeyValuesReader reader) {
+     // The superclass is constructed with a null context and 0 as its second argument.
+     super(null, 0);
+     this.reader = reader;
+   }
+
+   /** Returns the reader passed to the constructor. */
+   @Override
+   public KeyValuesReader getReader() throws IOException {
+     return this.reader;
+   }
+
+   /** Returns a fresh {@code RawComparatorForTest} on every call. */
+   @SuppressWarnings("rawtypes")
+   public RawComparator getInputKeyComparator() {
+     return new RawComparatorForTest();
+   }
+
+   /** No-op; returns {@code null}. */
+   @Override
+   public List<Event> initialize() throws IOException {
+     return null;
+   }
+
+   /** No-op. */
+   @Override
+   public void start() throws IOException {
+   }
+
+   /** No-op; incoming events are ignored. */
+   @Override
+   public void handleEvents(List<Event> inputEvents) {
+   }
+
+   /** No-op; returns {@code null}. */
+   @Override
+   public List<Event> close() throws IOException {
+     return null;
+   }
+ }
+
+ /**
+  * {@link KeyValuesReader} backed by two parallel arrays: {@code keys[i]} is the i-th key and
+  * {@code values[i]} holds the values returned for it.
+  */
+ private static class SortedTestKeyValuesReader extends KeyValuesReader {
+
+   final int[] keys;
+   final int[][] values;
+   // Index of the current key; -1 until the first call to next().
+   int currentIndex = -1;
+
+   SortedTestKeyValuesReader(int[] keys, int[][] vals) {
+     this.keys = keys;
+     this.values = vals;
+   }
+
+   /**
+    * Advances to the next key.
+    *
+    * @return {@code true} while a key is available; {@code false} once the keys are used up
+    *     (or {@code keys} is null), at which point {@code completedProcessing} is set
+    */
+   @Override
+   public boolean next() throws IOException {
+     hasCompletedProcessing();
+     currentIndex++;
+     boolean exhausted = keys == null || currentIndex >= keys.length;
+     if (exhausted) {
+       completedProcessing = true;
+     }
+     return !exhausted;
+   }
+
+   /** Returns the key at the current index, boxed. */
+   @Override
+   public Object getCurrentKey() throws IOException {
+     return keys[currentIndex];
+   }
+
+   /** Returns a newly built list holding the boxed values of the current key. */
+   @Override
+   public Iterable<Object> getCurrentValues() throws IOException {
+     List<Object> boxedValues = new LinkedList<Object>();
+     for (int value : values[currentIndex]) {
+       boxedValues.add(value);
+     }
+     return boxedValues;
+   }
+ }
+
+ /**
+  * Minimal {@link Input} that owns a {@code DummyKeyValueReader} producing a fixed number of
+  * records.
+  */
+ private static class DummyInput implements Input {
+   // Assigned once in the constructor and never exposed except through getReader().
+   private final DummyKeyValueReader reader;
+
+   /**
+    * @param records number of records the backing reader will produce
+    */
+   DummyInput(int records) {
+     reader = new DummyKeyValueReader(records);
+   }
+
+   /** No-op. */
+   @Override
+   public void start() throws Exception {
+   }
+
+   /** Returns the reader created in the constructor (the same instance on every call). */
+   @Override
+   public Reader getReader() throws Exception {
+     return reader;
+   }
+ }
+
+ /**
+  * {@link KeyValueReader} that counts down from a fixed number of records and reports the
+  * remaining count as both the current key and the current value.
+  */
+ private static class DummyKeyValueReader extends KeyValueReader {
+   // Remaining records; decremented on every next() call, including after exhaustion.
+   private int records;
+
+   DummyKeyValueReader(int records) {
+     this.records = records;
+   }
+
+   /** Returns whether a record was left before this call, then decrements the counter. */
+   @Override
+   public boolean next() throws IOException {
+     boolean hadRecord = records > 0;
+     records--;
+     return hadRecord;
+   }
+
+   /** Returns the current counter value, boxed. */
+   @Override
+   public Object getCurrentKey() throws IOException {
+     return Integer.valueOf(records);
+   }
+
+   /** Returns the current counter value, boxed (same as the key). */
+   @Override
+   public Object getCurrentValue() throws IOException {
+     return Integer.valueOf(records);
+   }
+ }
+
+
+ /**
+  * {@link RawComparator} over {@link Integer} keys. Only the object comparison is supported;
+  * the raw byte comparison always throws.
+  */
+ private static class RawComparatorForTest implements RawComparator<Integer> {
+
+   /**
+    * Orders two integers numerically.
+    *
+    * @return a negative value, zero, or a positive value as {@code o1} is less than, equal
+    *     to, or greater than {@code o2}
+    */
+   @Override
+   public int compare(Integer o1, Integer o2) {
+     // Integer.compare avoids the overflow that subtraction hits for operands of opposite
+     // sign with large magnitude (e.g. Integer.MIN_VALUE vs 1).
+     return Integer.compare(o1, o2);
+   }
+
+   /**
+    * Not supported.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+     throw new UnsupportedOperationException();
+   }
+ }
+}
+