Author: vikram
Date: Thu Jul 31 18:27:18 2014
New Revision: 1614949
URL: http://svn.apache.org/r1614949
Log:
HIVE-7096: Support grouped splits in Tez partitioned broadcast join (Vikram
Dixit, reviewed by Gunther Hagleitner)
Modified:
hive/branches/tez/itests/qtest/testconfiguration.properties
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
Modified: hive/branches/tez/itests/qtest/testconfiguration.properties
URL:
http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/testconfiguration.properties?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/testconfiguration.properties (original)
+++ hive/branches/tez/itests/qtest/testconfiguration.properties Thu Jul 31 18:27:18 2014
@@ -1,5 +1,5 @@
minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
-minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
+minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q,tez_bmj_schema_evolution.q
minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Jul 31 18:27:18 2014
@@ -186,7 +186,7 @@ public class MapJoinOperator extends Abs
* process different buckets and if the container is reused to join a different bucket,
* join results can be incorrect. The cache is keyed on operator id and for bucket map join
* the operator does not change but data needed is different. For a proper fix, this
- * requires changes in the Tez API with regard to finding bucket id and
+ * requires changes in the Tez API with regard to finding bucket id and
* also ability to schedule tasks to re-use containers that have cached the specific bucket.
*/
LOG.info("This is not bucket map join, so cache");
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Thu Jul 31 18:27:18 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
- if(!runningViaChild) {
+ if (!runningViaChild) {
// we are not running this mapred task via child jvm
// so directly invoke ExecDriver
return super.execute(driverContext);
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Thu Jul 31 18:27:18 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer
private boolean[] sortableSortOrders;
private KeyValueHelper writeHelper;
- private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+ private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
public MapJoinBytesTableContainer(Configuration hconf,
MapJoinObjectSerDeContext valCtx)
throws SerDeException {
@@ -474,6 +474,7 @@ public class MapJoinBytesTableContainer
return valueStruct.getFieldsAsList(); // TODO: should we unset bytes
after that?
}
+ @Override
public void addRow(List<Object> t) {
if (dummyRow != null || !refs.isEmpty()) {
throw new RuntimeException("Cannot add rows when not empty");
@@ -482,9 +483,11 @@ public class MapJoinBytesTableContainer
}
// Various unsupported methods.
+ @Override
public void addRow(Object[] value) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot
add arrays");
}
+ @Override
public void write(MapJoinObjectSerDeContext valueContext,
ObjectOutputStream out) {
throw new RuntimeException(this.getClass().getCanonicalName() + " cannot
be serialized");
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Thu Jul 31 18:27:18 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
import
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
import org.apache.hadoop.io.Writable;
/**
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Thu Jul 31 18:27:18 2014
@@ -19,8 +19,8 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -31,8 +31,6 @@ import org.apache.tez.dag.api.EdgeManage
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import com.google.common.collect.Multimap;
-
public class CustomPartitionEdge extends EdgeManager {
private static final Log LOG =
LogFactory.getLog(CustomPartitionEdge.class.getName());
@@ -41,22 +39,22 @@ public class CustomPartitionEdge extends
EdgeManagerContext context = null;
// used by the framework at runtime. initialize is the real initializer at
runtime
- public CustomPartitionEdge() {
+ public CustomPartitionEdge() {
}
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
- return numSourceTasks;
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+ return context.getSourceVertexNumTasks();
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
return conf.getNumBuckets();
}
@Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
- return sourceTaskIndex;
+ return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@@ -83,30 +81,25 @@ public class CustomPartitionEdge extends
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, Map<Integer,
List<Integer>> mapDestTaskIndices) {
- int srcIndex = event.getSourceIndex();
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>>
mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+ mapDestTaskIndices.put(destIndex, outputIndices);
+ }
}
@Override
- public void routeInputSourceTaskFailedEventToDestination(int
- sourceTaskIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(),
destTaskIndices);
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+ mapDestTaskIndices.put(i, outputIndices);
+ }
}
@Override
- public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
int destinationTaskIndex, int destinationFailedInputIndex) {
return event.getIndex();
}
-
- void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer>
taskIndices) {
- for(int i=0; i<numDestinationTasks; ++i) {
- taskIndices.add(new Integer(i));
- }
- }
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Thu Jul 31 18:27:18 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,11 +31,12 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
@@ -44,10 +46,12 @@ import org.apache.tez.dag.api.TezConfigu
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
@@ -76,6 +80,7 @@ public class CustomPartitionVertex exten
private Configuration conf = null;
private boolean rootVertexInitialized = false;
private final SplitGrouper grouper = new SplitGrouper();
+ private int taskCount = 0;
public CustomPartitionVertex() {
}
@@ -90,7 +95,8 @@ public class CustomPartitionVertex exten
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
int numTasks = context.getVertexNumTasks(context.getVertexName());
- List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = new
ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
+ List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+ new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
for (int i = 0; i < numTasks; ++i) {
scheduledTasks.add(new
VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
}
@@ -114,6 +120,7 @@ public class CustomPartitionVertex exten
// ensure this method is called only once. Tez will call it once per Root
// Input.
Preconditions.checkState(rootVertexInitialized == false);
+ LOG.info("Root vertex not initialized");
rootVertexInitialized = true;
try {
// This is using the payload from the RootVertexInitializer corresponding
@@ -151,7 +158,7 @@ public class CustomPartitionVertex exten
}
boolean dataInformationEventSeen = false;
- Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path,
List<FileSplit>>();
+ Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String,
List<FileSplit>>();
for (Event event : events) {
if (event instanceof RootInputConfigureVertexTasksEvent) {
@@ -182,10 +189,10 @@ public class CustomPartitionVertex exten
} catch (IOException e) {
throw new RuntimeException("Failed to get file split for event: " +
diEvent);
}
- List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+ List<FileSplit> fsList =
pathFileSplitsMap.get(fileSplit.getPath().getName());
if (fsList == null) {
fsList = new ArrayList<FileSplit>();
- pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+ pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
}
fsList.add(fileSplit);
}
@@ -204,12 +211,23 @@ public class CustomPartitionVertex exten
int availableSlots = totalResource / taskResource;
LOG.info("Grouping splits. " + availableSlots + " available slots, " +
waves + " waves.");
+ JobConf jobConf = new JobConf(conf);
+ ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
- grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+ HashMultimap.<Integer, InputSplit> create();
+ for (Integer key : bucketToInitialSplitMap.keySet()) {
+ InputSplit[] inputSplitArray =
+ (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+ Multimap<Integer, InputSplit> groupedSplit =
+ HiveSplitGenerator.generateGroupedSplits(jobConf, conf,
inputSplitArray, waves,
+ availableSlots);
+ bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+ }
+ LOG.info("We have grouped the splits into " +
bucketToGroupedSplitMap.size() + " tasks");
processAllEvents(inputName, bucketToGroupedSplitMap);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -219,7 +237,6 @@ public class CustomPartitionVertex exten
Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer,
Integer> create();
List<InputSplit> finalSplits = Lists.newLinkedList();
- int taskCount = 0;
for (Entry<Integer, Collection<InputSplit>> entry :
bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
Collection<InputSplit> initialSplits = entry.getValue();
@@ -264,10 +281,15 @@ public class CustomPartitionVertex exten
}
// Replace the Edge Managers
+ Map<String, RootInputSpecUpdate> rootInputSpecUpdate =
+ new HashMap<String, RootInputSpecUpdate>();
+ rootInputSpecUpdate.put(
+ inputName,
+ RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
context.setVertexParallelism(
taskCount,
new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
+ .toArray(new InputSplit[finalSplits.size()]))), emMap,
rootInputSpecUpdate);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
@@ -304,7 +326,7 @@ public class CustomPartitionVertex exten
* This method generates the map of bucket to file splits.
*/
private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
- Map<Path, List<FileSplit>> pathFileSplitsMap) {
+ Map<String, List<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
int fsCount = 0;
@@ -312,7 +334,7 @@ public class CustomPartitionVertex exten
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
- for (Map.Entry<Path, List<FileSplit>> entry :
pathFileSplitsMap.entrySet()) {
+ for (Map.Entry<String, List<FileSplit>> entry :
pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
fsCount++;
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Jul 31 18:27:18 2014
@@ -95,12 +95,14 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
import
org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
import
org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
@@ -415,7 +417,7 @@ public class DagUtils {
boolean useTezGroupedSplits = false;
int numTasks = -1;
- Class<HiveSplitGenerator> amSplitGeneratorClass = null;
+ Class<? extends TezRootInputInitializer> amSplitGeneratorClass = null;
InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -451,7 +453,11 @@ public class DagUtils {
&& !mapWork.isUseOneNullRowInputFormat()) {
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
- amSplitGeneratorClass = HiveSplitGenerator.class;
+ if (useTezGroupedSplits) {
+ amSplitGeneratorClass = HiveSplitGenerator.class;
+ } else {
+ amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+ }
} else {
// client side split generation means we have to compute them now
inputSplitInfo = MRHelpers.generateInputSplits(conf,
@@ -482,6 +488,7 @@ public class DagUtils {
} else {
mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
}
+
map.addInput(alias,
new InputDescriptor(MRInputLegacy.class.getName()).
setUserPayload(mrInput), amSplitGeneratorClass);
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Thu Jul 31 18:27:18 2014
@@ -18,45 +18,29 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements
private Configuration hconf;
private MapJoinDesc desc;
private MapJoinKey lastKey = null;
+ private int rowCount = 0;
@Override
public void init(ExecMapperContext context, Configuration hconf,
MapJoinOperator joinOp) {
@@ -121,6 +106,7 @@ public class HashTableLoader implements
? new MapJoinBytesTableContainer(hconf, valCtx) : new
HashMapWrapper(hconf);
while (kvReader.next()) {
+ rowCount++;
lastKey = tableContainer.putRow(keyCtx,
(Writable)kvReader.getCurrentKey(),
valCtx, (Writable)kvReader.getCurrentValue());
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Thu Jul 31 18:27:18 2014
@@ -45,11 +45,12 @@ import org.apache.tez.mapreduce.protos.M
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -65,7 +66,7 @@ public class HiveSplitGenerator implemen
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
- private final SplitGrouper grouper = new SplitGrouper();
+ private static final SplitGrouper grouper = new SplitGrouper();
@Override
public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
@@ -86,7 +87,36 @@ public class HiveSplitGenerator implemen
InputSplitInfoMem inputSplitInfo = null;
String realInputFormatName = userPayloadProto.getInputFormatName();
if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
- inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+ // Need to instantiate the realInputFormat
+ InputFormat<?, ?> inputFormat =
+ (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+ jobConf);
+
+ int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
+ int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+ int availableSlots = totalResource / taskResource;
+
+ // Create the un-grouped splits
+ float waves =
+ conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+ TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+ InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+ LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+ + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+
+ Multimap<Integer, InputSplit> groupedSplits =
+ generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+ // And finally return them in a flat array
+ InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
+ LOG.info("Number of grouped splits: " + flatSplits.length);
+
+ List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+
+ Utilities.clearWork(jobConf);
+
+ inputSplitInfo =
+ new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
} else {
inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
}
@@ -94,31 +124,12 @@ public class HiveSplitGenerator implemen
return createEventList(sendSerializedEvents, inputSplitInfo);
}
- private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
- JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
-
- int totalResource = context.getTotalAvailableResource().getMemory();
- int taskResource = context.getVertexTaskResource().getMemory();
- int availableSlots = totalResource / taskResource;
-
- float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+ throws Exception {
MapWork work = Utilities.getMapWork(jobConf);
- LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + " available slots, "
- + waves + " waves. Input format is: " + realInputFormatName);
-
- // Need to instantiate the realInputFormat
- InputFormat<?, ?> inputFormat =
- (InputFormat<?, ?>) ReflectionUtils
- .newInstance(Class.forName(realInputFormatName), jobConf);
-
- // Create the un-grouped splits
- InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
- LOG.info("Number of input splits: " + splits.length);
-
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -161,15 +172,7 @@ public class HiveSplitGenerator implemen
Multimap<Integer, InputSplit> groupedSplits =
grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
- // And finally return them in a flat array
- InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
- LOG.info("Number of grouped splits: " + flatSplits.length);
-
- List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
-
- Utilities.clearWork(jobConf);
-
- return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
+ return groupedSplits;
}
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
@@ -178,7 +181,8 @@ public class HiveSplitGenerator implemen
RootInputConfigureVertexTasksEvent configureVertexEvent = new
RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints(), RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+ inputSplitInfo.getTaskLocationHints(),
+ RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1614949&r1=1614948&r2=1614949&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Thu Jul 31 18:27:18 2014
@@ -18,14 +18,13 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2>
implements HivePartitioner<K2, V2> {
/** Use {@link Object#hashCode()} to partition. */
+ @Override
public int getBucket(K2 key, V2 value, int numBuckets) {
return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
}