Author: gunther
Date: Fri Sep  5 20:16:08 2014
New Revision: 1622783

URL: http://svn.apache.org/r1622783
Log:
HIVE-7976: Merge tez branch into trunk (tez 0.5.0) (Gopal V via Gunther Hagleitner)

Added:
    hive/trunk/data/files/agg_01-p1.txt
      - copied unchanged from r1622766, 
hive/branches/tez/data/files/agg_01-p1.txt
    hive/trunk/data/files/agg_01-p2.txt
      - copied unchanged from r1622766, 
hive/branches/tez/data/files/agg_01-p2.txt
    hive/trunk/data/files/agg_01-p3.txt
      - copied unchanged from r1622766, 
hive/branches/tez/data/files/agg_01-p3.txt
    hive/trunk/data/files/dim_shops.txt
      - copied unchanged from r1622766, 
hive/branches/tez/data/files/dim_shops.txt
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
    hive/trunk/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
    hive/trunk/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
    hive/trunk/ql/src/test/queries/clientpositive/tez_bmj_schema_evolution.q
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/queries/clientpositive/tez_bmj_schema_evolution.q
    
hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning.q.out
    
hive/trunk/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out
    
hive/trunk/ql/src/test/results/clientpositive/tez/tez_bmj_schema_evolution.q.out
      - copied unchanged from r1622766, 
hive/branches/tez/ql/src/test/results/clientpositive/tez/tez_bmj_schema_evolution.q.out
Modified:
    hive/trunk/   (props changed)
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/hbase-handler/pom.xml   (props changed)
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/pom.xml
    hive/trunk/ql/if/queryplan.thrift
    hive/trunk/ql/pom.xml
    hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
    hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
    
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
    hive/trunk/ql/src/gen/thrift/gen-php/Types.php
    hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
    hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
    hive/trunk/ql/src/test/results/clientpositive/optimize_nullscan.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/bucket2.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/bucket3.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/bucket4.q.out
    
hive/trunk/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/optimize_nullscan.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/select_dummy_source.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/temp_table.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/vector_string_concat.q.out
    
hive/trunk/ql/src/test/results/clientpositive/tez/vectorization_short_regress.q.out
    
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java

Propchange: hive/trunk/
------------------------------------------------------------------------------
  Merged /hive/branches/tez:r1573250-1622766

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Sep  5 20:16:08 2014
@@ -374,7 +374,6 @@ public class HiveConf extends Configurat
     METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL",
         "jdbc:derby:;databaseName=metastore_db;create=true",
         "JDBC connect string for a JDBC metastore"),
-
     HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1,
         "The number of times to retry a HMSHandler call if there were a connection error."),
     HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", "1000ms",
@@ -1782,7 +1781,15 @@ public class HiveConf extends Configurat
         "When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges."),
     TEZ_MIN_PARTITION_FACTOR("hive.tez.min.partition.factor", 0.25f,
         "When auto reducer parallelism is enabled this factor will be used to put a lower limit to the number\n" +
-        "of reducers that tez specifies.")
+        "of reducers that tez specifies."),
+    TEZ_DYNAMIC_PARTITION_PRUNING(
+        "hive.tez.dynamic.partition.pruning", true,
+        "When dynamic pruning is enabled, joins on partition keys will be processed by sending events from the processing " +
+        "vertices to the tez application master. These events will be used to prune unnecessary partitions."),
+    TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE("hive.tez.dynamic.partition.pruning.max.event.size", 1*1024*1024L,
+        "Maximum size of events sent by processors in dynamic pruning. If this size is crossed no pruning will take place."),
+    TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.parition.pruning.max.data.size", 100*1024*1024L,
+        "Maximum total data size of events in dynamic pruning.")
     ;
 
     public final String varname;

Propchange: hive/trunk/hbase-handler/pom.xml
------------------------------------------------------------------------------
  Merged /hive/branches/tez/hbase-handler/pom.xml:r1573250-1622766
  Merged /hive/trunk/hbase-handler/pom.xml:r1494760-1537575

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Fri Sep  
5 20:16:08 2014
@@ -140,8 +140,11 @@ minitez.query.files.shared=alter_merge_2
 
 minitez.query.files=bucket_map_join_tez1.q,\
   bucket_map_join_tez2.q,\
+  dynamic_partition_pruning.q,\
+  dynamic_partition_pruning_2.q,\
   mapjoin_decimal.q,\
   mrr.q,\
+  tez_bmj_schema_evolution.q,\
   tez_dml.q,\
   tez_fsstat.q,\
   tez_insert_overwrite_local_directory_1.q,\

Modified: 
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java 
(original)
+++ 
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java 
Fri Sep  5 20:16:08 2014
@@ -43,7 +43,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,10 +63,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.cli.CliDriver;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.common.io.DigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.tools.ant.BuildException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
 
 import com.google.common.collect.ImmutableList;
 
