Repository: tez
Updated Branches:
  refs/heads/master 2358521fa -> 614937c5d


TEZ-3804. FetcherOrderedGrouped#setupLocalDiskFetch should ignore empty 
partition records (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/614937c5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/614937c5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/614937c5

Branch: refs/heads/master
Commit: 614937c5df88b79c85ae9fc6394652fb65d98081
Parents: 2358521
Author: Jonathan Eagles <jeag...@yahoo-inc.com>
Authored: Tue Aug 1 16:58:42 2017 -0500
Committer: Jonathan Eagles <jeag...@yahoo-inc.com>
Committed: Tue Aug 1 16:58:42 2017 -0500

----------------------------------------------------------------------
 .../orderedgrouped/FetcherOrderedGrouped.java   |  3 +
 .../shuffle/orderedgrouped/TestFetcher.java     | 83 +++++++++++++++++++-
 2 files changed, 82 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 2c3aac3..68a54e9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -715,6 +715,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
             srcAttemptId = 
scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), 
reduceId);
             Path filename = 
getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
             TezIndexRecord indexRecord = 
getIndexRecord(srcAttemptId.getPathComponent(), reduceId);
+            if(!indexRecord.hasData()) {
+              continue;
+            }
 
             mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, 
indexRecord);
             long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index ef371c2..6d30448 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -299,7 +299,7 @@ public class TestFetcher {
             throw new IOException("failing to simulate failure case");
           }
           // match with params for copySucceeded below.
-          return new TezIndexRecord(p * 10, p * 1000, p * 100);
+          return new TezIndexRecord(p * 10, (p+1) * 1000, (p+2) * 100);
         }
       }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() 
+ i));
     }
@@ -327,6 +327,81 @@ public class TestFetcher {
     verify(scheduler).putBackKnownMapOutput(host, 
srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
   }
 
+  @Test (timeout = 5000) // empty index records must be skipped, never reported as copied
+  public void testSetupLocalDiskFetchEmptyPartitions() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    Shuffle shuffle = mock(Shuffle.class);
+    InputContext inputContext = mock(InputContext.class);
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    when(inputContext.getSourceVertexName()).thenReturn("");
+
+    MapHost host = new MapHost(HOST, PORT, 1, 1);
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, shuffle, null, false, 0,
+        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter,
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, 
APP_ID, DAG_ID,
+        false, false, true, false);
+    FetcherOrderedGrouped spyFetcher = spy(fetcher);
+
+    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
+        new CompositeInputAttemptIdentifier(0, 1, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
+        new CompositeInputAttemptIdentifier(1, 2, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
+        new CompositeInputAttemptIdentifier(2, 3, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
+        new CompositeInputAttemptIdentifier(3, 4, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
+        new CompositeInputAttemptIdentifier(4, 4, 
InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
+    );
+
+    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
+
+    final ConcurrentMap<ShuffleScheduler.PathPartition, 
InputAttemptIdentifier> pathToIdentifierMap = new 
ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
+    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
+      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
+        ShuffleScheduler.PathPartition pathPartition = new 
ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), 
host.getPartitionId() + i);
+        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
+      }
+    }
+    doAnswer(new Answer<InputAttemptIdentifier>() {
+      @Override
+      public InputAttemptIdentifier answer(InvocationOnMock invocation) throws 
Throwable {
+        Object[] args = invocation.getArguments();
+        String path = (String) args[0];
+        int reduceId = (int) args[1];
+        return pathToIdentifierMap.get(new 
ShuffleScheduler.PathPartition(path, reduceId));
+      }
+    }).when(scheduler)
+        .getIdentifierForFetchedOutput(any(String.class), any(int.class));
+
+    doAnswer(new Answer<Path>() {
+      @Override
+      public Path answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
+      }
+    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
+
+    for (int i = 0; i < host.getPartitionCount(); i++) {
+      doAnswer(new Answer<TezIndexRecord>() {
+        @Override
+        public TezIndexRecord answer(InvocationOnMock invocation) throws 
Throwable {
+          Object[] args = invocation.getArguments();
+          String pathComponent = (String) args[0];
+          int len = pathComponent.length();
+          long p = Long.valueOf(pathComponent.substring(len - 1, len));
+          // trailing 0, 0 args model an empty partition; no copySucceeded expected.
+          return new TezIndexRecord(p * 10, 0, 0);
+        }
+      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() 
+ i));
+    }
+
    
doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), 
any(MapHost.class),
+        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
+    spyFetcher.setupLocalDiskFetch(host);
+    verify(scheduler, 
times(0)).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
+        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
+    verify(spyFetcher).putBackRemainingMapOutputs(host);
+  }
+
   @Test(timeout = 5000)
   public void testSetupLocalDiskFetchAutoReduce() throws Exception {
     Configuration conf = new TezConfiguration();
@@ -412,7 +487,7 @@ public class TestFetcher {
             throw new IOException("Thowing exception to simulate failure 
case");
           }
           // match with params for copySucceeded below.
-          return new TezIndexRecord(p * 10, p * 1000, p * 100);
+          return new TezIndexRecord(p * 10, (p + 1) * 1000, (p + 2) * 100);
         }
       }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() 
+ i));
     }
@@ -455,8 +530,8 @@ public class TestFetcher {
     InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) 
p).expand(j);
     String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + 
srcAttemptToMatch.getPathComponent();
     ArgumentCaptor<MapOutput> captureMapOutput = 
ArgumentCaptor.forClass(MapOutput.class);
-    verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 
100),
-        eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
+    verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq((p+2) 
* 100),
+        eq((p+1) * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
 
     // cannot use the equals of MapOutput as it compares id which is private. 
so doing it manually
     MapOutput m = captureMapOutput.getAllValues().get(0);

Reply via email to