This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8948df17f [CELEBORN-1733] Support ordered grouped kv input for Tez
8948df17f is described below
commit 8948df17f9d52a5b4b61cb0a5ab8c69ae020d09a
Author: hongguangwei <[email protected]>
AuthorDate: Fri Dec 6 10:54:44 2024 +0800
[CELEBORN-1733] Support ordered grouped kv input for Tez
### What changes were proposed in this pull request?
1. Add CelebornOrderedGroupedKVInput and CelebornOrderedGroupedInputLegacy
2. Make CelebornScheduler extend ShuffleScheduler to reduce redundant code
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2972 from GH-Gloway/1733.
Authored-by: hongguangwei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/tez/plugin/util/ChecksumUtils.java | 41 +++
.../shuffle/orderedgrouped/CelebornScheduler.java | 364 +++++++++++++++++++++
.../shuffle/orderedgrouped/CelebornShuffle.java | 87 +++++
.../orderedgrouped/CelebornTezBypassWriter.java | 85 +++++
.../CelebornTezShuffleDataFetcher.java | 248 ++++++++++++++
.../input/CelebornOrderedGroupedInputLegacy.java | 84 +++++
.../input/CelebornOrderedGroupedKVInput.java | 354 ++++++++++++++++++++
7 files changed, 1263 insertions(+)
diff --git
a/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java
new file mode 100644
index 000000000..d1a9dff6a
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/celeborn/tez/plugin/util/ChecksumUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tez.plugin.util;
+
+import java.util.zip.CRC32;
+
/**
 * Helpers for computing CRC32 checksums over byte arrays.
 *
 * <p>Used when writing on-disk shuffle data to append a trailing checksum after the payload.
 */
public final class ChecksumUtils {

  private ChecksumUtils() {
    // Static utility class; not meant to be instantiated.
  }

  /**
   * Computes the CRC32 checksum of the whole array.
   *
   * @param buf bytes to checksum; must not be null
   * @return the CRC32 value (held in the low 32 bits of the returned long)
   */
  public static long getCrc32(byte[] buf) {
    return getCrc32(buf, 0, buf.length);
  }

