Repository: tez
Updated Branches:
  refs/heads/master e10d80fe6 -> 73ce1d0e3


TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can 
lead to some inefficiency during remove() operation (Saikat via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73ce1d0e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73ce1d0e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73ce1d0e

Branch: refs/heads/master
Commit: 73ce1d0e3dec284f15d3a61c4569813cf5ded3cd
Parents: e10d80f
Author: Rajesh Balamohan <[email protected]>
Authored: Thu Jul 30 04:16:33 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Thu Jul 30 04:16:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/library/common/shuffle/Fetcher.java | 102 +++++++++++--------
 .../library/common/shuffle/ShuffleUtils.java    |   6 +-
 .../library/common/shuffle/TestFetcher.java     |  58 +++++++++++
 4 files changed, 121 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 687f996..0953aec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.
   TEZ-2623. Fix module dependencies related to hadoop-auth.

http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 04cf5b5..08b59ed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -28,11 +28,13 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -98,13 +100,14 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   // Parameters to track work.
   private List<InputAttemptIdentifier> srcAttempts;
+  @VisibleForTesting
+  Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
   private String host;
   private int port;
   private int partition;
 
   // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
   private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
-  private List<InputAttemptIdentifier> remaining;
 
   private URL url;
   private volatile DataInputStream input;
@@ -166,6 +169,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
   }
 
+  // helper method to populate the remaining map
+  void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
+    if (srcAttemptsRemaining == null) {
+      srcAttemptsRemaining = new LinkedHashMap<String, 
InputAttemptIdentifier>(origlist.size());
+    }
+    for (InputAttemptIdentifier id : origlist) {
+      srcAttemptsRemaining.put(id.toString(), id);
+    }
+  }
+
   @Override
   protected FetchResult callInternal() throws Exception {
     boolean multiplex = (this.sharedFetchEnabled && 
this.localDiskFetchEnabled);
@@ -174,7 +187,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       return new FetchResult(host, port, partition, srcAttempts);
     }
 
-    for (InputAttemptIdentifier in : srcAttempts) {
+    populateRemainingMap(srcAttempts);
+    for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) {
       pathToAttemptMap.put(in.getPathComponent(), in);
       // do only if all of them are shared fetches
       multiplex &= in.isShared();
@@ -186,9 +200,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
               + "- partition is non-zero (%d)", partition);
     }
 