@@ -145,8 +134,8 @@ public class QTestUtil {
   private QTestSetup setup = null;
   private boolean isSessionStateStarted = false;
 
-  private String initScript;
-  private String cleanupScript;
+  private final String initScript;
+  private final String cleanupScript;
 
   static {
     for (String srcTable : System.getProperty("test.src.tables", 
"").trim().split(",")) {
@@ -332,14 +321,6 @@ public class QTestUtil {
     HadoopShims shims = ShimLoader.getHadoopShims();
     int numberOfDataNodes = 4;
 
-    // can run tez tests only on hadoop 2
-    if (clusterType == MiniClusterType.tez) {
-      Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
-      // this is necessary temporarily - there's a probem with multi datanodes 
on MiniTezCluster
-      // will be fixed in 0.3
-      numberOfDataNodes = 1;
-    }
-
     if (clusterType != MiniClusterType.none) {
       dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
       FileSystem fs = dfs.getFileSystem();

Modified: hive/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Fri Sep  5 20:16:08 2014
@@ -150,8 +150,8 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
+    <tez.version>0.5.0</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
-    <tez.version>0.4.1-incubating</tez.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>

Modified: hive/trunk/ql/if/queryplan.thrift
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/if/queryplan.thrift?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/if/queryplan.thrift (original)
+++ hive/trunk/ql/if/queryplan.thrift Fri Sep  5 20:16:08 2014
@@ -56,6 +56,7 @@ enum OperatorType {
   PTF,
   MUX,
   DEMUX,
+  EVENT,
 }
 
 struct Operator {

Modified: hive/trunk/ql/pom.xml
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Fri Sep  5 20:16:08 2014
@@ -297,6 +297,38 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+       <exclusion>
+         <groupId>org.apache.hadoop</groupId>
+         <artifactId>hadoop-yarn-client</artifactId>
+       </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-mapreduce</artifactId>
       <version>${tez.version}</version>
       <optional>true</optional>

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp Fri Sep  5 
20:16:08 2014
@@ -51,7 +51,8 @@ int _kOperatorTypeValues[] = {
   OperatorType::HASHTABLEDUMMY,
   OperatorType::PTF,
   OperatorType::MUX,
-  OperatorType::DEMUX
+  OperatorType::DEMUX,
+  OperatorType::EVENT
 };
 const char* _kOperatorTypeNames[] = {
   "JOIN",
@@ -74,9 +75,10 @@ const char* _kOperatorTypeNames[] = {
   "HASHTABLEDUMMY",
   "PTF",
   "MUX",
-  "DEMUX"
+  "DEMUX",
+  "EVENT"
 };
-const std::map<int, const char*> 
_OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, 
_kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, 
NULL, NULL));
+const std::map<int, const char*> 
_OperatorType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(22, 
_kOperatorTypeValues, _kOperatorTypeNames), ::apache::thrift::TEnumIterator(-1, 
NULL, NULL));
 
 int _kTaskTypeValues[] = {
   TaskType::MAP,

Modified: hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h (original)
+++ hive/trunk/ql/src/gen/thrift/gen-cpp/queryplan_types.h Fri Sep  5 20:16:08 
2014
@@ -56,7 +56,8 @@ struct OperatorType {
     HASHTABLEDUMMY = 17,
     PTF = 18,
     MUX = 19,
-    DEMUX = 20
+    DEMUX = 20,
+    EVENT = 21
   };
 };
 

Modified: 
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
 (original)
+++ 
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
 Fri Sep  5 20:16:08 2014
@@ -32,7 +32,8 @@ public enum OperatorType implements org.
   HASHTABLEDUMMY(17),
   PTF(18),
   MUX(19),
-  DEMUX(20);
+  DEMUX(20),
+  EVENT(21);
 
   private final int value;
 
@@ -95,6 +96,8 @@ public enum OperatorType implements org.
         return MUX;
       case 20:
         return DEMUX;
+      case 21:
+        return EVENT;
       default:
         return null;
     }

Modified: hive/trunk/ql/src/gen/thrift/gen-php/Types.php
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-php/Types.php?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-php/Types.php (original)
+++ hive/trunk/ql/src/gen/thrift/gen-php/Types.php Fri Sep  5 20:16:08 2014
@@ -56,6 +56,7 @@ final class OperatorType {
   const PTF = 18;
   const MUX = 19;
   const DEMUX = 20;
+  const EVENT = 21;
   static public $__names = array(
     0 => 'JOIN',
     1 => 'MAPJOIN',
@@ -78,6 +79,7 @@ final class OperatorType {
     18 => 'PTF',
     19 => 'MUX',
     20 => 'DEMUX',
+    21 => 'EVENT',
   );
 }
 

Modified: hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py (original)
+++ hive/trunk/ql/src/gen/thrift/gen-py/queryplan/ttypes.py Fri Sep  5 20:16:08 
2014
@@ -66,6 +66,7 @@ class OperatorType:
   PTF = 18
   MUX = 19
   DEMUX = 20
+  EVENT = 21
 
   _VALUES_TO_NAMES = {
     0: "JOIN",
@@ -89,6 +90,7 @@ class OperatorType:
     18: "PTF",
     19: "MUX",
     20: "DEMUX",
+    21: "EVENT",
   }
 
   _NAMES_TO_VALUES = {
@@ -113,6 +115,7 @@ class OperatorType:
     "PTF": 18,
     "MUX": 19,
     "DEMUX": 20,
+    "EVENT": 21,
   }
 
 class TaskType:

Modified: hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb (original)
+++ hive/trunk/ql/src/gen/thrift/gen-rb/queryplan_types.rb Fri Sep  5 20:16:08 
2014
@@ -42,8 +42,9 @@ module OperatorType
   PTF = 18
   MUX = 19
   DEMUX = 20
-  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 
=> "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => 
"TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 
14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 
=> "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX"}
-  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, 
LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, 
LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, 
DEMUX]).freeze
+  EVENT = 21
+  VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 
=> "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => 
"TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 
14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 
=> "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT"}
+  VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, 
LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, 
LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, 
DEMUX, EVENT]).freeze
 end
 
 module TaskType

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java 
Fri Sep  5 20:16:08 2014
@@ -104,6 +104,8 @@ public class MapJoinOperator extends Abs
     cache = ObjectCacheFactory.getCache(hconf);
     loader = HashTableLoaderFactory.getLoader(hconf);
 