  /**
   * Computes the CRC32 checksum of {@code length} bytes of {@code buf} starting at {@code offset}.
   *
   * <p>CRC32 is computed incrementally, so one update over the whole range yields exactly the same
   * value as feeding the range in smaller chunks.
   *
   * @param buf bytes to checksum; must not be null
   * @param offset index of the first byte to include
   * @param length number of bytes to include
   * @return the CRC32 value (held in the low 32 bits of the returned long)
   * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the range
   *     extends past the end of {@code buf}
   */
  public static long getCrc32(byte[] buf, int offset, int length) {
    // Reject invalid ranges up front; previously a negative length silently produced the CRC of an
    // empty range (0) instead of signalling the caller's mistake.
    if (offset < 0 || length < 0 || offset > buf.length - length) {
      throw new IndexOutOfBoundsException(
          "Invalid range: offset=" + offset + ", length=" + length + ", array length=" + buf.length);
    }
    CRC32 crc32 = new CRC32();
    crc32.update(buf, offset, length);
    return crc32.getValue();
  }
}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java
new file mode 100644
index 000000000..d2dd3fa90
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static
org.apache.celeborn.tez.plugin.util.CelebornTezUtils.getParentPrivateField;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezReader;
+import org.apache.celeborn.client.ShuffleClient;
+
+class CelebornScheduler extends ShuffleScheduler {
+ private static final Logger LOG =
LoggerFactory.getLogger(ShuffleScheduler.class);
+
+ private final AtomicBoolean isShutdown;
+ private final Random random = new Random(System.currentTimeMillis());
+
+ private final String srcNameTrimmed;
+
+ private final int numFetchers;
+ private final ListeningExecutorService fetcherExecutor;
+ private final Set<CelebornTezShuffleDataFetcher> celebornRunningFetchers =
+ Collections.newSetFromMap(new
ConcurrentHashMap<CelebornTezShuffleDataFetcher, Boolean>());
+ private volatile Thread shuffleSchedulerThread = null;
+ private final MergeManager mergeManager;
+ private final InputContext inputContext;
+ private final ExceptionReporter exceptionReporter;
+ private final int numInputs;
+
+ // celeborn
+ private final int shuffleId;
+ private final ApplicationAttemptId applicationAttemptId;
+ private final ShuffleClient shuffleClient;
+ private final Map<Integer, MapHost> runningRssPartitionMap = new HashMap<>();
+
+ private final Set<Integer> successRssPartitionSet =
Sets.newConcurrentHashSet();
+ private final Set<Integer> allRssPartition = Sets.newConcurrentHashSet();
+
+ private final Map<Integer, Set<InputAttemptIdentifier>>
partitionIdToSuccessMapTaskAttempts =
+ new HashMap<>();
+ final Map<Integer, Set<TezTaskID>> partitionIdToSuccessTezTasks = new
HashMap<>();
+ private final TezCounter skippedInputCounter;
+
+ public CelebornScheduler(
+ InputContext inputContext,
+ Configuration conf,
+ int numberOfInputs,
+ ExceptionReporter exceptionReporter,
+ ShuffleClient shuffleClient,
+ MergeManager mergeManager,
+ FetchedInputAllocatorOrderedGrouped allocator,
+ long startTime,
+ CompressionCodec codec,
+ boolean ifileReadAhead,
+ int ifileReadAheadLength,
+ String srcNameTrimmed,
+ int shuffleId,
+ ApplicationAttemptId applicationAttemptId)
+ throws IOException {
+ super(
+ inputContext,
+ conf,
+ numberOfInputs,
+ exceptionReporter,
+ mergeManager,
+ allocator,
+ startTime,
+ codec,
+ ifileReadAhead,
+ ifileReadAheadLength,
+ srcNameTrimmed);
+ this.inputContext = inputContext;
+ this.exceptionReporter = exceptionReporter;
+ this.srcNameTrimmed = srcNameTrimmed;
+ this.shuffleClient = shuffleClient;
+ this.shuffleId = shuffleId;
+ this.applicationAttemptId = applicationAttemptId;
+ this.mergeManager = mergeManager;
+ this.numInputs = numberOfInputs;
+ this.numFetchers = (int) getParentPrivateField(this, "numFetchers");
+ this.fetcherExecutor =
+ (ListeningExecutorService) getParentPrivateField(this,
"fetcherExecutor");
+ this.isShutdown = (AtomicBoolean) getParentPrivateField(this,
"isShutdown");
+ this.skippedInputCounter = (TezCounter) getParentPrivateField(this,
"skippedInputCounter");
+ }
+
+ public void start() throws Exception {
+ shuffleSchedulerThread = Thread.currentThread();
+ mergeManager.setupParentThread(shuffleSchedulerThread);
+ CelebornShuffleSchedulerCallable schedulerCallable = new
CelebornShuffleSchedulerCallable();
+ schedulerCallable.call();
+ }
+
+ private boolean allInputTaskAttemptDone() {
+ return (this.partitionIdToSuccessTezTasks.values().stream().mapToInt(s ->
s.size()).sum()
+ + skippedInputCounter.getValue())
+ == numInputs;
+ }
+
+ private boolean isAllInputFetched() {
+ return allInputTaskAttemptDone() && (successRssPartitionSet.size() >=
allRssPartition.size());
+ }
+
+ public synchronized void addKnownMapOutput(
+ String inputHostName, int port, int partitionId,
CompositeInputAttemptIdentifier srcAttempt) {
+
+ allRssPartition.add(partitionId);
+ Set<InputAttemptIdentifier> inputAttemptIdentifiers =
+ partitionIdToSuccessMapTaskAttempts.computeIfAbsent(partitionId, id ->
new HashSet<>());
+ String pathComponent = srcAttempt.getPathComponent();
+ TezTaskAttemptID tezTaskAttemptId =
+ TezTaskAttemptID.fromString(pathComponent.substring(0,
pathComponent.length() - 6));
+ partitionIdToSuccessTezTasks.putIfAbsent(partitionId, new HashSet<>());
+
partitionIdToSuccessTezTasks.get(partitionId).add(tezTaskAttemptId.getTaskID());
+
+ inputAttemptIdentifiers.add(srcAttempt);
+ super.addKnownMapOutput(inputHostName, port, partitionId, srcAttempt);
+ }
+
+ public synchronized MapHost getHost() throws InterruptedException {
+ while (pendingHosts.isEmpty() && !isAllInputFetched()) {
+ LOG.debug("PendingHosts={}", pendingHosts);
+ waitAndNotifyProgress();
+ }
+
+ if (!pendingHosts.isEmpty()) {
+
+ MapHost host = null;
+ Iterator<MapHost> iter = pendingHosts.iterator();
+ int numToPick = random.nextInt(pendingHosts.size());
+ for (int i = 0; i <= numToPick; ++i) {
+ host = iter.next();
+ }
+
+ pendingHosts.remove(host);
+ host.markBusy();
+ return host;
+ } else {
+ return null;
+ }
+ }
+
+ private class CelebornShuffleSchedulerCallable extends CallableWithNdc<Void>
{
+
+ @Override
+ protected Void callInternal() throws InterruptedException {
+ while (!isShutdown.get() && !isAllInputFetched()) {
+ synchronized (CelebornScheduler.this) {
+ while (!allInputTaskAttemptDone()
+ || ((celebornRunningFetchers.size() >= numFetchers ||
pendingHosts.isEmpty())
+ && !isAllInputFetched())) {
+ try {
+ waitAndNotifyProgress();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ srcNameTrimmed
+ + ": "
+ + "Interrupted while waiting for fetchers to complete "
+ + "and hasBeenShutdown. Breaking out of
ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ // Ensure there's memory available before scheduling the next Fetcher.
+ try {
+ // If merge is on, block
+ mergeManager.waitForInMemoryMerge();
+ // In case usedMemory > memorylimit, wait until some memory is
released
+ mergeManager.waitForShuffleToMergeMemory();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ srcNameTrimmed
+ + ": "
+ + "Interrupted while waiting for merge to complete and
hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+
+ if (!isShutdown.get() && !isAllInputFetched()) {
+ synchronized (CelebornScheduler.this) {
+ int numFetchersToRun = numFetchers -
celebornRunningFetchers.size();
+ int count = 0;
+ while (count < numFetchersToRun && !isShutdown.get() &&
!isAllInputFetched()) {
+ MapHost mapHost;
+ try {
+ mapHost = getHost(); // Leads to a wait.
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(
+ srcNameTrimmed
+ + ": "
+ + "Interrupted while waiting for host and
hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
+ }
+ }
+ if (mapHost == null) {
+ break; // Check for the exit condition.
+ }
+ LOG.debug("{}: Processing pending host: {}", srcNameTrimmed,
mapHost);
+ if (!isShutdown.get()) {
+ count++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ srcNameTrimmed + ": " + "Scheduling fetch for inputHost:
{}",
+ mapHost.getHostIdentifier() + ":" +
mapHost.getPartitionId());
+ }
+
+ if (isFirstRssPartitionFetch(mapHost)) {
+ CelebornTezShuffleDataFetcher celebornTezShuffleDataFetcher =
+ constructCelebornFetcherForPartition(mapHost);
+
+ celebornRunningFetchers.add(celebornTezShuffleDataFetcher);
+ ListenableFuture<Void> future =
+ fetcherExecutor.submit(celebornTezShuffleDataFetcher);
+ Futures.addCallback(
+ future,
+ new FetchFutureCallback(celebornTezShuffleDataFetcher),
+ MoreExecutors.directExecutor());
+ } else {
+ for (int i = 0; i < mapHost.getAndClearKnownMaps().size();
i++) {
+ remainingMaps.decrementAndGet();
+ }
+ LOG.info(
+ "Partition was fetched, remainingMaps desc, now
value:{}",
+ remainingMaps.get());
+ }
+ }
+ }
+ }
+ }
+ }
+ LOG.info(
+ "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
+ srcNameTrimmed,
+ Thread.currentThread().isInterrupted());
+ if (!fetcherExecutor.isShutdown()) {
+ fetcherExecutor.shutdownNow();
+ }
+ return null;
+ }
+
+ private synchronized boolean isFirstRssPartitionFetch(MapHost mapHost) {
+ Integer partitionId = mapHost.getPartitionId();
+ LOG.info(
+ "Check isFirstCelebornPartitionFetch, mapHost:{},partitionId:{}",
mapHost, partitionId);
+
+ if (runningRssPartitionMap.containsKey(partitionId)
+ || successRssPartitionSet.contains(partitionId)) {
+ return false;
+ }
+ runningRssPartitionMap.put(partitionId, mapHost);
+ return true;
+ }
+
+ private CelebornTezShuffleDataFetcher
constructCelebornFetcherForPartition(MapHost mapHost) {
+ int partitionId = mapHost.getPartitionId();
+ CelebornTezReader reader =
+ new CelebornTezReader(
+ shuffleClient, shuffleId, partitionId,
applicationAttemptId.getAttemptId());
+ return new CelebornTezShuffleDataFetcher(
+
partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),
+ mapHost.getPartitionId(),
+ mergeManager,
+ inputContext.getCounters(),
+ reader,
+ exceptionReporter);
+ }
+ }
+
+ private synchronized void waitAndNotifyProgress() throws
InterruptedException {
+ inputContext.notifyProgress();
+ wait(1000);
+ }
+
+ private class FetchFutureCallback implements FutureCallback<Void> {
+
+ private final CelebornTezShuffleDataFetcher fetcherOrderedGrouped;
+ private final Integer partitionId;
+
+ public FetchFutureCallback(CelebornTezShuffleDataFetcher
fetcherOrderedGrouped) {
+ this.fetcherOrderedGrouped = fetcherOrderedGrouped;
+ this.partitionId = fetcherOrderedGrouped.getPartitionId();
+ }
+
+ private void doBookKeepingForFetcherComplete() {
+ synchronized (CelebornScheduler.this) {
+ celebornRunningFetchers.remove(fetcherOrderedGrouped);
+ CelebornScheduler.this.notifyAll();
+ }
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ fetcherOrderedGrouped.shutDown();
+ if (isShutdown.get()) {
+ LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch
complete");
+ } else {
+ successRssPartitionSet.add(partitionId);
+ MapHost mapHost = runningRssPartitionMap.remove(partitionId);
+ if (mapHost != null) {
+ for (int i = 0; i < mapHost.getAndClearKnownMaps().size(); i++) {
+ remainingMaps.decrementAndGet();
+ }
+ }
+ doBookKeepingForFetcherComplete();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fetcherOrderedGrouped.shutDown();
+ if (isShutdown.get()) {
+ LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch
complete");
+ } else {
+ LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error", t);
+ exceptionReporter.reportException(t);
+ doBookKeepingForFetcherComplete();
+ }
+ }
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java
new file mode 100644
index 000000000..a1a15bf0f
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornShuffle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+public class CelebornShuffle extends Shuffle {
+ public CelebornShuffle(
+ InputContext inputContext,
+ Configuration conf,
+ int numInputs,
+ long initialMemoryAvailable,
+ ApplicationAttemptId applicationAttemptId)
+ throws IOException {
+ super(inputContext, conf, numInputs, initialMemoryAvailable);
+
+ String host = conf.get(TEZ_CELEBORN_LM_HOST);
+ int port = conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+ int shuffleId = conf.getInt(TEZ_SHUFFLE_ID, -1);
+ String appId = conf.get(TEZ_CELEBORN_APPLICATION_ID);
+ CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+ ShuffleClient shuffleClient =
+ ShuffleClient.get(
+ appId,
+ host,
+ port,
+ celebornConf,
+ new UserIdentifier(
+ celebornConf.quotaUserSpecificTenant(),
celebornConf.quotaUserSpecificUserName()),
+ null);
+
+ long startTime = (long) getParentPrivateField(this, "startTime");
+ CompressionCodec codec = (CompressionCodec) getParentPrivateField(this,
"codec");
+ boolean ifileReadAhead = (boolean) getParentPrivateField(this,
"ifileReadAhead");
+ int ifileReadAheadLength = (int) getParentPrivateField(this,
"ifileReadAheadLength");
+ String sourceDestNameTrimmed = (String) getParentPrivateField(this,
"sourceDestNameTrimmed");
+ CelebornScheduler scheduler =
+ new CelebornScheduler(
+ inputContext,
+ conf,
+ numInputs,
+ this,
+ shuffleClient,
+ merger,
+ merger,
+ startTime,
+ codec,
+ ifileReadAhead,
+ ifileReadAheadLength,
+ sourceDestNameTrimmed,
+ shuffleId,
+ applicationAttemptId);
+ ShuffleInputEventHandlerOrderedGrouped eventHandler =
+ new ShuffleInputEventHandlerOrderedGrouped(
+ inputContext, scheduler, ShuffleUtils.isTezShuffleHandler(conf));
+ setParentPrivateField(this, "scheduler", scheduler);
+ setParentPrivateField(this, "eventHandler", eventHandler);
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java
new file mode 100644
index 000000000..70dd77241
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezBypassWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.tez.plugin.util.ChecksumUtils;
+
+// In Tez shuffle, MapOutput encapsulates the logic to fetch map task's output
data via http.
+// So, in Celeborn, we should bypass this logic, and directly write data to
MapOutput.
+public class CelebornTezBypassWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CelebornTezBypassWriter.class);
+ private static final byte[] HEADER = new byte[] {(byte) 'T', (byte) 'I',
(byte) 'F', (byte) 0};
+
+ public static void write(MapOutput mapOutput, byte[] buffer) {
+ LOG.debug(
+ "CelebornTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+ mapOutput.getType(),
+ buffer.length);
+ // Write and commit uncompressed data to MapOutput.
+ // In the majority of cases, merger allocates memory to accept data,
+ // but when data size exceeds the threshold, merger can also allocate disk.
+ // So, we should consider the two situations, respectively.
+ if (mapOutput.getType() == MapOutput.Type.MEMORY) {
+ byte[] memory = mapOutput.getMemory();
+ System.arraycopy(buffer, 0, memory, 0, buffer.length);
+ } else if (mapOutput.getType() == MapOutput.Type.DISK) {
+ throw new IllegalStateException(
+ "Celeborn map reduce client do not support OnDiskMapOutput. Try to
increase mapreduce.reduce.shuffle.memory.limit.percent");
+ } else {
+ throw new IllegalStateException(
+ "Merger reserve unknown type of MapOutput: " +
mapOutput.getClass().getCanonicalName());
+ }
+ }
+
+ public static void write(final FetchedInput fetchedInput, byte[] buffer)
throws IOException {
+ LOG.debug(
+ "CelebornTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+ fetchedInput.getType(),
+ buffer.length);
+ // Write and commit uncompressed data to MapOutput.
+ // In the majority of cases, merger allocates memory to accept data,
+ // but when data size exceeds the threshold, merger can also allocate disk.
+ // So, we should consider the two situations, respectively.
+ if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
+ byte[] memory = ((MemoryFetchedInput) fetchedInput).getBytes();
+ System.arraycopy(buffer, 0, memory, 0, buffer.length);
+ } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
+ OutputStream output = fetchedInput.getOutputStream();
+ output.write(HEADER);
+ output.write(buffer);
+ output.write(Ints.toByteArray((int) ChecksumUtils.getCrc32(buffer)));
+ output.flush();
+ output.close();
+ } else {
+ throw new CelebornIOException(
+ "Merger reserve unknown type of MapOutput: "
+ + fetchedInput.getClass().getCanonicalName());
+ }
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java
new file mode 100644
index 000000000..bf8310c46
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornTezShuffleDataFetcher.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezReader;
+import org.apache.celeborn.common.exception.CelebornIOException;
+
+public class CelebornTezShuffleDataFetcher extends CallableWithNdc<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CelebornTezShuffleDataFetcher.class);
+
+ private enum ShuffleErrors {
+ IO_ERROR,
+ WRONG_LENGTH,
+ BAD_ID,
+ WRONG_MAP,
+ CONNECTION,
+ WRONG_REDUCE
+ }
+
+ private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+ private final TezCounter ioErrs;
+ private final MergeManager merger;
+
+ private long copyBlockCount = 0;
+ private volatile boolean stopped = false;
+
+ private final CelebornTezReader celebornTezReader;
+ private long readTime = 0;
+ private long decompressTime = 0;
+ private long serializeTime = 0;
+ private long waitTime = 0;
+ private long copyTime = 0; // the sum of readTime + decompressTime +
serializeTime + waitTime
+ private final InputAttemptIdentifier inputAttemptIdentifier;
+ private static int uniqueMapId = 0;
+
+ private boolean hasPendingData = false;
+ private long startWait;
+ private int waitCount = 0;
+ private byte[] shuffleData = null;
+ private Integer partitionId;
+ private final ExceptionReporter exceptionReporter;
+
+ private final AtomicInteger issuedCnt = new AtomicInteger(0);
+
+ public CelebornTezShuffleDataFetcher(
+ InputAttemptIdentifier inputAttemptIdentifier,
+ Integer partitionId,
+ MergeManager merger,
+ TezCounters tezCounters,
+ CelebornTezReader celebornTezReader,
+ ExceptionReporter exceptionReporter) {
+ this.merger = merger;
+ this.partitionId = partitionId;
+ this.inputAttemptIdentifier = inputAttemptIdentifier;
+ this.exceptionReporter = exceptionReporter;
+ ioErrs = tezCounters.findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
+ this.celebornTezReader = celebornTezReader;
+
+ LOG.info(
+ "CelebornTezShuffleDataFetcher, partitionId:{},
inputAttemptIdentifier:{}.",
+ this.partitionId,
+ this.inputAttemptIdentifier);
+ }
+
+ @Override
+ public Void callInternal() {
+ try {
+ celebornTezReader.init();
+ fetchAllRssBlocks();
+ } catch (InterruptedException ie) {
+ // might not be respected when fetcher is in progress / server is busy.
TEZ-711
+ // Set the status back
+ LOG.warn(ie.getMessage(), ie);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (Throwable t) {
+ LOG.warn(t.getMessage(), t);
+ exceptionReporter.reportException(t);
+ // Shuffle knows how to deal with failures post shutdown via the
onFailure hook
+ }
+ return null;
+ }
+
+ public void fetchAllRssBlocks() throws IOException, InterruptedException {
+ while (!stopped) {
+ try {
+ // If merge is on, block
+ merger.waitForInMemoryMerge();
+ // Do shuffle
+ copyFromRssServer();
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ throw e;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void copyFromRssServer() throws IOException {
+ // fetch a block
+ if (!hasPendingData) {
+ final long startFetch = System.currentTimeMillis();
+ shuffleData = celebornTezReader.getShuffleBlock();
+ long fetchDuration = System.currentTimeMillis() - startFetch;
+ readTime += fetchDuration;
+ }
+
+ if (shuffleData != null) {
+ // start to merge
+ final long startSerialization = System.currentTimeMillis();
+ if (issueMapOutputMerge()) {
+ long serializationDuration = System.currentTimeMillis() -
startSerialization;
+ serializeTime += serializationDuration;
+ // if reserve successes, reset status for next fetch
+ if (hasPendingData) {
+ waitTime += System.currentTimeMillis() - startWait;
+ }
+ hasPendingData = false;
+ } else {
+ // if reserve fail, return and wait
+ startWait = System.currentTimeMillis();
+ return;
+ }
+
+ // update some status
+ copyBlockCount++;
+ copyTime = readTime + decompressTime + serializeTime + waitTime;
+ updateStatus();
+ } else {
+ // finish reading data, close related reader and check data consistent
+ celebornTezReader.close();
+ LOG.info(
+ "Reduce task "
+ + inputAttemptIdentifier
+ + " read block cnt: "
+ + copyBlockCount
+ + " cost "
+ + readTime
+ + " ms to fetch and "
+ + decompressTime
+ + " ms to decompress "
+ + serializeTime
+ + " ms to serialize and "
+ + waitTime
+ + " ms to wait resource"
+ + ", copy time:"
+ + copyTime);
+ stopFetch();
+ }
+ }
+
+ public Integer getPartitionId() {
+ return partitionId;
+ }
+
+ private boolean issueMapOutputMerge() throws IOException {
+ // Allocate a MapOutput (either in-memory or on-disk) to put uncompressed
block
+ // In Rss, a MapOutput is sent as multiple blocks, so the reducer needs to
+ // treat each "block" as a faked "mapout".
+ // To avoid name conflicts, we use getNextUniqueTaskAttemptID instead.
+ // It will generate a unique TaskAttemptID(increased_seq++, 0).
+ InputAttemptIdentifier uniqueInputAttemptIdentifier =
getNextUniqueInputAttemptIdentifier();
+ MapOutput mapOutput = null;
+ try {
+ issuedCnt.incrementAndGet();
+ mapOutput = merger.reserve(uniqueInputAttemptIdentifier,
shuffleData.length, 0, 1);
+ } catch (IOException ioe) {
+ // kill this reduce attempt
+ ioErrs.increment(1);
+ throw ioe;
+ }
+ // Check if we can shuffle *now* ...
+ if (mapOutput == null || mapOutput.getType() == MapOutput.Type.WAIT) {
+ LOG.info("RssMRFetcher - MergeManager returned status WAIT ...");
+ // Not an error but wait to process data.
+ // Use a retry flag to avoid re-fetch and re-uncompress.
+ hasPendingData = true;
+ waitCount++;
+ return false;
+ }
+
+ // write data to mapOutput
+ try {
+ CelebornTezBypassWriter.write(mapOutput, shuffleData);
+ // let the merger knows this block is ready for merging
+ mapOutput.commit();
+ } catch (Throwable t) {
+ ioErrs.increment(1);
+ mapOutput.abort();
+ throw new CelebornIOException(
+ "Reduce: "
+ + inputAttemptIdentifier
+ + " cannot write block to "
+ + mapOutput.getClass().getSimpleName()
+ + " due to: "
+ + t.getClass().getName());
+ }
+ return true;
+ }
+
+ private InputAttemptIdentifier getNextUniqueInputAttemptIdentifier() {
+ return new InputAttemptIdentifier(uniqueMapId++, 0);
+ }
+
+ private void updateStatus() {}
+
+ @VisibleForTesting
+ public int getRetryCount() {
+ return waitCount;
+ }
+
+ private void stopFetch() {
+ LOG.info("CelebornTezShuffleDataFetcher stop fetch");
+ stopped = true;
+ }
+
+ public void shutDown() {
+ stopFetch();
+ LOG.info("CelebornTezShuffleDataFetcher shutdown");
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
new file mode 100644
index 000000000..f098a4284
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedInputLegacy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+@Private
+public class CelebornOrderedGroupedInputLegacy extends
CelebornOrderedGroupedKVInput {
+
+ private final Progress progress = new Progress();
+
+ public CelebornOrderedGroupedInputLegacy(InputContext inputContext, int
numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ @Private
+ public TezRawKeyValueIterator getIterator()
+ throws IOException, InterruptedException, TezException {
+ // wait for input so that iterator is available
+ synchronized (this) {
+ if (getNumPhysicalInputs() == 0) {
+ return new TezRawKeyValueIterator() {
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public Progress getProgress() {
+ progress.complete();
+ return progress;
+ }
+
+ @Override
+ public boolean isSameKey() throws IOException {
+ throw new UnsupportedOperationException("isSameKey is not
supported");
+ }
+ };
+ }
+ }
+
+ waitForInputReady();
+ synchronized (this) {
+ return rawIter;
+ }
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
new file mode 100644
index 000000000..627d5f7a7
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedKVInput.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.CelebornShuffle;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CelebornOrderedGroupedKVInput} in a {@link AbstractLogicalInput}
which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
consumer. This is
+ * typically used to bring one partition of a set of partitioned distributed
data to one consumer.
+ * The shuffle operation brings all partitions to one place. These partitions
are assumed to be
+ * sorted and are merged sorted to merge them into a single input view.
+ *
+ * <p>The Copy and Merge will be triggered by the initialization - which is
handled by the Tez
+ * framework. Input is not consumable until the Copy and Merge are complete.
Methods are provided to
+ * check for this, as well as to wait for completion. Attempting to get a
reader on a non-complete
+ * input will block.
+ */
+@Public
+public class CelebornOrderedGroupedKVInput extends AbstractLogicalInput {
+
+ static final Logger LOG =
LoggerFactory.getLogger(CelebornOrderedGroupedKVInput.class);
+
+ protected TezRawKeyValueIterator rawIter = null;
+ protected Configuration conf;
+ protected Shuffle shuffle;
+ private ApplicationAttemptId applicationAttemptId;
+ protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+ private final BlockingQueue<Event> pendingEvents = new
LinkedBlockingQueue<Event>();
+ private long firstEventReceivedTime = -1;
+
+ @SuppressWarnings("rawtypes")
+ protected ValuesIterator vIter;
+
+ private TezCounter inputKeyCounter;
+ private TezCounter inputValueCounter;
+ private TezCounter shuffledInputs;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public CelebornOrderedGroupedKVInput(InputContext inputContext, int
numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ @Override
+ public synchronized List<Event> initialize() throws IOException {
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+
+ if (this.getNumPhysicalInputs() == 0) {
+ getContext().requestInitialMemory(0l, null);
+ isStarted.set(true);
+ getContext().inputIsReady();
+ LOG.info(
+ "input fetch not required since there are 0 physical inputs for
input vertex: "
+ + getContext().getInputOutputVertexNames());
+ return Collections.emptyList();
+ }
+
+ long initialMemoryRequest =
+ Shuffle.getInitialMemoryRequirement(conf,
getContext().getTotalMemoryAvailableToTask());
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ getContext().requestInitialMemory(initialMemoryRequest,
memoryUpdateCallbackHandler);
+
+ this.inputKeyCounter =
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+ this.inputValueCounter =
+
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+ this.shuffledInputs =
getContext().getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+ this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
getContext().getWorkDirs());
+ this.applicationAttemptId =
+ ApplicationAttemptId.newInstance(
+ getContext().getApplicationId(),
getContext().getDAGAttemptNumber());
+ return Collections.emptyList();
+ }
+
+ @Override
+ public synchronized void start() throws IOException {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ // Start the shuffle - copy and merge
+ shuffle = createShuffle();
+ shuffle.run();
+ LOG.debug("Initialized the handlers in shuffle..Safe to start
processing..");
+ List<Event> pending = new LinkedList<Event>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ }
+ shuffle.handleEvents(pending);
+ }
+ isStarted.set(true);
+ }
+ }
+
+ @VisibleForTesting
+ Shuffle createShuffle() throws IOException {
+ return new CelebornShuffle(
+ getContext(),
+ conf,
+ getNumPhysicalInputs(),
+ memoryUpdateCallbackHandler.getMemoryAssigned(),
+ applicationAttemptId);
+ }
+
+ /**
+ * Check if the input is ready for consumption
+ *
+ * @return true if the input is ready for consumption, or if an error
occurred processing fetching
+ * the input. false if the shuffle and merge are still in progress
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public synchronized boolean isInputReady()
+ throws IOException, InterruptedException, TezException {
+ Preconditions.checkState(isStarted.get(), "Must start input before
invoking this method");
+ if (getNumPhysicalInputs() == 0) {
+ return true;
+ }
+ return shuffle.isInputReady();
+ }
+
+ /**
+ * Waits for the input to become ready for consumption
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void waitForInputReady() throws IOException, InterruptedException,
TezException {
+ // Cannot synchronize entire method since this is called form user code
and can block.
+ Shuffle localShuffleCopy = null;
+ synchronized (this) {
+ Preconditions.checkState(isStarted.get(), "Must start input before
invoking this method");
+ if (getNumPhysicalInputs() == 0) {
+ return;
+ }
+ localShuffleCopy = shuffle;
+ }
+
+ TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
+ synchronized (this) {
+ rawIter = localRawIter;
+ createValuesIterator();
+ }
+ }
+
+ @Override
+ public synchronized List<Event> close() throws IOException {
+ if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
+ rawIter.close();
+ }
+ if (shuffle != null) {
+ shuffle.shutdown();
+ }
+
+ long dataSize =
+
getContext().getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+ getContext().getStatisticsReporter().reportDataSize(dataSize);
+ long inputRecords =
+
getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+ getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Get a KVReader for the Input. This method will block until the input is
ready - i.e. the copy
+ * and merge stages are complete. Users can use the isInputReady method to
check if the input is
+ * ready, which gives an indication of whether this method will block or not.
+ *
+ * <p>NOTE: All values for the current K-V pair must be read prior to
invoking moveToNext. Once
+ * moveToNext() is called, the valueIterator from the previous K-V pair will
throw an Exception
+ *
+ * @return a KVReader over the sorted input.
+ * @throws {@link IOInterruptedException} if IO was performing a blocking
operation and was
+ * interrupted
+ */
+ @Override
+ public KeyValuesReader getReader() throws IOException, TezException {
+ // Cannot synchronize entire method since this is called form user code
and can block.
+ TezRawKeyValueIterator rawIterLocal;
+ synchronized (this) {
+ rawIterLocal = rawIter;
+ if (getNumPhysicalInputs() == 0) {
+ return new KeyValuesReader() {
+ @Override
+ public boolean next() throws IOException {
+ getContext().notifyProgress();
+ hasCompletedProcessing();
+ completedProcessing = true;
+ return false;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+ };
+ }
+ }
+ if (rawIterLocal == null) {
+ try {
+ waitForInputReady();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOInterruptedException("Interrupted while waiting for input
ready", e);
+ }
+ }
+ @SuppressWarnings("rawtypes")
+ ValuesIterator valuesIter = null;
+ synchronized (this) {
+ valuesIter = vIter;
+ }
+ return new OrderedGroupedKeyValuesReader(valuesIter, getContext());
+ }
+
+ @Override
+ public float getProgress() throws ProgressFailedException,
InterruptedException {
+ int totalInputs = getNumPhysicalInputs();
+ if (totalInputs != 0) {
+ synchronized (this) {
+ return ((0.5f) * this.shuffledInputs.getValue() / totalInputs)
+ + ((rawIter != null) ? ((0.5f) *
rawIter.getProgress().getProgress()) : 0.0f);
+ }
+ } else {
+ return 0.0f;
+ }
+ }
+
+ @Override
+ public void handleEvents(List<Event> inputEvents) throws IOException {
+ Shuffle shuffleLocalRef;
+ synchronized (this) {
+ if (getNumPhysicalInputs() == 0) {
+ throw new RuntimeException("No input events expected as numInputs is
0");
+ }
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ pendingEvents.addAll(inputEvents);
+ return;
+ }
+ shuffleLocalRef = shuffle;
+ }
+ shuffleLocalRef.handleEvents(inputEvents);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected synchronized void createValuesIterator() throws IOException {
+ // Not used by ReduceProcessor
+ RawComparator rawComparator =
ConfigUtils.getIntermediateInputKeyComparator(conf);
+ Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+ LOG.info(
+ getContext().getInputOutputVertexNames()
+ + ": "
+ + "creating ValuesIterator with "
+ + "comparator="
+ + rawComparator.getClass().getName()
+ + ", keyClass="
+ + keyClass.getName()
+ + ", valClass="
+ + valClass.getName());
+
+ vIter =
+ new ValuesIterator(
+ rawIter, rawComparator, keyClass, valClass, conf, inputKeyCounter,
inputValueCounter);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public RawComparator getInputKeyComparator() {
+ return (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
+
+ private final ValuesIterator valuesIter;
+ private final InputContext context;
+
+ OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext
context) {
+ this.valuesIter = valuesIter;
+ this.context = context;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ context.notifyProgress();
+ return valuesIter.moveToNext();
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return valuesIter.getKey();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Iterable<Object> getCurrentValues() throws IOException {
+ return valuesIter.getValues();
+ }
+ }
+}