Author: sseth
Date: Tue Apr 21 03:38:16 2015
New Revision: 1675025

URL: http://svn.apache.org/r1675025
Log:
LLAP: Avoid fetching data multiple times in case of broadcast. (Siddharth Seth)

Modified:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java?rev=1675025&r1=1675024&r2=1675025&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java Tue Apr 21 03:38:16 2015
@@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
@@ -55,6 +57,7 @@ import org.apache.tez.runtime.api.impl.I
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.task.TezChild;
 import org.apache.tez.runtime.task.TezTaskRunner;
 
@@ -150,7 +153,7 @@ public class TaskRunnerCallable extends
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
     serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
         TezCommonUtils.convertJobTokenToBytes(jobToken));
-    Multimap<String, String> startedInputsMap = HashMultimap.create();
+    Multimap<String, String> startedInputsMap = createStartedInputMap(request.getFragmentSpec());
 
     UserGroupInformation taskOwner =
         UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
@@ -187,7 +190,7 @@ public class TaskRunnerCallable extends
       if (shouldDie) {
         LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
         return new TezChild.ContainerExecutionResult(
-            TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null,
+            TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null,
             "Asked to die by the AM");
       }
     } catch (IOException e) {
@@ -252,6 +255,20 @@ public class TaskRunnerCallable extends
     return !inputClassName.equals(MRInputLegacy.class.getName());
   }
 
+  private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) {
+    Multimap<String, String> startedInputMap = HashMultimap.create();
+    // Let the Processor control start for Broadcast inputs.
+
+    // TODO For now, this affects non broadcast unsorted cases as well. Make use of the edge
+    // property when it's available.
+    for (IOSpecProto inputSpec : fragmentSpec.getInputSpecsList()) {
+      if (inputSpec.getIoDescriptor().getClassName().equals(UnorderedKVInput.class.getName())) {
+        startedInputMap.put(fragmentSpec.getVertexName(), inputSpec.getConnectedVertexName());
+      }
+    }
+    return startedInputMap;
+  }
+
   public void shutdown() {
     if (executor != null) {
       executor.shutdownNow();


Reply via email to