+    hashMapRowGetters = null;
+
     mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
     mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) 
cache.retrieve(serdeKey);
     hashTblInitedOnce = true;
@@ -186,7 +188,7 @@ public class MapJoinOperator extends Abs
        * process different buckets and if the container is reused to join a 
different bucket,
        * join results can be incorrect. The cache is keyed on operator id and 
for bucket map join
        * the operator does not change but data needed is different. For a 
proper fix, this
-       * requires changes in the Tez API with regard to finding bucket id and 
+       * requires changes in the Tez API with regard to finding bucket id and
        * also ability to schedule tasks to re-use containers that have cached 
the specific bucket.
        */
       LOG.info("This is not bucket map join, so cache");
@@ -237,7 +239,7 @@ public class MapJoinOperator extends Abs
         firstRow = false;
       }
 
-      alias = (byte)tag;
+      alias = (byte) tag;
       if (hashMapRowGetters == null) {
         hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
         MapJoinKey refKey = getRefKey(alias);

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java 
Fri Sep  5 20:16:08 2014
@@ -29,13 +29,15 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Un
  * OperatorFactory.
  *
  */
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public final class OperatorFactory {
   private static final List<OpTuple> opvec;
   private static final List<OpTuple> vectorOpvec;
@@ -101,6 +104,10 @@ public final class OperatorFactory {
         DemuxOperator.class));
     opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
         MuxOperator.class));
