Repository: tez
Updated Branches:
  refs/heads/master cb3338e86 -> 65265de17


TEZ-1257. Error on empty partition when using OnFileUnorderedKVOutput and ShuffledMergedInput


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/65265de1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/65265de1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/65265de1

Branch: refs/heads/master
Commit: 65265de17b2356dfbe6bc3e1a54804cd9eb3850b
Parents: cb3338e
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Jul 29 06:38:35 2014 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Jul 29 06:38:35 2014 +0530

----------------------------------------------------------------------
 .../shuffle/impl/ShuffleInputEventHandler.java  |  18 +-
 .../library/shuffle/common/ShuffleUtils.java    |  14 ++
 .../impl/ShuffleInputEventHandlerImpl.java      |  15 +-
 .../impl/TestShuffleInputEventHandler.java      | 171 +++++++++++++++++++
 4 files changed, 198 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index ac88865..6fdf65e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.TezCommonUtils;
@@ -78,11 +79,8 @@ public class ShuffleInputEventHandler {
       throw new TezUncheckedException("Unable to parse DataMovementEvent 
payload", e);
     } 
     int partitionId = dmEvent.getSourceIndex();
-    URI baseUri = getBaseURI(shufflePayload.getHost(), 
shufflePayload.getPort(), partitionId);
-    InputAttemptIdentifier srcAttemptIdentifier = 
-        new InputAttemptIdentifier(dmEvent.getTargetIndex(), 
dmEvent.getVersion(), shufflePayload.getPathComponent());
-    LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + 
srcAttemptIdentifier);
-    
+    LOG.info("DataMovementEvent partitionId:" + partitionId + ", targetIndex: 
" + dmEvent.getTargetIndex()
+        + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + 
ShuffleUtils.stringify(shufflePayload));
     // TODO NEWTEZ See if this duration hack can be removed.
     int duration = shufflePayload.getRunDuration();
     if (duration > maxMapRuntime) {
@@ -94,6 +92,8 @@ public class ShuffleInputEventHandler {
         byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
         BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
+          InputAttemptIdentifier srcAttemptIdentifier =
+              new InputAttemptIdentifier(dmEvent.getTargetIndex(), 
dmEvent.getVersion());
           LOG.info("Source partition: " + partitionId + " did not generate any 
data. SrcAttempt: ["
               + srcAttemptIdentifier + "]. Not fetching.");
           scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
@@ -104,7 +104,10 @@ public class ShuffleInputEventHandler {
                 "the empty partition to succeeded", e);
       }
     }
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), 
shufflePayload.getPort(), 
+    URI baseUri = getBaseURI(shufflePayload.getHost(), 
shufflePayload.getPort(), partitionId);
+    InputAttemptIdentifier srcAttemptIdentifier =
+        new InputAttemptIdentifier(dmEvent.getTargetIndex(), 
dmEvent.getVersion(), shufflePayload.getPathComponent());
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), 
shufflePayload.getPort(),
         partitionId, baseUri.toString(), srcAttemptIdentifier);
   }
   
@@ -115,7 +118,8 @@ public class ShuffleInputEventHandler {
   }
 
   // TODO NEWTEZ Handle encrypted shuffle
-  private URI getBaseURI(String host, int port, int partitionId) {
+  @VisibleForTesting
+  URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, 
port,
       partitionId, inputContext.getApplicationId().toString(), sslShuffle);
     URI u = URI.create(sb.toString());