-    //Similar to TEZ-2172 (remove can be expensive with list)
-    remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
-
     HostFetchResult hostFetchResult;
 
     if (localDiskFetchEnabled && host.equals(localHostname) && port == 
shufflePort) {
@@ -214,12 +225,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     shutdown();
 
     // Sanity check
-    if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
+    if (hostFetchResult.failedInputs == null && 
!srcAttemptsRemaining.isEmpty()) {
       if (!multiplex) {
         throw new IOException("server didn't return all expected map outputs: "
-            + remaining.size() + " left.");
+            + srcAttemptsRemaining.size() + " left.");
       } else {
-        LOG.info("Shared fetch failed to return " + remaining.size() + " 
inputs on this try");
+        LOG.info("Shared fetch failed to return " + 
srcAttemptsRemaining.size() + " inputs on this try");
       }
     }
 
@@ -294,7 +305,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   private int findInputs() throws IOException {
     int k = 0;
-    for (InputAttemptIdentifier src : srcAttempts) {
+    for (InputAttemptIdentifier src : srcAttemptsRemaining.values()) {
       try {
         if (getShuffleInputFileName(src.getPathComponent(),
             Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING) != null) {
@@ -343,7 +354,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   protected HostFetchResult doSharedFetch() throws IOException {
     int inputs = findInputs();
 
-    if (inputs == srcAttempts.size()) {
+    if (inputs == srcAttemptsRemaining.size()) {
       if (isDebugEnabled) {
         LOG.debug("Using the copies found locally");
       }
@@ -366,9 +377,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         LOG.info("Requeuing " + host + ":" + port
             + " downloads because we didn't get a lock");
         return new HostFetchResult(new FetchResult(host, port, partition,
-            remaining), null, false);
+            srcAttemptsRemaining.values()), null, false);
       } else {
-        if (findInputs() == srcAttempts.size()) {
+        if (findInputs() == srcAttemptsRemaining.size()) {
           // double checked after lock
           releaseLock(lock);
           lock = null;
@@ -392,7 +403,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // if any exception was due to shut-down don't bother firing any more
       // requests
       return new HostFetchResult(new FetchResult(host, port, partition,
-          remaining), null, false);
+          srcAttemptsRemaining.values()), null, false);
     }
     // no more caching
     return doHttpFetch();
@@ -403,7 +414,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     return doHttpFetch(null);
   }
 
-  private HostFetchResult setupConnection(List<InputAttemptIdentifier> 
attempts) {
+  private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> 
attempts) {
     try {
       StringBuilder baseURI = 
ShuffleUtils.constructBaseURIForShuffleHandler(host,
           port, partition, appId.toString(), 
httpConnectionParams.isSslShuffle());
@@ -426,15 +437,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
             "Not reporting fetch failure during connection establishment, 
since an Exception was caught after shutdown." +
                 e.getClass().getName() + ", Message: " + e.getMessage());
       } else {
-        failedFetches = remaining.toArray(new 
InputAttemptIdentifier[remaining.size()]);
+        failedFetches = srcAttemptsRemaining.values().
+            toArray(new 
InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
       }
-      return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), failedFetches, true);
+      return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), failedFetches, true);
     }
     if (isShutDown.get()) {
       // shutdown would have no effect if in the process of establishing the 
connection.
       shutdownInternal();
       LOG.info("Detected fetcher has been shutdown after connection 
establishment. Returning");
-      return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), null, false);
+      return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), null, false);
     }
 
     try {
@@ -451,10 +463,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
             "Not reporting fetch failure during connection establishment, 
since an Exception was caught after shutdown." +
                 e.getClass().getName() + ", Message: " + e.getMessage());
       } else {
-        InputAttemptIdentifier firstAttempt = attempts.get(0);
+        InputAttemptIdentifier firstAttempt = attempts.iterator().next();
         LOG.warn("Fetch Failure from host while connecting: " + host + ", 
attempt: " + firstAttempt
             + " Informing ShuffleManager: ", e);
-        return new HostFetchResult(new FetchResult(host, port, partition, 
remaining),
+        return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()),
             new InputAttemptIdentifier[] { firstAttempt }, false);
       }
     } catch (InterruptedException e) {
@@ -467,7 +479,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   @VisibleForTesting
   protected HostFetchResult doHttpFetch(CachingCallBack callback) {
 
-    HostFetchResult connectionsWithRetryResult = setupConnection(srcAttempts);
+    HostFetchResult connectionsWithRetryResult =
+        setupConnection(srcAttemptsRemaining.values());
     if (connectionsWithRetryResult != null) {
       return connectionsWithRetryResult;
     }
@@ -479,7 +492,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // shutdown would have no effect if in the process of establishing the 
connection.
       shutdownInternal();
       LOG.info("Detected fetcher has been shutdown after opening stream. 
Returning");
-      return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), null, false);
+      return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), null, false);
     }
     // After this point, closing the stream and connection, should cause a
     // SocketException,
@@ -490,11 +503,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     // after putting back the remaining maps to the
     // yet_to_be_fetched list and marking the failed tasks.
     InputAttemptIdentifier[] failedInputs = null;