+    opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+        AppMasterEventOperator.class));
+    opvec.add(new 
OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+        AppMasterEventOperator.class));
   }
 
   static {
@@ -119,9 +126,9 @@ public final class OperatorFactory {
 
   private static final class OpTuple<T extends OperatorDesc> {
     private final Class<T> descClass;
-    private final Class<? extends Operator<T>> opClass;
+    private final Class<? extends Operator<?>> opClass;
 
-    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+    public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
       this.descClass = descClass;
       this.opClass = opClass;
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri 
Sep  5 20:16:08 2014
@@ -820,10 +820,12 @@ public final class Utilities {
     }
   }
 
-  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, 
Set<Operator<?>> roots) {
+  public static List<Operator<?>> cloneOperatorTree(Configuration conf, 
List<Operator<?>> roots) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
     serializePlan(roots, baos, conf, true);
-    Set<Operator<?>> result = deserializePlan(new 
ByteArrayInputStream(baos.toByteArray()),
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
         roots.getClass(), conf, true);
     return result;
   }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java 
Fri Sep  5 20:16:08 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
 
       runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
 
-      if(!runningViaChild) {
+      if (!runningViaChild) {
         // we are not running this mapred task via child jvm
         // so directly invoke ExecDriver
         return super.execute(driverContext);

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
 Fri Sep  5 20:16:08 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer 
   private boolean[] sortableSortOrders;
   private KeyValueHelper writeHelper;
 
-  private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+  private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf,
       MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
@@ -476,6 +476,7 @@ public class MapJoinBytesTableContainer 
       return valueStruct.getFieldsAsList(); // TODO: should we unset bytes 
after that?
     }
 
+    @Override
     public void addRow(List<Object> t) {
       if (dummyRow != null || !refs.isEmpty()) {
         throw new RuntimeException("Cannot add rows when not empty");
@@ -484,9 +485,11 @@ public class MapJoinBytesTableContainer 
     }
 
     // Various unsupported methods.
+    @Override
     public void addRow(Object[] value) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot 
add arrays");
     }
+    @Override
     public void write(MapJoinObjectSerDeContext valueContext, 
ObjectOutputStream out) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot 
be serialized");
     }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
 Fri Sep  5 20:16:08 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
 import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
 import org.apache.hadoop.io.Writable;
 
 /**

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
 Fri Sep  5 20:16:08 2014
@@ -19,61 +19,62 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
 
   private static final Log LOG = 
LogFactory.getLog(CustomPartitionEdge.class.getName());
 
   CustomEdgeConfiguration conf = null;
+  final EdgeManagerPluginContext context;
 
   // used by the framework at runtime. initialize is the real initializer at 
runtime
-  public CustomPartitionEdge() {  
+  public CustomPartitionEdge(EdgeManagerPluginContext context) {
+    super(context);
+    this.context = context;
   }
 
+
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
-      int destinationTaskIndex) {
-    return numSourceTasks;
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return context.getSourceVertexNumTasks();
   }
 
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
-      int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return conf.getNumBuckets();
   }
 
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int 
numDestinationTasks) {
-    return numDestinationTasks;
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
   // called at runtime to initialize the custom edge.
   @Override
-  public void initialize(EdgeManagerContext context) {
-    byte[] payload = context.getUserPayload();
+  public void initialize() {
+    ByteBuffer payload = context.getUserPayload().getPayload();
     LOG.info("Initializing the edge, payload: " + payload);
     if (payload == null) {
       throw new RuntimeException("Invalid payload");
     }
     // De-serialization code
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(payload, payload.length);
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
     conf = new CustomEdgeConfiguration();
     try {
-      conf.readFields(dib);
+      conf.readFields(dibb);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -83,30 +84,25 @@ public class CustomPartitionEdge impleme
 
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, Map<Integer, 
List<Integer>> mapDestTaskIndices) {
-    int srcIndex = event.getSourceIndex();
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+      int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> 
mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+      mapDestTaskIndices.put(destIndex, outputIndices);
+    }
   }
 
   @Override
-  public void routeInputSourceTaskFailedEventToDestination(int 
sourceTaskIndex, 
-      int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) 
{
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+      mapDestTaskIndices.put(i, outputIndices);
+    }
   }
 
   @Override
-  public int routeInputErrorEventToSource(InputReadErrorEvent event, 
-      int destinationTaskIndex) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex, int destinationFailedInputIndex) {
     return event.getIndex();
   }
-
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> 
taskIndices) {
-    for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndices.add(new Integer(i));
-    }
-  }
 }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 Fri Sep  5 20:16:08 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,27 +31,30 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
@@ -59,40 +63,44 @@ import com.google.common.collect.HashMul
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
 
 /*
  * Only works with old mapred API
  * Will only work with a single MRInput for now.
  */
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
 
   private static final Log LOG = 
LogFactory.getLog(CustomPartitionVertex.class.getName());
 
   VertexManagerPluginContext context;
 
-  private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<RootInputDataInformationEvent> dataInformationEvents;
+  private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
+  private int taskCount = 0;
 
-  public CustomPartitionVertex() {
+  public CustomPartitionVertex(VertexManagerPluginContext context) {
+    super(context);
   }
 
   @Override
-  public void initialize(VertexManagerPluginContext context) {
-    this.context = context;
-    ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+  public void initialize() {
+    this.context = getContext();
+    ByteBuffer byteBuf = context.getUserPayload().getPayload();
     this.numBuckets = byteBuf.getInt();
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = 
+      new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
-      scheduledTasks.add(new Integer(i));
+      scheduledTasks.add(new 
VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
     }
     context.scheduleVertexTasks(scheduledTasks);
   }
@@ -114,6 +122,7 @@ public class CustomPartitionVertex imple
     // ensure this method is called only once. Tez will call it once per Root
     // Input.
     Preconditions.checkState(rootVertexInitialized == false);
+    LOG.info("Root vertex not initialized");
     rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
@@ -121,8 +130,8 @@ public class CustomPartitionVertex imple
       // but that
       // means serializing another instance.
       MRInputUserPayloadProto protoPayload =
-          MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
-      this.conf = 
MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+          MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+      this.conf = 
TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
 
       /*
        * Currently in tez, the flow of events is thus:
@@ -138,30 +147,27 @@ public class CustomPartitionVertex imple
        */
 
       // This assumes that Grouping will always be used.
-      // Changing the InputFormat - so that the correct one is initialized in
-      // MRInput.
-      this.conf.set("mapred.input.format.class", 
TezGroupedSplitsInputFormat.class.getName());
+      // Enabling grouping on the payload.
       MRInputUserPayloadProto updatedPayload =
-          MRInputUserPayloadProto.newBuilder(protoPayload)
-              
.setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
-      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+          
MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+      
inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
     } catch (IOException e) {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
 
     boolean dataInformationEventSeen = false;
-    Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path, 
List<FileSplit>>();
+    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, 
List<FileSplit>>();
 
     for (Event event : events) {
-      if (event instanceof RootInputConfigureVertexTasksEvent) {
+      if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
         Preconditions
             .checkState(context.getVertexNumTasks(context.getVertexName()) == 
-1,
                 "Parallelism for the vertex should be set to -1 if the 
InputInitializer is setting parallelism");
-        RootInputConfigureVertexTasksEvent cEvent = 
(RootInputConfigureVertexTasksEvent) event;
+        InputConfigureVertexTasksEvent cEvent = 
(InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
@@ -169,12 +175,12 @@ public class CustomPartitionVertex imple
         dataInformationEvents =
             
Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
-      if (event instanceof RootInputUpdatePayloadEvent) {
+      if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
         Preconditions.checkState(false);
-      } else if (event instanceof RootInputDataInformationEvent) {
+      } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
-        RootInputDataInformationEvent diEvent = 
(RootInputDataInformationEvent) event;
+        InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
         dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
@@ -182,10 +188,10 @@ public class CustomPartitionVertex imple
         } catch (IOException e) {
           throw new RuntimeException("Failed to get file split for event: " + 
diEvent);
         }
-        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+        List<FileSplit> fsList = 
pathFileSplitsMap.get(fileSplit.getPath().getName());
         if (fsList == null) {
           fsList = new ArrayList<FileSplit>();
-          pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+          pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
         }
         fsList.add(fileSplit);
       }
@@ -195,21 +201,32 @@ public class CustomPartitionVertex imple
         getBucketSplitMapForPath(pathFileSplitsMap);
 
     try {
-      int totalResource = context.getTotalAVailableResource().getMemory();
+      int totalResource = context.getTotalAvailableResource().getMemory();
       int taskResource = context.getVertexTaskResource().getMemory();
       float waves =
-          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       int availableSlots = totalResource / taskResource;
 
       LOG.info("Grouping splits. " + availableSlots + " available slots, " + 
waves + " waves.");
+      JobConf jobConf = new JobConf(conf);
+      ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
-          grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+          HashMultimap.<Integer, InputSplit> create();
+      for (Integer key : bucketToInitialSplitMap.keySet()) {
+        InputSplit[] inputSplitArray =
+            (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+        Multimap<Integer, InputSplit> groupedSplit =
+            HiveSplitGenerator.generateGroupedSplits(jobConf, conf, 
inputSplitArray, waves,
+            availableSlots);
+        bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+      }
 
+      LOG.info("We have grouped the splits into " + 
bucketToGroupedSplitMap.size() + " tasks");
       processAllEvents(inputName, bucketToGroupedSplitMap);
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -219,7 +236,6 @@ public class CustomPartitionVertex imple
 
     Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, 
Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
-    int taskCount = 0;
     for (Entry<Integer, Collection<InputSplit>> entry : 
bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
       Collection<InputSplit> initialSplits = entry.getValue();
@@ -232,12 +248,12 @@ public class CustomPartitionVertex imple
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerDescriptor hiveEdgeManagerDesc =
-        new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
-    byte[] payload = getBytePayload(bucketToTaskMap);
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+        
EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+    UserPayload payload = getBytePayload(bucketToTaskMap);
     hiveEdgeManagerDesc.setUserPayload(payload);
 
-    Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type 
custom.
     for (Entry<String, EdgeProperty> edgeEntry : 
context.getInputVertexEdgeProperties().entrySet()) {
@@ -250,47 +266,51 @@ public class CustomPartitionVertex imple
 
     LOG.info("Task count is " + taskCount);
 
-    List<RootInputDataInformationEvent> taskEvents =
+    List<InputDataInformationEvent> taskEvents =
         Lists.newArrayListWithCapacity(finalSplits.size());
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
-      RootInputDataInformationEvent diEvent =
-          new RootInputDataInformationEvent(count, 
serializedSplit.toByteArray());
+      MRSplitProto serializedSplit = 
MRInputHelpers.createSplitProto(inputSplit);
+      InputDataInformationEvent diEvent = 
InputDataInformationEvent.createWithSerializedPayload(
+          count, serializedSplit.toByteString().asReadOnlyByteBuffer());
       diEvent.setTargetIndex(count);
       count++;
       taskEvents.add(diEvent);
     }
 
     // Replace the Edge Managers
+    Map<String, InputSpecUpdate> rootInputSpecUpdate =
+      new HashMap<String, InputSpecUpdate>();
+    rootInputSpecUpdate.put(
+        inputName,
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     context.setVertexParallelism(
         taskCount,
-        new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap);
+        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+            .toArray(new InputSplit[finalSplits.size()]))), emMap, 
rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
   }
 
-  private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) 
throws IOException {
+  UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws 
IOException {
     CustomEdgeConfiguration edgeConf =
         new CustomEdgeConfiguration(routingTable.keySet().size(), 
routingTable);
     DataOutputBuffer dob = new DataOutputBuffer();
     edgeConf.write(dob);
     byte[] serialized = dob.getData();
-
-    return serialized;
+    return UserPayload.create(ByteBuffer.wrap(serialized));
   }
 
-  private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) 
throws IOException {
+  private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) 
throws IOException {
     InputSplit inputSplit = null;
     if (event.getDeserializedUserPayload() != null) {
       inputSplit = (InputSplit) event.getDeserializedUserPayload();
     } else {
-      MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+      MRSplitProto splitProto = 
MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
       SerializationFactory serializationFactory = new SerializationFactory(new 
Configuration());
-      inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, 
serializationFactory);
+      inputSplit = 
MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, 
serializationFactory);
     }
 
     if (!(inputSplit instanceof FileSplit)) {
@@ -304,7 +324,7 @@ public class CustomPartitionVertex imple
    * This method generates the map of bucket to file splits.
    */
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
-      Map<Path, List<FileSplit>> pathFileSplitsMap) {
+      Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
     int fsCount = 0;
@@ -312,7 +332,7 @@ public class CustomPartitionVertex imple
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    for (Map.Entry<Path, List<FileSplit>> entry : 
pathFileSplitsMap.entrySet()) {
+    for (Map.Entry<String, List<FileSplit>> entry : 
pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
         fsCount++;

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri 
Sep  5 20:16:08 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -80,42 +79,43 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import 
org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import 
org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -163,6 +163,7 @@ public class DagUtils {
     JobConf conf = new JobConf(baseConf);
 
     if (mapWork.getNumMapTasks() != null) {
+      // Is this required?
       conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
     }
 
@@ -219,20 +220,19 @@ public class DagUtils {
    * Edge between them.
    *
    * @param group The parent VertexGroup
-   * @param wConf The job conf of the child vertex
+   * @param vConf The job conf of one of the parent (grouped) vertices
    * @param w The child vertex
    * @param edgeProp the edge property of connection between the two
    * endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
       Vertex w, TezEdgeProperty edgeProp)
     throws IOException {
 
     Class mergeInputClass;
 
-    LOG.info("Creating Edge between " + group.getGroupName() + " and " + 
w.getVertexName());
-    
w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + 
w.getName());
 
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
@@ -243,9 +243,10 @@ public class DagUtils {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
       VertexManagerPluginDescriptor desc =
-          new 
VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
+          
VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -263,47 +264,31 @@ public class DagUtils {
       break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
-        new InputDescriptor(mergeInputClass.getName()));
+    return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+        InputDescriptor.create(mergeInputClass.getName()));
   }
 
   /**
-   * Given two vertices a, b update their configurations to be used in an Edge 
a-b
-   */
-  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf 
wConf, Vertex w)
-    throws IOException {
-
-    // Tez needs to setup output subsequent input pairs correctly
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
-    // update payloads (configuration for the vertices might have changed)
-    
v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
-    
w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
-  }
-
-  /**
-   * Given two vertices and their respective configuration objects createEdge
+   * Given two vertices and the configuration for the source vertex, createEdge
    * will create an Edge object that connects the two.
    *
-   * @param vConf JobConf of the first vertex
+   * @param vConf JobConf of the first (source) vertex
    * @param v The first vertex (source)
-   * @param wConf JobConf of the second vertex
    * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
       TezEdgeProperty edgeProp)
     throws IOException {
 
-    updateConfigurationForEdge(vConf, v, wConf, w);
-
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      VertexManagerPluginDescriptor desc = 
VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
-      desc.setUserPayload(userPayload);
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -315,71 +300,92 @@ public class DagUtils {
       // nothing
     }
 
-    return new Edge(v, w, createEdgeProperty(edgeProp));
+    return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
   }
 
   /*
    * Helper function to create an edge property from an edge type.
    */
   @SuppressWarnings("rawtypes")
-  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws 
IOException {
-    DataMovementType dataMovementType;
-    Class logicalInputClass;
-    Class logicalOutputClass;
+  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, 
Configuration conf)
+      throws IOException {
+    MRHelpers.translateMRConfToTez(conf);
+    String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    String valClass = 
conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    String partitionerClassName = conf.get("mapred.partitioner.class");
+    Map<String, String> partitionerConf;
 
-    EdgeProperty edgeProperty = null;
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
+          .newBuilder(keyClass, valClass)
+          .setFromConfiguration(conf)
+          
.setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          
.setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et1Conf.createDefaultBroadcastEdgeProperty();
     case CUSTOM_EDGE:
-      dataMovementType = DataMovementType.CUSTOM;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      EdgeManagerDescriptor edgeDesc =
-          new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfig et2Conf = 
UnorderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), 
partitionerConf)
+          .setFromConfiguration(conf)
+          
.setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          
.setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      EdgeManagerPluginDescriptor edgeDesc =
+          
EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
           new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
       DataOutputBuffer dob = new DataOutputBuffer();
       edgeConf.write(dob);
       byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
-      edgeProperty =
-          new EdgeProperty(edgeDesc,
-              DataSourceType.PERSISTED,
-              SchedulingType.SEQUENTIAL,
-              new OutputDescriptor(logicalOutputClass.getName()),
-              new InputDescriptor(logicalInputClass.getName()));
-      break;
-
+      
edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+      return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
     case CUSTOM_SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfig et3Conf = 
UnorderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), 
partitionerConf)
+          .setFromConfiguration(conf)
+          
.setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          
.setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et3Conf.createDefaultEdgeProperty();
     case SIMPLE_EDGE:
     default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), 
partitionerConf)
+          .setFromConfiguration(conf)
+          
.setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
+              TezBytesComparator.class.getName(), null)
+          
.setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et4Conf.createDefaultEdgeProperty();
     }
