Author: vikram
Date: Thu Jul 31 18:27:18 2014
New Revision: 1614949

URL: http://svn.apache.org/r1614949
Log:
HIVE-7096: Support grouped splits in Tez partitioned broadcast join (Vikram 
Dixit, reviewed by Gunther Hagleitner)

Modified:
    hive/branches/tez/itests/qtest/testconfiguration.properties
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java

Modified: hive/branches/tez/itests/qtest/testconfiguration.properties
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/testconfiguration.properties?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/testconfiguration.properties (original)
+++ hive/branches/tez/itests/qtest/testconfiguration.properties Thu Jul 31 
18:27:18 2014
@@ -1,5 +1,5 @@
 
minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q
 
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
-minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
+minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q,tez_bmj_schema_evolution.q
 
minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_t
 able.q,vectorized_ptf.q,optimize_nullscan.q
 
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,
 
exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwr
 
ite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
 Thu Jul 31 18:27:18 2014
@@ -186,7 +186,7 @@ public class MapJoinOperator extends Abs
        * process different buckets and if the container is reused to join a 
different bucket,
        * join results can be incorrect. The cache is keyed on operator id and 
for bucket map join
        * the operator does not change but data needed is different. For a 
proper fix, this
-       * requires changes in the Tez API with regard to finding bucket id and 
+       * requires changes in the Tez API with regard to finding bucket id and
        * also ability to schedule tasks to re-use containers that have cached 
the specific bucket.
        */
       LOG.info("This is not bucket map join, so cache");

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java 
Thu Jul 31 18:27:18 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
 
       runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
 