-    while (!remaining.isEmpty() && failedInputs == null) {
+    while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
       if (isShutDown.get()) {
         shutdownInternal(true);
-        LOG.info("Fetcher already shutdown. Aborting queued fetches for " + 
remaining.size() + " inputs");
-        return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), null,
+        LOG.info("Fetcher already shutdown. Aborting queued fetches for " + 
srcAttemptsRemaining.size() + " inputs");
+        return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), null,
             false);
       }
       try {
@@ -503,13 +516,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         //clean up connection
         shutdownInternal(true);
         if (isShutDown.get()) {
-          LOG.info("Fetcher already shutdown. Aborting reconnection and queued 
fetches for " + remaining.size() + " inputs");
-          return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), null,
+          LOG.info("Fetcher already shutdown. Aborting reconnection and queued 
fetches for " + srcAttemptsRemaining.size() + " inputs");
+          return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), null,
               false);
         }
         // Connect again.
-        connectionsWithRetryResult = setupConnection(
-            new LinkedList<InputAttemptIdentifier>(remaining));
+        connectionsWithRetryResult = 
setupConnection(srcAttemptsRemaining.values());
         if (connectionsWithRetryResult != null) {
           break;
         }
@@ -521,7 +533,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           failedInputs.length + " failed inputs");
       failedInputs = null;
     }
-    return new HostFetchResult(new FetchResult(host, port, partition, 
remaining), failedInputs,
+    return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()), failedInputs,
         false);
   }
 
@@ -533,13 +545,13 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   @VisibleForTesting
   private HostFetchResult doLocalDiskFetch(boolean failMissing) {
 
-    Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
+    Iterator<Entry<String, InputAttemptIdentifier>> iterator = 
srcAttemptsRemaining.entrySet().iterator();
     while (iterator.hasNext()) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown. Skipping fetch for " + remaining.size() + 
" inputs");
+        LOG.info("Already shutdown. Skipping fetch for " + 
srcAttemptsRemaining.size() + " inputs");
         break;
       }
-      InputAttemptIdentifier srcAttemptId = iterator.next();
+      InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
       long startTime = System.currentTimeMillis();
 
       FetchedInput fetchedInput = null;
@@ -586,17 +598,18 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
 
     InputAttemptIdentifier[] failedFetches = null;
-    if (failMissing && remaining.size() > 0) {
+    if (failMissing && srcAttemptsRemaining.size() > 0) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown, not reporting fetch failures for: " + 
remaining.size() +
+        LOG.info("Already shutdown, not reporting fetch failures for: " + 
srcAttemptsRemaining.size() +
             " remaining inputs");
       } else {
-        failedFetches = remaining.toArray(new 
InputAttemptIdentifier[remaining.size()]);
+        failedFetches = srcAttemptsRemaining.values().
+            toArray(new 
InputAttemptIdentifier[srcAttemptsRemaining.values().size()]);
       }
     } else {
       // nothing needs to be done to requeue remaining entries
     }
-    return new HostFetchResult(new FetchResult(host, port, partition, 
remaining),
+    return new HostFetchResult(new FetchResult(host, port, partition, 
srcAttemptsRemaining.values()),
         failedFetches, false);
   }
 
@@ -695,7 +708,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         if (!isShutDown.get()) {
           LOG.warn("Invalid src id ", e);
           // Don't know which one was bad, so consider all of them as bad
-          return remaining.toArray(new 
InputAttemptIdentifier[remaining.size()]);
+          return srcAttemptsRemaining.values().toArray(new 
InputAttemptIdentifier[srcAttemptsRemaining.size()]);
         } else {
           LOG.info("Already shutdown. Ignoring badId error with message: " + 
e.getMessage());
           return null;
@@ -777,7 +790,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           compressedLength, decompressedLength, (endTime - startTime));
 
       // Note successful shuffle
-      remaining.remove(srcAttemptId);
+      srcAttemptsRemaining.remove(srcAttemptId.toString());
 
       // metrics.successFetch();
       return null;