http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index c7b1dde..44b9a3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -44,6 +44,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import 
org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 import 
org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParamsBuilder;
+import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
 public class ShuffleUtils {
 
@@ -220,5 +221,18 @@ public class ShuffleUtils {
 
     return builder.build();
   }
+
+  public static String stringify(DataMovementEventPayloadProto dmProto) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("hasEmptyPartitions: 
").append(dmProto.hasEmptyPartitions()).append(", ");
+    sb.append("host: " + dmProto.getHost()).append(", ");
+    sb.append("port: " + dmProto.getPort()).append(", ");
+    sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
+    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
+    sb.append("hasDataInEvent: " + dmProto.hasData());
+    sb.append("]");
+    return sb.toString();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 65dba32..83d9502 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -99,7 +99,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     LOG.info("Processing DataMovementEvent with srcIndex: "
         + srcIndex + ", targetIndex: " + dme.getTargetIndex()
         + ", attemptNum: " + dme.getVersion() + ", payload: "
-        + stringify(shufflePayload));
+        + ShuffleUtils.stringify(shufflePayload));
 
     if (shufflePayload.hasEmptyPartitions()) {
       byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
@@ -154,17 +154,6 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     InputAttemptIdentifier srcAttemptIdentifier = new 
InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
   }
-  
-  private String stringify(DataMovementEventPayloadProto dmProto) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[");
-    sb.append("hasEmptyPartitions: 
").append(dmProto.hasEmptyPartitions()).append(", ");
-    sb.append("host: " + dmProto.getHost()).append(", ");
-    sb.append("port: " + dmProto.getPort()).append(", ");
-    sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
-    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
-    sb.append("hasDataInEvent: " + dmProto.hasData());
-    return sb.toString();
-  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/65265de1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
new file mode 100644
index 0000000..f921567
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
@@ -0,0 +1,171 @@
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit tests for {@link ShuffleInputEventHandler}: verifies how DataMovementEvents (with and
+ * without empty-partition bitmaps) and InputFailedEvents are forwarded to a mocked
+ * {@link ShuffleScheduler}.
+ */
+public class TestShuffleInputEventHandler {
+
+  private static final String HOST = "localhost";
+  private static final int PORT = 8080;
+  private static final String PATH_COMPONENT = "attempt";
+
+  private ShuffleInputEventHandler handler;
+  private ShuffleScheduler scheduler;
+
+  /** Creates a mocked input context that only stubs {@code getApplicationId()}. */
+  private TezInputContext createTezInputContext() {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    TezInputContext inputContext = mock(TezInputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    return inputContext;
+  }
+
+  /**
+   * Builds a version-0 DataMovementEvent carrying a shuffle payload with a run duration of 10.
+   *
+   * @param srcIndex source index of the event (used by the handler as the partition id)
+   * @param targetIndex target index of the event
+   * @param emptyPartitionByteString compressed empty-partition bitmap, or {@code null} to omit it
+   * @param allPartitionsEmpty when true, host, port and pathComponent are left unset, as for the
+   *     all-partitions-empty case
+   */
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty) {
+    ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
+    if (!allPartitionsEmpty) {
+      builder.setHost(HOST);
+      builder.setPort(PORT);
+      builder.setPathComponent(PATH_COMPONENT);
+    }
+    builder.setRunDuration(10);
+    if (emptyPartitionByteString != null) {
+      builder.setEmptyPartitions(emptyPartitionByteString);
+    }
+    return new DataMovementEvent(srcIndex, targetIndex, 0, builder.build().toByteArray());
+  }
+
+  @Before
+  public void setup() throws Exception {
+    TezInputContext inputContext = createTezInputContext();
+    scheduler = mock(ShuffleScheduler.class);
+    handler = new ShuffleInputEventHandler(inputContext, scheduler, false);
+  }
+
+  /** A partition with data is registered with the scheduler as a known map output. */
+  @Test
+  public void basicTest() {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false);
+    events.add(dme);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier =
+        new InputAttemptIdentifier(targetIdx, 0, PATH_COMPONENT);
+
+    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
+    int partitionId = srcIdx;
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
+        eq(baseUri), eq(expectedIdentifier));
+  }
+
+  /** An InputFailedEvent marks the corresponding attempt as obsolete. */
+  @Test
+  public void testFailedEvent() {
+    List<Event> events = new LinkedList<Event>();
+    int targetIdx = 1;
+    InputFailedEvent failedEvent = new InputFailedEvent(targetIdx, 0);
+    events.add(failedEvent);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+    verify(scheduler).obsoleteInput(eq(expectedIdentifier));
+  }
+
+  /**
+   * With no host/port/path in the payload and the current partition marked empty, the
+   * partition is completed immediately and nothing is scheduled for fetching.
+   */
+  @Test
+  public void testAllPartitionsEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx,
+        createEmptyPartitionByteString(srcIdx), true);
+    events.add(dme);
+    handler.handleEvents(events);
+
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0L),
+        eq(0L), eq(0L), any(MapOutput.class));
+    verifyNoFetchScheduled();
+  }
+
+  /**
+   * With connection details present but the current partition marked empty, the partition is
+   * completed immediately and nothing is scheduled for fetching.
+   */
+  @Test
+  public void testCurrentPartitionEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx,
+        createEmptyPartitionByteString(srcIdx), false);
+    events.add(dme);
+    handler.handleEvents(events);
+
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+
+    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0L),
+        eq(0L), eq(0L), any(MapOutput.class));
+    verifyNoFetchScheduled();
+  }
+
+  /** When only some other partition is empty, the current one is still scheduled for fetch. */
+  @Test
+  public void testOtherPartitionEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx,
+        createEmptyPartitionByteString(100), false);
+    events.add(dme);
+    handler.handleEvents(events);
+
+    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
+    int partitionId = srcIdx;
+    InputAttemptIdentifier expectedIdentifier =
+        new InputAttemptIdentifier(targetIdx, 0, PATH_COMPONENT);
+
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
+        eq(expectedIdentifier));
+  }
+
+  /**
+   * Asserts that no map output was registered for fetching, matching the handler's
+   * "Not fetching." behaviour for empty partitions.
+   */
+  private void verifyNoFetchScheduled() {
+    verify(scheduler, never()).addKnownMapOutput(anyString(), anyInt(), anyInt(), anyString(),
+        any(InputAttemptIdentifier.class));
+  }
+
+  /**
+   * Builds the compressed empty-partition bitmap that a producer would attach to its payload.
+   *
+   * @param emptyPartitions indices of the partitions to mark as empty
+   * @return the bitmap serialized with {@code TezUtils.toByteArray} and compressed
+   * @throws IOException if compression fails
+   */
+  private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
+    BitSet bitSet = new BitSet();
+    for (int i : emptyPartitions) {
+      bitSet.set(i);
+    }
+    return TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(bitSet));
+  }
+
+}

Reply via email to