-      if(!runningViaChild) {
+      if (!runningViaChild) {
         // we are not running this mapred task via child jvm
         // so directly invoke ExecDriver
         return super.execute(driverContext);

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
 Thu Jul 31 18:27:18 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer 
   private boolean[] sortableSortOrders;
   private KeyValueHelper writeHelper;
 
-  private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+  private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf, 
MapJoinObjectSerDeContext valCtx)
       throws SerDeException {
@@ -474,6 +474,7 @@ public class MapJoinBytesTableContainer 
       return valueStruct.getFieldsAsList(); // TODO: should we unset bytes 
after that?
     }
 
+    @Override
     public void addRow(List<Object> t) {
       if (dummyRow != null || !refs.isEmpty()) {
         throw new RuntimeException("Cannot add rows when not empty");
@@ -482,9 +483,11 @@ public class MapJoinBytesTableContainer 
     }
 
     // Various unsupported methods.
+    @Override
     public void addRow(Object[] value) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot 
add arrays");
     }
+    @Override
     public void write(MapJoinObjectSerDeContext valueContext, 
ObjectOutputStream out) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot 
be serialized");
     }

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
 Thu Jul 31 18:27:18 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
 import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
 import org.apache.hadoop.io.Writable;
 
 /**

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
 Thu Jul 31 18:27:18 2014
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -31,8 +31,6 @@ import org.apache.tez.dag.api.EdgeManage
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Multimap;
-
 public class CustomPartitionEdge extends EdgeManager {
 
   private static final Log LOG = 
LogFactory.getLog(CustomPartitionEdge.class.getName());
@@ -41,22 +39,22 @@ public class CustomPartitionEdge extends
   EdgeManagerContext context = null;
 
   // used by the framework at runtime. initialize is the real initializer at 
runtime
-  public CustomPartitionEdge() {  
+  public CustomPartitionEdge() {
   }
 
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
-    return numSourceTasks;
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return context.getSourceVertexNumTasks();
   }
 
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return conf.getNumBuckets();
   }
 
   @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
-    return sourceTaskIndex;
+    return context.getDestinationVertexNumTasks();
   }
 
   // called at runtime to initialize the custom edge.
@@ -83,30 +81,25 @@ public class CustomPartitionEdge extends
 
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, Map<Integer, 
List<Integer>> mapDestTaskIndices) {
-    int srcIndex = event.getSourceIndex();
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+      int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> 
mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+      mapDestTaskIndices.put(destIndex, outputIndices);
+    }
   }
 
   @Override
-  public void routeInputSourceTaskFailedEventToDestination(int
-      sourceTaskIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), 
destTaskIndices);
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+      mapDestTaskIndices.put(i, outputIndices);
+    }
   }
 
   @Override
-  public int routeInputErrorEventToSource(InputReadErrorEvent event, 
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
       int destinationTaskIndex, int destinationFailedInputIndex) {
     return event.getIndex();
   }
-
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> 
taskIndices) {
-    for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndices.add(new Integer(i));
-    }
-  }
 }

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
 Thu Jul 31 18:27:18 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,11 +31,12 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -44,10 +46,12 @@ import org.apache.tez.dag.api.TezConfigu
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
@@ -76,6 +80,7 @@ public class CustomPartitionVertex exten
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
+  private int taskCount = 0;
 
   public CustomPartitionVertex() {
   }
@@ -90,7 +95,8 @@ public class CustomPartitionVertex exten
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = new 
ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = 
+      new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
       scheduledTasks.add(new 
VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
     }
@@ -114,6 +120,7 @@ public class CustomPartitionVertex exten
     // ensure this method is called only once. Tez will call it once per Root
     // Input.
     Preconditions.checkState(rootVertexInitialized == false);
+    LOG.info("Root vertex not initialized");
     rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
@@ -151,7 +158,7 @@ public class CustomPartitionVertex exten
     }
 
     boolean dataInformationEventSeen = false;
-    Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path, 
List<FileSplit>>();
+    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, 
List<FileSplit>>();
 
     for (Event event : events) {
       if (event instanceof RootInputConfigureVertexTasksEvent) {
@@ -182,10 +189,10 @@ public class CustomPartitionVertex exten
         } catch (IOException e) {
           throw new RuntimeException("Failed to get file split for event: " + 
diEvent);
         }
-        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+        List<FileSplit> fsList = 
pathFileSplitsMap.get(fileSplit.getPath().getName());
         if (fsList == null) {
           fsList = new ArrayList<FileSplit>();
-          pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+          pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
         }
         fsList.add(fileSplit);
       }
@@ -204,12 +211,23 @@ public class CustomPartitionVertex exten
       int availableSlots = totalResource / taskResource;
 
       LOG.info("Grouping splits. " + availableSlots + " available slots, " + 
waves + " waves.");
+      JobConf jobConf = new JobConf(conf);
+      ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
-          grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+          HashMultimap.<Integer, InputSplit> create();
+      for (Integer key : bucketToInitialSplitMap.keySet()) {
+        InputSplit[] inputSplitArray =
+            (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+        Multimap<Integer, InputSplit> groupedSplit =
+            HiveSplitGenerator.generateGroupedSplits(jobConf, conf, 
inputSplitArray, waves,
+            availableSlots);
+        bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+      }
 
+      LOG.info("We have grouped the splits into " + 
bucketToGroupedSplitMap.size() + " tasks");
       processAllEvents(inputName, bucketToGroupedSplitMap);
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -219,7 +237,6 @@ public class CustomPartitionVertex exten
 
     Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, 
Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
-    int taskCount = 0;
     for (Entry<Integer, Collection<InputSplit>> entry : 
bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
       Collection<InputSplit> initialSplits = entry.getValue();
@@ -264,10 +281,15 @@ public class CustomPartitionVertex exten
     }
 
     // Replace the Edge Managers
+    Map<String, RootInputSpecUpdate> rootInputSpecUpdate =
+      new HashMap<String, RootInputSpecUpdate>();
+    rootInputSpecUpdate.put(
+        inputName,
+        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     context.setVertexParallelism(
         taskCount,
         new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
+            .toArray(new InputSplit[finalSplits.size()]))), emMap, 
rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
@@ -304,7 +326,7 @@ public class CustomPartitionVertex exten
    * This method generates the map of bucket to file splits.
    */
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
-      Map<Path, List<FileSplit>> pathFileSplitsMap) {
+      Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
     int fsCount = 0;
@@ -312,7 +334,7 @@ public class CustomPartitionVertex exten
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    for (Map.Entry<Path, List<FileSplit>> entry : 
pathFileSplitsMap.entrySet()) {
+    for (Map.Entry<String, List<FileSplit>> entry : 
pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
         fsCount++;

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
(original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
Thu Jul 31 18:27:18 2014
@@ -95,12 +95,14 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import 
org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
 import 
org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
@@ -415,7 +417,7 @@ public class DagUtils {
     boolean useTezGroupedSplits = false;
 
     int numTasks = -1;
-    Class<HiveSplitGenerator> amSplitGeneratorClass = null;
+    Class<? extends TezRootInputInitializer> amSplitGeneratorClass = null;
     InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
@@ -451,7 +453,11 @@ public class DagUtils {
         && !mapWork.isUseOneNullRowInputFormat()) {
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
-      amSplitGeneratorClass = HiveSplitGenerator.class;
+      if (useTezGroupedSplits) {
+        amSplitGeneratorClass = HiveSplitGenerator.class;
+      } else {
+        amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+      }
     } else {
       // client side split generation means we have to compute them now
       inputSplitInfo = MRHelpers.generateInputSplits(conf,
@@ -482,6 +488,7 @@ public class DagUtils {
     } else {
       mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
     }
+
     map.addInput(alias,
         new InputDescriptor(MRInputLegacy.class.getName()).
         setUserPayload(mrInput), amSplitGeneratorClass);

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
 Thu Jul 31 18:27:18 2014
@@ -18,45 +18,29 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements 
   private Configuration hconf;
   private MapJoinDesc desc;
   private MapJoinKey lastKey = null;
+  private int rowCount = 0;
 
   @Override
   public void init(ExecMapperContext context, Configuration hconf, 
MapJoinOperator joinOp) {
@@ -121,6 +106,7 @@ public class HashTableLoader implements 
             ? new MapJoinBytesTableContainer(hconf, valCtx) : new 
HashMapWrapper(hconf);
 
         while (kvReader.next()) {
+          rowCount++;
           lastKey = tableContainer.putRow(keyCtx, 
(Writable)kvReader.getCurrentKey(),
               valCtx, (Writable)kvReader.getCurrentValue());
         }

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
 Thu Jul 31 18:27:18 2014
@@ -45,11 +45,12 @@ import org.apache.tez.mapreduce.protos.M
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -65,7 +66,7 @@ public class HiveSplitGenerator implemen
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
-  private final SplitGrouper grouper = new SplitGrouper();
+  private static final SplitGrouper grouper = new SplitGrouper();
 
   @Override
   public List<Event> initialize(TezRootInputInitializerContext 
rootInputContext) throws Exception {
@@ -86,7 +87,36 @@ public class HiveSplitGenerator implemen
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = userPayloadProto.getInputFormatName();
     if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
-      inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, 
realInputFormatName);
+      // Need to instantiate the realInputFormat
+      InputFormat<?, ?> inputFormat =
+          (InputFormat<?, ?>) 
ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+              jobConf);
+
+      int totalResource = 
rootInputContext.getTotalAvailableResource().getMemory();
+      int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+      int availableSlots = totalResource / taskResource;
+
+      // Create the un-grouped splits
+      float waves =
+          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+      InputSplit[] splits = inputFormat.getSplits(jobConf, (int) 
(availableSlots * waves));
+      LOG.info("Number of input splits: " + splits.length + ". " + 
availableSlots
+          + " available slots, " + waves + " waves. Input format is: " + 
realInputFormatName);
+
+      Multimap<Integer, InputSplit> groupedSplits =
+          generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+      // And finally return them in a flat array
+      InputSplit[] flatSplits = groupedSplits.values().toArray(new 
InputSplit[0]);
+      LOG.info("Number of grouped splits: " + flatSplits.length);
+
+      List<TaskLocationHint> locationHints = 
grouper.createTaskLocationHints(flatSplits);
+
+      Utilities.clearWork(jobConf);
+
+      inputSplitInfo =
+          new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, 
null, jobConf);
     } else {
       inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
     }
@@ -94,31 +124,12 @@ public class HiveSplitGenerator implemen
     return createEventList(sendSerializedEvents, inputSplitInfo);
   }
 
-  private InputSplitInfoMem 
generateGroupedSplits(TezRootInputInitializerContext context,
-      JobConf jobConf, Configuration conf, String realInputFormatName) throws 
Exception {
-
-    int totalResource = context.getTotalAvailableResource().getMemory();
-    int taskResource = context.getVertexTaskResource().getMemory();
-    int availableSlots = totalResource / taskResource;
-
-    float waves =
-        conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf 
jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+      throws Exception {
 
     MapWork work = Utilities.getMapWork(jobConf);
 
-    LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + 
" available slots, "
-        + waves + " waves. Input format is: " + realInputFormatName);
-
-    // Need to instantiate the realInputFormat
-    InputFormat<?, ?> inputFormat =
-        (InputFormat<?, ?>) ReflectionUtils
-            .newInstance(Class.forName(realInputFormatName), jobConf);
-
-    // Create the un-grouped splits
-    InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots 
* waves));
-    LOG.info("Number of input splits: " + splits.length);
-
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
@@ -161,15 +172,7 @@ public class HiveSplitGenerator implemen
     Multimap<Integer, InputSplit> groupedSplits =
         grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
 
-    // And finally return them in a flat array
-    InputSplit[] flatSplits = groupedSplits.values().toArray(new 
InputSplit[0]);
-    LOG.info("Number of grouped splits: " + flatSplits.length);
-
-    List<TaskLocationHint> locationHints = 
grouper.createTaskLocationHints(flatSplits);
-
-    Utilities.clearWork(jobConf);
-
-    return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, 
null, jobConf);
+    return groupedSplits;
   }
 
   private List<Event> createEventList(boolean sendSerializedEvents, 
InputSplitInfoMem inputSplitInfo) {
@@ -178,7 +181,8 @@ public class HiveSplitGenerator implemen
 
     RootInputConfigureVertexTasksEvent configureVertexEvent = new
         RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-        inputSplitInfo.getTaskLocationHints(), 
RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+        inputSplitInfo.getTaskLocationHints(),
+        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {

Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
 (original)
+++ 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
 Thu Jul 31 18:27:18 2014
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
 public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> 
implements HivePartitioner<K2, V2> {
 
   /** Use {@link Object#hashCode()} to partition. */
+  @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
     return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
   }


Reply via email to