@@ -800,8 +813,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         // Cleanup the fetchedInput before returning.
         cleanupFetchedInput(fetchedInput);
         if (srcAttemptId == null) {
-          return remaining
-              .toArray(new InputAttemptIdentifier[remaining.size()]);
+          return srcAttemptsRemaining.values()
+              .toArray(new 
InputAttemptIdentifier[srcAttemptsRemaining.size()]);
         } else {
           return new InputAttemptIdentifier[] { srcAttemptId };
         }
@@ -889,7 +902,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
 
     // Sanity check
-    if (!remaining.contains(srcAttemptId)) {
+    // we are guaranteed that key is not null
+    if (srcAttemptsRemaining.get(srcAttemptId.toString()) == null) {
       // wrongMapErrs.increment(1);
       LOG.warn("Invalid input. Received output for headerPathComponent: "
           + pathComponent + "nextRemainingSrcAttemptId: "
@@ -900,8 +914,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   }
   
   private InputAttemptIdentifier getNextRemainingAttempt() {
-    if (remaining.size() > 0) {
-      return remaining.iterator().next();
+    if (srcAttemptsRemaining.size() > 0) {
+      return srcAttemptsRemaining.values().iterator().next();
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e109d6a..1081587 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.List;
 
 import javax.annotation.Nullable;
@@ -34,6 +35,7 @@ import javax.crypto.SecretKey;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.http.BaseHttpConnection;
@@ -201,8 +203,8 @@ public class ShuffleUtils {
     return sb;
   }
 
-  public static URL constructInputURL(String baseURI, 
-      List<InputAttemptIdentifier> inputs, boolean keepAlive) throws 
MalformedURLException {
+  public static URL constructInputURL(String baseURI,
+      Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws 
MalformedURLException {
     StringBuilder url = new StringBuilder(baseURI);
     boolean first = true;
     for (InputAttemptIdentifier input : inputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 34c2ca7..7678b18 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -33,9 +33,13 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,6 +50,7 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.junit.Assert;
 import org.junit.Test;
@@ -231,4 +236,57 @@ public class TestFetcher {
     Assert.assertEquals("success callback input id", 
f.getInputAttemptIdentifier(), srcAttempId);
     Assert.assertEquals("success callback type", f.getType(), 
FetchedInput.Type.DISK_DIRECT);
   }
+
+  @Test(timeout=1000)
+  public void testInputAttemptIdentifierMap() {
+    InputAttemptIdentifier[] srcAttempts = {
+        new InputAttemptIdentifier(new InputIdentifier(0), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+            //duplicate entry
+        new InputAttemptIdentifier(new InputIdentifier(0), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        // pipeline shuffle based identifiers, with multiple attempts
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        new InputAttemptIdentifier(new InputIdentifier(1), 2, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+            false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+        new InputAttemptIdentifier(new InputIdentifier(2), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+            false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+    };
+    InputAttemptIdentifier[] expectedSrcAttempts = {
+        new InputAttemptIdentifier(new InputIdentifier(0), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        // pipeline shuffle based identifiers
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        new InputAttemptIdentifier(new InputIdentifier(1), 2, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+            false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+        new InputAttemptIdentifier(new InputIdentifier(1), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+            false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+        new InputAttemptIdentifier(new InputIdentifier(2), 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+            false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+    };
+    TezConfiguration conf = new TezConfiguration();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
+    int partition = 42;
+    FetcherCallback callback = mock(FetcherCallback.class);
+    Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, 
null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, 
HOST, PORT, false);
+    builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
+    Fetcher fetcher = spy(builder.build());
+    fetcher.populateRemainingMap(new 
LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
+    Assert.assertTrue(expectedSrcAttempts.length == 
fetcher.srcAttemptsRemaining.size());
+    Iterator<Entry<String, InputAttemptIdentifier>> iterator = 
fetcher.srcAttemptsRemaining.entrySet().iterator();
+    int count = 0;
+    while(iterator.hasNext()) {
+      String key = iterator.next().getKey();
+      Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) 
== 0);
+    }
+  }
 }

Reply via email to