+  }
 
-    if (edgeProperty == null) {
-      edgeProperty =
-        new EdgeProperty(dataMovementType,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(logicalOutputClass.getName()),
-            new InputDescriptor(logicalInputClass.getName()));
+  /**
+   * Utility method to create a stripped down configuration for the MR partitioner.
+   *
+   * @param partitionerClassName
+   *          the real MR partitioner class name
+   * @param baseConf
+   *          a base configuration to extract relevant properties
+   * @return
+   */
+  private Map<String, String> createPartitionerConf(String 
partitionerClassName,
+      Configuration baseConf) {
+    Map<String, String> partitionerConf = new HashMap<String, String>();
+    partitionerConf.put("mapred.partitioner.class", partitionerClassName);
+    if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+      partitionerConf.put("mapreduce.totalorderpartitioner.path",
+      baseConf.get("mapreduce.totalorderpartitioner.path"));
     }
-
-    return edgeProperty;
+    return partitionerConf;
   }
 
   /*
@@ -397,6 +403,15 @@ public class DagUtils {
   }
 
   /*
+   * Helper to setup default environment for a task in YARN.
+   */
+  private Map<String, String> getContainerEnvironment(Configuration conf, 
boolean isMap) {
+    Map<String, String> environment = new HashMap<String, String>();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
+    return environment;
+  }
+
+  /*
    * Helper to determine what java options to use for the containers
    * Falls back to Map-reduces map java opts if no tez specific options
    * are set
@@ -406,14 +421,14 @@ public class DagUtils {
     if (javaOpts != null && !javaOpts.isEmpty()) {
       String logLevel = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVETEZLOGLEVEL);
       List<String> logProps = Lists.newArrayList();
-      MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+      TezUtils.addLog4jSystemProperties(logLevel, logProps);
       StringBuilder sb = new StringBuilder();
       for (String str : logProps) {
         sb.append(str).append(" ");
       }
       return javaOpts + " " + sb.toString();
     }
-    return MRHelpers.getMapJavaOpts(conf);
+    return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
   /*
@@ -431,18 +446,15 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
-    // Tez ask us to call this even if there's no preceding vertex
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // finally create the vertex
     Vertex map = null;
 
     // use tez to combine splits
-    boolean useTezGroupedSplits = false;
+    boolean groupSplitsInInputInitializer;
+
+    DataSourceDescriptor dataSource;
 
     int numTasks = -1;
-    Class<HiveSplitGenerator> amSplitGeneratorClass = null;
-    InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
@@ -456,9 +468,9 @@ public class DagUtils {
     }
 
     if (vertexHasCustomInput) {
-      useTezGroupedSplits = false;
-      // grouping happens in execution phase. Setting the class to 
TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
+      groupSplitsInInputInitializer = false;
+      // grouping happens in execution phase. The input payload should not enable grouping here,
+      // it will be enabled in the CustomVertex.
       inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, 
InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set 
to false when using
@@ -468,49 +480,52 @@ public class DagUtils {
       // we'll set up tez to combine spits for us iff the input format
       // is HiveInputFormat
       if (inputFormatClass == HiveInputFormat.class) {
-        useTezGroupedSplits = true;
-        conf.setClass("mapred.input.format.class", 
TezGroupedSplitsInputFormat.class, InputFormat.class);
+        groupSplitsInInputInitializer = true;
+      } else {
+        groupSplitsInInputInitializer = false;
       }
     }
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
+
+      // set up the operator plan. (before setting up splits on the AM)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
-      amSplitGeneratorClass = HiveSplitGenerator.class;
+      if (groupSplitsInInputInitializer) {
+        // Not setting a payload, since the MRInput payload is the same and can be accessed.
+        InputInitializerDescriptor descriptor = 
InputInitializerDescriptor.create(
+            HiveSplitGenerator.class.getName());
+        dataSource = MRInputLegacy.createConfigBuilder(conf, 
inputFormatClass).groupSplits(true)
+            .setCustomInitializerDescriptor(descriptor).build();
+      } else {
+        // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+        dataSource = MRInputLegacy.createConfigBuilder(conf, 
inputFormatClass).groupSplits(false).build();
+      }
     } else {
-      // client side split generation means we have to compute them now
-      inputSplitInfo = MRHelpers.generateInputSplits(conf,
-          new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-      numTasks = inputSplitInfo.getNumTasks();
+      // Setup client side split generation.
+      dataSource = 
MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+          "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+      numTasks = dataSource.getNumberOfShards();
+
+      // set up the operator plan. (after generating splits - that changes configs)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
     }
 
-    // set up the operator plan
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
-    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
-    map = new Vertex(mapWork.getName(),
-        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+    UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+    map = Vertex.create(mapWork.getName(),
+        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
         setUserPayload(serializedConf), numTasks, getContainerResource(conf));
-    Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-    map.setTaskEnvironment(environment);
-    map.setJavaOpts(getContainerJavaOpts(conf));
+    map.setTaskEnvironment(getContainerEnvironment(conf, true));
+    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
+    // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
-    byte[] mrInput = null;
-    if (useTezGroupedSplits) {
-      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
-          HiveInputFormat.class.getName());
-    } else {
-      mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
-    }
-    map.addInput(alias,
-        new InputDescriptor(MRInputLegacy.class.getName()).
-        setUserPayload(mrInput), amSplitGeneratorClass);
+    map.addDataSource(alias, dataSource);
 
     Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -518,14 +533,7 @@ public class DagUtils {
       localResources.put(getBaseName(lr), lr);
     }
 
-    if (inputSplitInfo != null) {
-      // only relevant for client-side split generation
-      map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-      MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), 
inputSplitInfo,
-          localResources);
-    }
-
-    map.setTaskLocalResources(localResources);
+    map.addTaskLocalFiles(localResources);
     return map;
   }
 
@@ -535,6 +543,7 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, 
ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
+    // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -558,29 +567,22 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    // Call once here, will be updated when we find edges
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // create the vertex
-    Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+    Vertex reducer = Vertex.create(reduceWork.getName(),
+        ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+        setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
             reduceWork.isAutoReduceParallelism() ? 
reduceWork.getMaxReduceTasks() : reduceWork
                 .getNumReduceTasks(), getContainerResource(conf));
 
-    Map<String, String> environment = new HashMap<String, String>();
-
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
-    reducer.setTaskEnvironment(environment);
-
-    reducer.setJavaOpts(getContainerJavaOpts(conf));
+    reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+    reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
     for (LocalResource lr: additionalLr) {
       localResources.put(getBaseName(lr), lr);
     }
-    reducer.setTaskLocalResources(localResources);
+    reducer.addTaskLocalFiles(localResources);
 
     return reducer;
   }
@@ -614,37 +616,29 @@ public class DagUtils {
   }
 
   /**
-   * @param sessionConfig session configuration
    * @param numContainers number of containers to pre-warm
    * @param localResources additional resources to pre-warm with
-   * @return prewarm context object
+   * @return prewarm vertex to run
    */
