Repository: tez Updated Branches: refs/heads/master e10d80fe6 -> 73ce1d0e3
TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73ce1d0e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73ce1d0e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73ce1d0e Branch: refs/heads/master Commit: 73ce1d0e3dec284f15d3a61c4569813cf5ded3cd Parents: e10d80f Author: Rajesh Balamohan <[email protected]> Authored: Thu Jul 30 04:16:33 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Jul 30 04:16:33 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/library/common/shuffle/Fetcher.java | 102 +++++++++++-------- .../library/common/shuffle/ShuffleUtils.java | 6 +- .../library/common/shuffle/TestFetcher.java | 58 +++++++++++ 4 files changed, 121 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 687f996..0953aec 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation. TEZ-2645. Provide standard analyzers for job analysis. TEZ-2627. Support for Tez Job Priorities. TEZ-2623. Fix module dependencies related to hadoop-auth. http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 04cf5b5..08b59ed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -28,11 +28,13 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -98,13 +100,14 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Parameters to track work. private List<InputAttemptIdentifier> srcAttempts; + @VisibleForTesting + Map<String, InputAttemptIdentifier> srcAttemptsRemaining; private String host; private int port; private int partition; // Maps from the pathComponents (unique per srcTaskId) to the specific taskId private final Map<String, InputAttemptIdentifier> pathToAttemptMap; - private List<InputAttemptIdentifier> remaining; private URL url; private volatile DataInputStream input; @@ -166,6 +169,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } } + // helper method to populate the remaining map + void populateRemainingMap(List<InputAttemptIdentifier> origlist) { + if (srcAttemptsRemaining == null) { + srcAttemptsRemaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size()); + } + for (InputAttemptIdentifier id : origlist) { + srcAttemptsRemaining.put(id.toString(), id); + } + } + @Override protected FetchResult callInternal() throws Exception { boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled); @@ -174,7 +187,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return new FetchResult(host, port, partition, srcAttempts); } - for (InputAttemptIdentifier in : srcAttempts) { + populateRemainingMap(srcAttempts); + for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) { pathToAttemptMap.put(in.getPathComponent(), in); // do only if all of them are shared fetches multiplex &= in.isShared(); @@ -186,9 +200,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> { + "- partition is non-zero (%d)", partition); } - //Similar to TEZ-2172 (remove can be expensive with list) - remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts); - HostFetchResult hostFetchResult; if (localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort) { @@ -214,12 +225,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> { shutdown(); // Sanity check - if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) { + if (hostFetchResult.failedInputs == null && !srcAttemptsRemaining.isEmpty()) { if (!multiplex) { throw new IOException("server didn't return all expected map outputs: " - + remaining.size() + " left."); + + srcAttemptsRemaining.size() + " left."); } else { - LOG.info("Shared fetch failed to return " + remaining.size() + " inputs on this try"); + LOG.info("Shared fetch failed to return " + srcAttemptsRemaining.size() + " inputs on this try"); } } @@ -294,7 +305,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private int findInputs() throws IOException { int k = 0; - for (InputAttemptIdentifier src : srcAttempts) { + for (InputAttemptIdentifier src : srcAttemptsRemaining.values()) { try { if (getShuffleInputFileName(src.getPathComponent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING) != null) { @@ -343,7 +354,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { protected HostFetchResult doSharedFetch() throws IOException { int inputs = findInputs(); - if (inputs == srcAttempts.size()) { + if (inputs == srcAttemptsRemaining.size()) { if (isDebugEnabled) { LOG.debug("Using the copies found locally"); } @@ -366,9 +377,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { LOG.info("Requeuing " + host + ":" + port + " downloads because we didn't get a lock"); return new HostFetchResult(new FetchResult(host, port, partition, - remaining), null, false); + srcAttemptsRemaining.values()), null, false); } else { - if (findInputs() == srcAttempts.size()) { + if (findInputs() == srcAttemptsRemaining.size()) { // double checked after lock releaseLock(lock); lock = null; @@ -392,7 +403,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // if any exception was due to shut-down don't bother firing any more // requests return new HostFetchResult(new FetchResult(host, port, partition, - remaining), null, false); + srcAttemptsRemaining.values()), null, false); } // no more caching return doHttpFetch(); @@ -403,7 +414,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return doHttpFetch(null); } - private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) { + private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) { try { StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, partition, appId.toString(), httpConnectionParams.isSslShuffle()); @@ -426,15 +437,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> { "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage()); } else { - failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]); + failedFetches = srcAttemptsRemaining.values(). + toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]); } - return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedFetches, true); + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedFetches, true); } if (isShutDown.get()) { // shutdown would have no effect if in the process of establishing the connection. shutdownInternal(); LOG.info("Detected fetcher has been shutdown after connection establishment. Returning"); - return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); } try { @@ -451,10 +463,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage()); } else { - InputAttemptIdentifier firstAttempt = attempts.get(0); + InputAttemptIdentifier firstAttempt = attempts.iterator().next(); LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e); - return new HostFetchResult(new FetchResult(host, port, partition, remaining), + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), new InputAttemptIdentifier[] { firstAttempt }, false); } } catch (InterruptedException e) { @@ -467,7 +479,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { @VisibleForTesting protected HostFetchResult doHttpFetch(CachingCallBack callback) { - HostFetchResult connectionsWithRetryResult = setupConnection(srcAttempts); + HostFetchResult connectionsWithRetryResult = + setupConnection(srcAttemptsRemaining.values()); if (connectionsWithRetryResult != null) { return connectionsWithRetryResult; } @@ -479,7 +492,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // shutdown would have no effect if in the process of establishing the connection. shutdownInternal(); LOG.info("Detected fetcher has been shutdown after opening stream. Returning"); - return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); } // After this point, closing the stream and connection, should cause a // SocketException, @@ -490,11 +503,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // after putting back the remaining maps to the // yet_to_be_fetched list and marking the failed tasks. InputAttemptIdentifier[] failedInputs = null; - while (!remaining.isEmpty() && failedInputs == null) { + while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) { if (isShutDown.get()) { shutdownInternal(true); - LOG.info("Fetcher already shutdown. Aborting queued fetches for " + remaining.size() + " inputs"); - return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, + LOG.info("Fetcher already shutdown. Aborting queued fetches for " + srcAttemptsRemaining.size() + " inputs"); + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); } try { @@ -503,13 +516,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> { //clean up connection shutdownInternal(true); if (isShutDown.get()) { - LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + remaining.size() + " inputs"); - return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, + LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + srcAttemptsRemaining.size() + " inputs"); + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); } // Connect again. - connectionsWithRetryResult = setupConnection( - new LinkedList<InputAttemptIdentifier>(remaining)); + connectionsWithRetryResult = setupConnection(srcAttemptsRemaining.values()); if (connectionsWithRetryResult != null) { break; } @@ -521,7 +533,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { failedInputs.length + " failed inputs"); failedInputs = null; } - return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs, + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedInputs, false); } @@ -533,13 +545,13 @@ public class Fetcher extends CallableWithNdc<FetchResult> { @VisibleForTesting private HostFetchResult doLocalDiskFetch(boolean failMissing) { - Iterator<InputAttemptIdentifier> iterator = remaining.iterator(); + Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator(); while (iterator.hasNext()) { if (isShutDown.get()) { - LOG.info("Already shutdown. Skipping fetch for " + remaining.size() + " inputs"); + LOG.info("Already shutdown. Skipping fetch for " + srcAttemptsRemaining.size() + " inputs"); break; } - InputAttemptIdentifier srcAttemptId = iterator.next(); + InputAttemptIdentifier srcAttemptId = iterator.next().getValue(); long startTime = System.currentTimeMillis(); FetchedInput fetchedInput = null; @@ -586,17 +598,18 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } InputAttemptIdentifier[] failedFetches = null; - if (failMissing && remaining.size() > 0) { + if (failMissing && srcAttemptsRemaining.size() > 0) { if (isShutDown.get()) { - LOG.info("Already shutdown, not reporting fetch failures for: " + remaining.size() + + LOG.info("Already shutdown, not reporting fetch failures for: " + srcAttemptsRemaining.size() + " remaining inputs"); } else { - failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]); + failedFetches = srcAttemptsRemaining.values(). + toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]); } } else { // nothing needs to be done to requeue remaining entries } - return new HostFetchResult(new FetchResult(host, port, partition, remaining), + return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedFetches, false); } @@ -695,7 +708,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (!isShutDown.get()) { LOG.warn("Invalid src id ", e); // Don't know which one was bad, so consider all of them as bad - return remaining.toArray(new InputAttemptIdentifier[remaining.size()]); + return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); } else { LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage()); return null; @@ -777,7 +790,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { compressedLength, decompressedLength, (endTime - startTime)); // Note successful shuffle - remaining.remove(srcAttemptId); + srcAttemptsRemaining.remove(srcAttemptId.toString()); // metrics.successFetch(); return null; @@ -800,8 +813,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Cleanup the fetchedInput before returning. cleanupFetchedInput(fetchedInput); if (srcAttemptId == null) { - return remaining - .toArray(new InputAttemptIdentifier[remaining.size()]); + return srcAttemptsRemaining.values() + .toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); } else { return new InputAttemptIdentifier[] { srcAttemptId }; } @@ -889,7 +902,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } // Sanity check - if (!remaining.contains(srcAttemptId)) { + // we are guaranteed that key is not null + if (srcAttemptsRemaining.get(srcAttemptId.toString()) == null) { // wrongMapErrs.increment(1); LOG.warn("Invalid input. Received output for headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " @@ -900,8 +914,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } private InputAttemptIdentifier getNextRemainingAttempt() { - if (remaining.size() > 0) { - return remaining.iterator().next(); + if (srcAttemptsRemaining.size() > 0) { + return srcAttemptsRemaining.values().iterator().next(); } else { return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index e109d6a..1081587 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -27,6 +27,7 @@ import java.net.URL; import java.nio.ByteBuffer; import java.text.DecimalFormat; import java.util.BitSet; +import java.util.Collection; import java.util.List; import javax.annotation.Nullable; @@ -34,6 +35,7 @@ import javax.crypto.SecretKey; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.http.BaseHttpConnection; @@ -201,8 +203,8 @@ public class ShuffleUtils { return sb; } - public static URL constructInputURL(String baseURI, - List<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException { + public static URL constructInputURL(String baseURI, + Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException { StringBuilder url = new StringBuilder(baseURI); boolean first = true; for (InputAttemptIdentifier input : inputs) { http://git-wip-us.apache.org/repos/asf/tez/blob/73ce1d0e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 34c2ca7..7678b18 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -33,9 +33,13 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Map.Entry; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -46,6 +50,7 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.InputIdentifier; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.junit.Assert; import org.junit.Test; @@ -231,4 +236,57 @@ public class TestFetcher { Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId); Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT); } + + @Test(timeout=1000) + public void testInputAttemptIdentifierMap() { + InputAttemptIdentifier[] srcAttempts = { + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + //duplicate entry + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + // pipeline shuffle based identifiers, with multiple attempts + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), + new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) + }; + InputAttemptIdentifier[] expectedSrcAttempts = { + new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + // pipeline shuffle based identifiers + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", + false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1), + new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2), + new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", + false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0) + }; + TezConfiguration conf = new TezConfiguration(); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true"); + int partition = 42; + FetcherCallback callback = mock(FetcherCallback.class); + Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false); + builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); + Fetcher fetcher = spy(builder.build()); + fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); + Assert.assertTrue(expectedSrcAttempts.length == fetcher.srcAttemptsRemaining.size()); + Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.srcAttemptsRemaining.entrySet().iterator(); + int count = 0; + while(iterator.hasNext()) { + String key = iterator.next().getKey(); + Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0); + } + } }