-  public PreWarmContext createPreWarmContext(TezSessionConfiguration 
sessionConfig, int numContainers,
-      Map<String, LocalResource> localResources) throws IOException, 
TezException {
-
-    Configuration conf = sessionConfig.getTezConfiguration();
+  public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
+      int numContainers, Map<String, LocalResource> localResources) throws
+      IOException, TezException {
 
-    ProcessorDescriptor prewarmProcDescriptor = new 
ProcessorDescriptor(HivePreWarmProcessor.class.getName());
-    
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
+    ProcessorDescriptor prewarmProcDescriptor = 
ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+    
prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-    PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, 
getContainerResource(conf),
-        numContainers, null);
+    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", 
prewarmProcDescriptor, numContainers,getContainerResource(conf));
 
     Map<String, LocalResource> combinedResources = new HashMap<String, 
LocalResource>();
 
-    combinedResources.putAll(sessionConfig.getSessionResources());
     if (localResources != null) {
       combinedResources.putAll(localResources);
     }
 
-    context.setLocalResources(combinedResources);
-
-    /* boiler plate task env */
-    Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-    context.setEnvironment(environment);
-    context.setJavaOpts(getContainerJavaOpts(conf));
-    return context;
+    prewarmVertex.addTaskLocalFiles(localResources);
+    prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+    prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
+    return prewarmVertex;
   }
 
   /**
@@ -878,7 +872,7 @@ public class DagUtils {
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+    JobConf conf = new JobConf(hiveConf);
 
     conf.set("mapred.output.committer.class", 
NullOutputCommitter.class.getName());
 
@@ -967,9 +961,9 @@ public class DagUtils {
 
     // final vertices need to have at least one output
     if (!hasChildren) {
-      v.addOutput("out_"+work.getName(),
-          new OutputDescriptor(MROutput.class.getName())
-          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+      v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+          OutputDescriptor.create(MROutput.class.getName())
+          .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, 
null));
     }
 
     return v;
@@ -1037,16 +1031,16 @@ public class DagUtils {
     if (edgeProp.isAutoReduce()) {
       Configuration pluginConf = new Configuration(false);
       VertexManagerPluginDescriptor desc =
-          new 
VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+          
VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
       pluginConf.setBoolean(
-          
ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-      
pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      
pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
           edgeProp.getMinReducer());
       pluginConf.setLong(
-          
ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
           edgeProp.getInputSizePerReducer());
-      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
-      desc.setUserPayload(payload.toByteArray());
+      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+      desc.setUserPayload(payload);
       v.setVertexManagerPlugin(desc);
     }
   }


Reply via email to