Author: prasanthj
Date: Wed Jul 30 23:50:11 2014
New Revision: 1614793
URL: http://svn.apache.org/r1614793
Log:
HIVE-7509: Fast stripe level merging for ORC (Prasanth J, reviewed by Gunther Hagleitner)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
hive/trunk/ql/src/test/queries/clientnegative/orc_merge1.q
hive/trunk/ql/src/test/queries/clientnegative/orc_merge2.q
hive/trunk/ql/src/test/queries/clientnegative/orc_merge3.q
hive/trunk/ql/src/test/queries/clientnegative/orc_merge4.q
hive/trunk/ql/src/test/queries/clientnegative/orc_merge5.q
hive/trunk/ql/src/test/queries/clientpositive/alter_merge_2_orc.q
hive/trunk/ql/src/test/queries/clientpositive/alter_merge_orc.q
hive/trunk/ql/src/test/queries/clientpositive/alter_merge_stats_orc.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge2.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge3.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge4.q
hive/trunk/ql/src/test/results/clientnegative/orc_merge1.q.out
hive/trunk/ql/src/test/results/clientnegative/orc_merge2.q.out
hive/trunk/ql/src/test/results/clientnegative/orc_merge3.q.out
hive/trunk/ql/src/test/results/clientnegative/orc_merge4.q.out
hive/trunk/ql/src/test/results/clientnegative/orc_merge5.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_merge_2_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_merge_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_merge_stats_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge2.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge4.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_2_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_stats_orc.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge2.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge4.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/itests/qtest/testconfiguration.properties
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_10.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_11.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_12.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_13.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_14.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_16.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_9.q.out
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed
Jul 30 23:50:11 2014
@@ -783,6 +783,14 @@ public class HiveConf extends Configurat
HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
"org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat", ""),
+ HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
+ "When hive.merge.mapfiles or hive.merge.mapredfiles is enabled while writing a\n" +
+ " table with ORC file format, enabling this config will do stripe level fast merge\n" +
+ " for small ORC files. Note that enabling this config will not honor padding tolerance\n" +
+ " config (hive.exec.orc.block.padding.tolerance)."),
+ HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
+ "org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat",
+ "Input file format to use for ORC stripe level merging (for internal use only)"),
HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
"hive.merge.current.job.has.dynamic.partitions", false, ""),
Modified: hive/trunk/itests/qtest/testconfiguration.properties
URL:
http://svn.apache.org/viewvc/hive/trunk/itests/qtest/testconfiguration.properties?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/itests/qtest/testconfiguration.properties (original)
+++ hive/trunk/itests/qtest/testconfiguration.properties Wed Jul 30 23:50:11
2014
@@ -1,5 +1,5 @@
minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
-minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q,vector_cast_constant.q,vector_string_concat.q
+minitez.query.files.shared=orc_merge1.q,orc_merge2.q,orc_merge3.q,orc_merge4.q,alter_merge_orc.q,alter_merge_2_orc.q,alter_merge_stats_orc.q,cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q,vector_cast_constant.q,vector_string_concat.q
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,
exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwr
ite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Jul
30 23:50:11 2014
@@ -88,9 +88,9 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
@@ -550,12 +550,13 @@ public class DDLTask extends Task<DDLWor
throws HiveException {
// merge work only needs input and output.
MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
- mergeFilesDesc.getOutputDir());
+ mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
mergeWork.resolveConcatenateMerge(db.getConf());
mergeWork.setMapperCannotSpanPartns(true);
+ mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
DriverContext driverCxt = new DriverContext();
- BlockMergeTask taskExec = new BlockMergeTask();
+ MergeTask taskExec = new MergeTask();
taskExec.initialize(db.getConf(), null, driverCxt);
taskExec.setWork(mergeWork);
taskExec.setQueryPlan(this.getQueryPlan());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Jul
30 23:50:11 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -294,7 +294,7 @@ public class MoveTask extends Task<MoveW
while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
task = (Task)task.getParentTasks().get(0);
// If it was a merge task or a local map reduce task, nothing can be inferred
- if (task instanceof BlockMergeTask || task instanceof MapredLocalTask) {
+ if (task instanceof MergeTask || task instanceof MapredLocalTask) {
break;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Wed
Jul 30 23:50:11 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -93,7 +93,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
- BlockMergeTask.class));
+ MergeTask.class));
taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
DependencyCollectionTask.class));
taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed
Jul 30 23:50:11 2014
@@ -120,7 +120,8 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
@@ -346,7 +347,8 @@ public final class Utilities {
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
- } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+ } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
+ OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, MergeWork.class, conf);
} else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class MergeInputFormat extends FileInputFormat {
+
+ @Override
+ public abstract RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException;
+
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+
+public class MergeMapper extends MapReduceBase {
+ protected JobConf jc;
+ protected Path finalPath;
+ protected FileSystem fs;
+ protected boolean exception = false;
+ protected boolean autoDelete = false;
+ protected Path outPath;
+ protected boolean hasDynamicPartitions = false;
+ protected boolean isListBucketingDML = false;
+ protected boolean isListBucketingAlterTableConcatenate = false;
+ //used as depth for dir-calculation and if it is list bucketing case.
+ protected int listBucketingDepth;
+ protected boolean tmpPathFixedConcatenate = false;
+ protected boolean tmpPathFixed = false;
+ protected Path tmpPath;
+ protected Path taskTmpPath;
+ protected Path dpPath;
+
+ public final static Log LOG = LogFactory.getLog("MergeMapper");
+
+ @Override
+ public void configure(JobConf job) {
+ jc = job;
+ hasDynamicPartitions = HiveConf.getBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+ isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING);
+ listBucketingDepth = HiveConf.getIntVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
+
+ Path specPath = MergeOutputFormat.getMergeOutputPath(job);
+ Path tmpPath = Utilities.toTempPath(specPath);
+ Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+ updatePaths(tmpPath, taskTmpPath);
+ try {
+ fs = specPath.getFileSystem(job);
+ autoDelete = fs.deleteOnExit(outPath);
+ } catch (IOException e) {
+ this.exception = true;
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void updatePaths(Path tmpPath, Path taskTmpPath) {
+ String taskId = Utilities.getTaskId(jc);
+ this.tmpPath = tmpPath;
+ this.taskTmpPath = taskTmpPath;
+ finalPath = new Path(tmpPath, taskId);
+ outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
+ }
+
+
+ /**
+ * Validates that each input path belongs to the same partition since each mapper merges the input
+ * to a single output directory
+ * @param inputPath
+ * @throws HiveException
+ */
+ protected void checkPartitionsMatch(Path inputPath) throws HiveException {
+ if (!dpPath.equals(inputPath)) {
+ // Temp partition input path does not match exist temp path
+ String msg = "Multiple partitions for one block merge mapper: " + dpPath + " NOT EQUAL TO "
+ + inputPath;
+ LOG.error(msg);
+ throw new HiveException(msg);
+ }
+ }
+
+ /**
+ * Fixes tmpPath to point to the correct partition. Before this is called, tmpPath will default to
+ * the root tmp table dir fixTmpPath(..) works for DP + LB + multiple skewed values + merge.
+ * reason: 1. fixTmpPath(..) compares inputPath and tmpDepth, find out path difference and put it
+ * into newPath. Then add newpath to existing this.tmpPath and this.taskTmpPath. 2. The path
+ * difference between inputPath and tmpDepth can be DP or DP+LB. It will automatically handle it.
+ * 3. For example, if inputpath is <prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+ * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME tmppath is <prefix>/_tmp.-ext-10000 newpath will be
+ * hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME Then,
+ * this.tmpPath and this.taskTmpPath will be update correctly. We have list_bucket_dml_6.q cover
+ * this case: DP + LP + multiple skewed values + merge.
+ * @param inputPath
+ * @throws HiveException
+ * @throws IOException
+ */
+ protected void fixTmpPath(Path inputPath) throws HiveException, IOException {
+ dpPath = inputPath;
+ Path newPath = new Path(".");
+ int inputDepth = inputPath.depth();
+ int tmpDepth = tmpPath.depth();
+
+ // Build the path from bottom up
+ while (inputPath != null && inputDepth > tmpDepth) {
+ newPath = new Path(inputPath.getName(), newPath);
+ inputDepth--;
+ inputPath = inputPath.getParent();
+ }
+
+ Path newTmpPath = new Path(tmpPath, newPath);
+ Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+ if (!fs.exists(newTmpPath)) {
+ fs.mkdirs(newTmpPath);
+ }
+ updatePaths(newTmpPath, newTaskTmpPath);
+ }
+
+ /**
+ * Fixes tmpPath to point to the correct list bucketing sub-directories. Before this is called,
+ * tmpPath will default to the root tmp table dir Reason to add a new method instead of changing
+ * fixTmpPath() Reason 1: logic has slightly difference fixTmpPath(..) needs 2 variables in order
+ * to decide path delta which is in variable newPath. 1. inputPath.depth() 2. tmpPath.depth()
+ * fixTmpPathConcatenate needs 2 variables too but one of them is different from fixTmpPath(..) 1.
+ * inputPath.depth() 2. listBucketingDepth Reason 2: less risks The existing logic is a little not
+ * trivial around map() and fixTmpPath(). In order to ensure minimum impact on existing flow, we
+ * try to avoid change on existing code/flow but add new code for new feature.
+ * @param inputPath
+ * @throws HiveException
+ * @throws IOException
+ */
+ protected void fixTmpPathConcatenate(Path inputPath) throws HiveException, IOException {
+ dpPath = inputPath;
+ Path newPath = new Path(".");
+
+ int depth = listBucketingDepth;
+ // Build the path from bottom up. pick up list bucketing subdirectories
+ while ((inputPath != null) && (depth > 0)) {
+ newPath = new Path(inputPath.getName(), newPath);
+ inputPath = inputPath.getParent();
+ depth--;
+ }
+
+ Path newTmpPath = new Path(tmpPath, newPath);
+ Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+ if (!fs.exists(newTmpPath)) {
+ fs.mkdirs(newTmpPath);
+ }
+ updatePaths(newTmpPath, newTaskTmpPath);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!exception) {
+ FileStatus fss = fs.getFileStatus(outPath);
+ LOG.info("renamed path " + outPath + " to " + finalPath + " . File size is " + fss.getLen());
+ if (!fs.rename(outPath, finalPath)) {
+ throw new IOException("Unable to rename output to " + finalPath);
+ }
+ } else {
+ if (!autoDelete) {
+ fs.delete(outPath, true);
+ }
+ }
+ }
+
+ protected void fixTmpPathAlterTable(Path path) throws IOException, HiveException {
+ /**
+ * 1. boolean isListBucketingAlterTableConcatenate will be true only if it is alter table ...
+ * concatenate on stored-as-dir so it will handle list bucketing alter table merge in the if
+ * cause with the help of fixTmpPathConcatenate 2. If it is DML,
+ * isListBucketingAlterTableConcatenate will be false so that it will be handled by else
+ * cause. In this else cause, we have another if check. 2.1 the if check will make sure DP or
+ * LB, we will fix path with the help of fixTmpPath(..). Since both has sub-directories. it
+ * includes SP + LB. 2.2 only SP without LB, we dont fix path.
+ */
+
+ // Fix temp path for alter table ... concatenate
+ if (isListBucketingAlterTableConcatenate) {
+ if (this.tmpPathFixedConcatenate) {
+ checkPartitionsMatch(path);
+ } else {
+ fixTmpPathConcatenate(path);
+ tmpPathFixedConcatenate = true;
+ }
+ } else {
+ if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+ if (tmpPathFixed) {
+ checkPartitionsMatch(path);
+ } else {
+ // We haven't fixed the TMP path for this mapper yet
+ fixTmpPath(path);
+ tmpPathFixed = true;
+ }
+ }
+ }
+ }
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Base output format for merge jobs. It stores the merge output directory in the job
+ * configuration and leaves the creation of the record writer to subclasses.
+ */
+public abstract class MergeOutputFormat extends
+    FileOutputFormat<Object, Object> {
+
+  /** Job configuration key under which the merge output directory is stored. */
+  private static final String MERGE_OUTPUT_DIR_KEY = "hive.merge.output.dir";
+
+  /**
+   * Records the merge output directory in the job configuration.
+   *
+   * @param job job configuration to update
+   * @param path merge output directory
+   */
+  public static void setMergeOutputPath(JobConf job, Path path) {
+    String dir = path.toString();
+    job.set(MERGE_OUTPUT_DIR_KEY, dir);
+  }
+
+  /**
+   * Reads back the merge output directory.
+   *
+   * @param conf job configuration to read
+   * @return the configured directory, or {@code null} when none has been set
+   */
+  public static Path getMergeOutputPath(JobConf conf) {
+    String dir = conf.get(MERGE_OUTPUT_DIR_KEY);
+    if (dir == null) {
+      return null;
+    }
+    return new Path(dir);
+  }
+
+  /**
+   * Creates the writer for the merged output; implemented by subclasses.
+   */
+  public abstract RecordWriter<Object, Object> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException;
+}
\ No newline at end of file
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.LogManager;
+
+public class MergeTask extends Task<MergeWork> implements Serializable,
+ HadoopJobExecHook {
+
+ private static final long serialVersionUID = 1L;
+
+ public static String BACKUP_PREFIX = "_backup.";
+
+ protected transient JobConf job;
+ protected HadoopJobExecHelper jobExecHelper;
+
+ @Override
+ public void initialize(HiveConf conf, QueryPlan queryPlan,
+ DriverContext driverContext) {
+ super.initialize(conf, queryPlan, driverContext);
+ job = new JobConf(conf, MergeTask.class);
+ jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
+ }
+
+ @Override
+ public boolean requireLock() {
+ return true;
+ }
+
+ boolean success = true;
+
+ @Override
+ /**
+ * start a new map-reduce job to do the merge, almost the same as ExecDriver.
+ */
+ public int execute(DriverContext driverContext) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT,
+ CombineHiveInputFormat.class.getName());
+ success = true;
+ ShimLoader.getHadoopShims().prepareJobOutput(job);
+ job.setOutputFormat(HiveOutputFormatImpl.class);
+ Class<? extends Mapper> mapperClass =
work.getMapperClass(work.getSourceTableInputFormat());
+ LOG.info("Using " + mapperClass.getCanonicalName() + " mapper class.");
+ job.setMapperClass(mapperClass);
+
+ Context ctx = driverContext.getCtx();
+ boolean ctxCreated = false;
+ try {
+ if (ctx == null) {
+ ctx = new Context(job);
+ ctxCreated = true;
+ }
+ }catch (IOException e) {
+ e.printStackTrace();
+ console.printError("Error launching map-reduce job", "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return 5;
+ }
+
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ if(work.getNumMapTasks() != null) {
+ job.setNumMapTasks(work.getNumMapTasks());
+ }
+
+ // zero reducers
+ job.setNumReduceTasks(0);
+
+ if (work.getMinSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work
+ .getMinSplitSize().longValue());
+ }
+
+ if (work.getInputformat() != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work
+ .getInputformat());
+ }
+
+ String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
+ inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
+
+ LOG.info("Using " + inpFormat);
+
+ try {
+ job.setInputFormat((Class<? extends InputFormat>) (Class
+ .forName(inpFormat)));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+
+ Path outputPath = this.work.getOutputDir();
+ Path tempOutPath = Utilities.toTempPath(outputPath);
+ try {
+ FileSystem fs = tempOutPath.getFileSystem(job);
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+ } catch (IOException e) {
+ console.printError("Can't make path " + outputPath + " : " +
e.getMessage());
+ return 6;
+ }
+
+ MergeOutputFormat.setMergeOutputPath(job, outputPath);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ HiveConf.setBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
+ work.hasDynamicPartitions());
+
+ HiveConf.setBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING,
+ work.isListBucketingAlterTableConcatenate());
+
+ HiveConf.setIntVar(
+ job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH,
+ ((work.getListBucketingCtx() == null) ? 0 : work.getListBucketingCtx()
+ .calculateListBucketingLevel()));
+
+ int returnVal = 0;
+ RunningJob rj = null;
+ boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
+ HiveConf.ConfVars.HADOOPJOBNAME));
+
+ String jobName = null;
+ if (noName && this.getQueryPlan() != null) {
+ int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+ jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(),
+ maxlen - 6);
+ }
+
+ if (noName) {
+ // This is for a special case to ensure unit tests pass
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME,
+ jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+ }
+
+ try {
+ addInputPaths(job, work);
+
+ Utilities.setMapWork(job, work, ctx.getMRTmpPath(), true);
+
+ // remove the pwd from conf file so that job tracker doesn't show this
+ // logs
+ String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
+ if (pwd != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
+ }
+ JobClient jc = new JobClient(job);
+
+ String addedJars = Utilities.getResourceFiles(job,
SessionState.ResourceType.JAR);
+ if (!addedJars.isEmpty()) {
+ job.set("tmpjars", addedJars);
+ }
+
+ // make this client wait if job trcker is not behaving well.
+ Throttle.checkJobTracker(job, LOG);
+
+ // Finally SUBMIT the JOB!
+ rj = jc.submitJob(job);
+
+ returnVal = jobExecHelper.progress(rj, jc, null);
+ success = (returnVal == 0);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
+ if (rj != null) {
+ mesg = "Ended Job = " + rj.getJobID() + mesg;
+ } else {
+ mesg = "Job Submission failed" + mesg;
+ }
+
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ console.printError(mesg, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+
+ success = false;
+ returnVal = 1;
+ } finally {
+ try {
+ if (ctxCreated) {
+ ctx.clear();
+ }
+ if (rj != null) {
+ if (returnVal != 0) {
+ rj.killJob();
+ }
+ HadoopJobExecHelper.runningJobs.remove(rj);
+ jobID = rj.getID().toString();
+ }
+ closeJob(outputPath, success, job, console, work.getDynPartCtx(),
null);
+ } catch (Exception e) {
+ }
+ }
+
+ return (returnVal);
+ }
+
+ private Path backupOutputPath(FileSystem fs, Path outpath, JobConf job)
+ throws IOException, HiveException {
+ if (fs.exists(outpath)) {
+ Path backupPath = new Path(outpath.getParent(), BACKUP_PREFIX
+ + outpath.getName());
+ Utilities.rename(fs, outpath, backupPath);
+ return backupPath;
+ } else {
+ return null;
+ }
+ }
+
+ private void closeJob(Path outputPath, boolean success, JobConf job,
+ LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter
+ ) throws HiveException, IOException {
+ FileSystem fs = outputPath.getFileSystem(job);
+ Path backupPath = backupOutputPath(fs, outputPath, job);
+ Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx,
null,
+ reporter);
+ fs.delete(backupPath, true);
+ }
+
+ private void addInputPaths(JobConf job, MergeWork work) {
+ for (Path path : work.getInputPaths()) {
+ FileInputFormat.addInputPath(job, path);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "MergeTask";
+ }
+
+ public static String INPUT_SEPERATOR = ":";
+
+ public static void main(String[] args) {
+ String inputPathStr = null;
+ String outputDir = null;
+ String jobConfFileName = null;
+ String format = null;
+
+ try {
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-input")) {
+ inputPathStr = args[++i];
+ } else if (args[i].equals("-jobconffile")) {
+ jobConfFileName = args[++i];
+ } else if (args[i].equals("-outputDir")) {
+ outputDir = args[++i];
+ } else if (args[i].equals("-format")) {
+ format = args[++i];
+ }
+ }
+ } catch (IndexOutOfBoundsException e) {
+ System.err.println("Missing argument to option");
+ printUsage();
+ }
+
+ if (inputPathStr == null || outputDir == null
+ || outputDir.trim().equals("")) {
+ printUsage();
+ }
+
+ List<Path> inputPaths = new ArrayList<Path>();
+ String[] paths = inputPathStr.split(INPUT_SEPERATOR);
+ if (paths == null || paths.length == 0) {
+ printUsage();
+ }
+
+ FileSystem fs = null;
+ JobConf conf = new JobConf(MergeTask.class);
+ for (String path : paths) {
+ try {
+ Path pathObj = new Path(path);
+ if (fs == null) {
+ fs = FileSystem.get(pathObj.toUri(), conf);
+ }
+ FileStatus fstatus = fs.getFileStatus(pathObj);
+ if (fstatus.isDir()) {
+ FileStatus[] fileStatus = fs.listStatus(pathObj);
+ for (FileStatus st : fileStatus) {
+ inputPaths.add(st.getPath());
+ }
+ } else {
+ inputPaths.add(fstatus.getPath());
+ }
+ } catch (IOException e) {
+ e.printStackTrace(System.err);
+ }
+ }
+
+ if (jobConfFileName != null) {
+ conf.addResource(new Path(jobConfFileName));
+ }
+ HiveConf hiveConf = new HiveConf(conf, MergeTask.class);
+
+ Log LOG = LogFactory.getLog(MergeTask.class.getName());
+ boolean isSilent = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVESESSIONSILENT);
+ LogHelper console = new LogHelper(LOG, isSilent);
+
+ // print out the location of the log file for the user so
+ // that it's easy to find reason for local mode execution failures
+ for (Appender appender : Collections
+ .list((Enumeration<Appender>) LogManager.getRootLogger()
+ .getAllAppenders())) {
+ if (appender instanceof FileAppender) {
+ console.printInfo("Execution log at: "
+ + ((FileAppender) appender).getFile());
+ }
+ }
+
+ MergeWork mergeWork = null;
+ if (format.equals("rcfile")) {
+ mergeWork = new MergeWork(inputPaths, new Path(outputDir),
RCFileInputFormat.class);
+ } else if (format.equals("orcfile")) {
+ mergeWork = new MergeWork(inputPaths, new Path(outputDir),
OrcInputFormat.class);
+ }
+
+ DriverContext driverCxt = new DriverContext();
+ MergeTask taskExec = new MergeTask();
+ taskExec.initialize(hiveConf, null, driverCxt);
+ taskExec.setWork(mergeWork);
+ int ret = taskExec.execute(driverCxt);
+
+ if (ret != 0) {
+ System.exit(2);
+ }
+
+ }
+
+ private static void printUsage() {
+ System.err.println("MergeTask -format <rcfile or orcfile> -input <colon
seperated input paths> "
+ + "-outputDir outputDir [-jobconffile <job conf file>] ");
+ System.exit(1);
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.MAPRED;
+ }
+
+ @Override
+ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+ return false;
+ }
+
+ @Override
+ public void logPlanProgress(SessionState ss) throws IOException {
+ // no op
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * Work description for a file merge job: the input paths, the output directory, the
+ * source table's input format and the dynamic partition / list bucketing context.
+ */
+@Explain(displayName = "Merge Work")
+public class MergeWork extends MapWork implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private transient List<Path> inputPaths;
+  private transient Path outputDir;
+  private boolean hasDynamicPartitions;
+  private DynamicPartitionCtx dynPartCtx;
+  private boolean isListBucketingAlterTableConcatenate;
+  private ListBucketingCtx listBucketingCtx;
+  private Class<? extends InputFormat> srcTblInputFormat;
+
+  public MergeWork() {
+  }
+
+  /**
+   * Creates a merge work without dynamic partitions.
+   */
+  public MergeWork(List<Path> inputPaths, Path outputDir,
+      Class<? extends InputFormat> srcTblInputFormat) {
+    this(inputPaths, outputDir, false, null, srcTblInputFormat);
+  }
+
+  /**
+   * Creates a merge work and maps every input path to a partition descriptor whose
+   * input format is the merge input format matching the source table format (ORC or
+   * RCFile). For any other source format the descriptor's input format is left unset.
+   *
+   * @param inputPaths paths to merge
+   * @param outputDir directory receiving the merged files
+   * @param hasDynamicPartitions whether the merge involves dynamic partitions
+   * @param dynPartCtx dynamic partition context, may be {@code null}
+   * @param srcTblInputFormat input format of the source table
+   */
+  public MergeWork(List<Path> inputPaths, Path outputDir,
+      boolean hasDynamicPartitions, DynamicPartitionCtx dynPartCtx,
+      Class<? extends InputFormat> srcTblInputFormat) {
+    super();
+    this.inputPaths = inputPaths;
+    this.outputDir = outputDir;
+    this.hasDynamicPartitions = hasDynamicPartitions;
+    this.dynPartCtx = dynPartCtx;
+    this.srcTblInputFormat = srcTblInputFormat;
+    PartitionDesc partDesc = new PartitionDesc();
+    // class literal on the left keeps the comparison safe for a null format
+    if (OrcInputFormat.class.equals(srcTblInputFormat)) {
+      partDesc.setInputFileFormatClass(OrcFileStripeMergeInputFormat.class);
+    } else if (RCFileInputFormat.class.equals(srcTblInputFormat)) {
+      partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
+    }
+    if (this.getPathToPartitionInfo() == null) {
+      this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
+    }
+    if (this.inputPaths != null) {
+      for (Path path : this.inputPaths) {
+        this.getPathToPartitionInfo().put(path.toString(), partDesc);
+      }
+    }
+  }
+
+  public List<Path> getInputPaths() {
+    return inputPaths;
+  }
+
+  public void setInputPaths(List<Path> inputPaths) {
+    this.inputPaths = inputPaths;
+  }
+
+  public Path getOutputDir() {
+    return outputDir;
+  }
+
+  public void setOutputDir(Path outputDir) {
+    this.outputDir = outputDir;
+  }
+
+  /**
+   * Picks the merge mapper for a source table input format.
+   *
+   * @param klass input format of the source table
+   * @return the RCFile or ORC merge mapper, or {@code null} for any other format
+   */
+  public Class<? extends Mapper> getMapperClass(Class<? extends InputFormat> klass) {
+    if (RCFileInputFormat.class.equals(klass)) {
+      return RCFileMergeMapper.class;
+    } else if (OrcInputFormat.class.equals(klass)) {
+      return OrcFileMergeMapper.class;
+    }
+    return null;
+  }
+
+  @Override
+  public Long getMinSplitSize() {
+    return null;
+  }
+
+  @Override
+  public String getInputformat() {
+    return CombineHiveInputFormat.class.getName();
+  }
+
+  @Override
+  public boolean isGatheringStats() {
+    return false;
+  }
+
+  public boolean hasDynamicPartitions() {
+    return this.hasDynamicPartitions;
+  }
+
+  public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+    this.hasDynamicPartitions = hasDynamicPartitions;
+  }
+
+  /**
+   * Sets the configured block-level (RCFile) or stripe-level (ORC) merge input format
+   * on the partition descriptor, delegates to the parent implementation and adds the
+   * dynamic partition path to the input paths.
+   *
+   * @throws RuntimeException when no merge input format applies to the table's input
+   *         format or the configured class cannot be loaded
+   */
+  @Override
+  public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path,
+      TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+
+    Class<?> tableInputFormat = tblDesc.getInputFileFormatClass();
+    String inputFormatClass = null;
+    if (RCFileInputFormat.class.equals(tableInputFormat)) {
+      inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+    } else if (OrcInputFormat.class.equals(tableInputFormat)) {
+      inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL);
+    }
+
+    // fail with a clear message instead of a NullPointerException out of Class.forName
+    if (inputFormatClass == null) {
+      throw new RuntimeException("No merge input format available for table input format "
+          + tableInputFormat);
+    }
+
+    try {
+      partDesc.setInputFileFormatClass((Class<? extends InputFormat>)
+          Class.forName(inputFormatClass));
+    } catch (ClassNotFoundException e) {
+      String msg = "Merge input format class not found";
+      throw new RuntimeException(msg, e);
+    }
+    super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc);
+
+    // Add the DP path to the list of input paths. The list is transient and is not
+    // populated by the no-arg constructor, so create it on demand.
+    if (inputPaths == null) {
+      inputPaths = new ArrayList<Path>();
+    }
+    inputPaths.add(path);
+  }
+
+  /**
+   * alter table ... concatenate
+   *
+   * If it is skewed table, use subdirectories in inputpaths.
+   */
+  public void resolveConcatenateMerge(HiveConf conf) {
+    isListBucketingAlterTableConcatenate =
+        (listBucketingCtx != null) && listBucketingCtx.isSkewedStoredAsDir();
+    if (!isListBucketingAlterTableConcatenate) {
+      return;
+    }
+
+    // use sub-dir as inputpath.
+    assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) :
+        "alter table ... concatenate should only have one directory inside inputpaths";
+    Path dirPath = inputPaths.get(0);
+    try {
+      FileSystem inpFs = dirPath.getFileSystem(conf);
+      FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(dirPath,
+          listBucketingCtx.getSkewedColNames().size(), inpFs);
+      List<Path> newInputPath = new ArrayList<Path>();
+      boolean succeed = true;
+      for (FileStatus entry : status) {
+        if (entry.isDir()) {
+          // Add the lb path to the list of input paths
+          newInputPath.add(entry.getPath());
+        } else {
+          // find file instead of dir. dont change inputpath
+          succeed = false;
+        }
+      }
+      assert (succeed || ((!succeed) && newInputPath.isEmpty())) : "This partition has "
+          + " inconsistent file structure: "
+          + "it is stored-as-subdir and expected all files in the same depth of subdirectories.";
+      if (succeed) {
+        inputPaths.clear();
+        inputPaths.addAll(newInputPath);
+      }
+    } catch (IOException e) {
+      String msg = "Fail to get filesystem for directory name : " + dirPath.toUri();
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  public DynamicPartitionCtx getDynPartCtx() {
+    return dynPartCtx;
+  }
+
+  public void setDynPartCtx(DynamicPartitionCtx dynPartCtx) {
+    this.dynPartCtx = dynPartCtx;
+  }
+
+  /**
+   * @return the listBucketingCtx
+   */
+  public ListBucketingCtx getListBucketingCtx() {
+    return listBucketingCtx;
+  }
+
+  /**
+   * @param listBucketingCtx the listBucketingCtx to set
+   */
+  public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) {
+    this.listBucketingCtx = listBucketingCtx;
+  }
+
+  /**
+   * @return the isListBucketingAlterTableConcatenate
+   */
+  public boolean isListBucketingAlterTableConcatenate() {
+    return isListBucketingAlterTableConcatenate;
+  }
+
+  public Class<? extends InputFormat> getSourceTableInputFormat() {
+    return srcTblInputFormat;
+  }
+
+  /**
+   * @return canonical name of the source table input format, or {@code null} when no
+   *         format has been set (consistent with {@link #getMergeLevel()})
+   */
+  @Explain(displayName = "input format")
+  public String getStringifiedInputFormat() {
+    if (srcTblInputFormat == null) {
+      return null;
+    }
+    return srcTblInputFormat.getCanonicalName();
+  }
+
+  /**
+   * @return "stripe" for ORC, "block" for RCFile, {@code null} otherwise
+   */
+  @Explain(displayName = "merge level")
+  public String getMergeLevel() {
+    if (srcTblInputFormat != null) {
+      if (srcTblInputFormat.equals(OrcInputFormat.class)) {
+        return "stripe";
+      } else if (srcTblInputFormat.equals(RCFileInputFormat.class)) {
+        return "block";
+      }
+    }
+    return null;
+  }
+
+  public void setSourceTableInputFormat(Class<? extends InputFormat> srcTblInputFormat) {
+    this.srcTblInputFormat = srcTblInputFormat;
+  }
+
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Key for OrcFileMergeMapper task. Contains orc file related information that
+ * should match before merging two orc files.
+ */
+public class OrcFileKeyWrapper implements
WritableComparable<OrcFileKeyWrapper> {
+
+ protected Path inputPath;
+ protected CompressionKind compression;
+ protected long compressBufferSize;
+ protected List<OrcProto.Type> types;
+ protected int rowIndexStride;
+ protected List<Integer> versionList;
+
+ public List<Integer> getVersionList() {
+ return versionList;
+ }
+
+ public void setVersionList(List<Integer> versionList) {
+ this.versionList = versionList;
+ }
+
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ public void setRowIndexStride(int rowIndexStride) {
+ this.rowIndexStride = rowIndexStride;
+ }
+
+ public long getCompressBufferSize() {
+ return compressBufferSize;
+ }
+
+ public void setCompressBufferSize(long compressBufferSize) {
+ this.compressBufferSize = compressBufferSize;
+ }
+
+ public CompressionKind getCompression() {
+ return compression;
+ }
+
+ public void setCompression(CompressionKind compression) {
+ this.compression = compression;
+ }
+
+ public List<OrcProto.Type> getTypes() {
+ return types;
+ }
+
+ public void setTypes(List<OrcProto.Type> types) {
+ this.types = types;
+ }
+
+ public Path getInputPath() {
+ return inputPath;
+ }
+
+ public void setInputPath(Path inputPath) {
+ this.inputPath = inputPath;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ throw new RuntimeException("Not supported.");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new RuntimeException("Not supported.");
+ }
+
+ @Override
+ public int compareTo(OrcFileKeyWrapper o) {
+ return inputPath.compareTo(o.inputPath);
+ }
+
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeMapper;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map task fast merging of ORC files. Each call to {@link #map} receives one stripe of
+ * an input file and appends its raw bytes to the output ORC file without decoding it.
+ * The first file seen defines the ORC settings every later file must match.
+ */
+public class OrcFileMergeMapper extends MergeMapper implements
+    Mapper<Object, OrcFileValueWrapper, Object, Object> {
+
+  // These parameters must match for all orc files involved in merging
+  CompressionKind compression = null;
+  long compressBuffSize = 0;
+  List<Integer> version;
+  int columnCount = 0;
+  int rowIndexStride = 0;
+
+  Writer outWriter;
+  private byte[] buffer;
+  // input file of the most recently processed stripe
+  Path prevPath;
+  private Reader reader;
+  // stream on prevPath, shared by all stripes of that file
+  private FSDataInputStream fdis;
+  public final static Log LOG = LogFactory.getLog("OrcFileMergeMapper");
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+
+    outWriter = null;
+    buffer = null;
+    prevPath = null;
+    reader = null;
+    fdis = null;
+  }
+
+  /**
+   * Appends one stripe to the merged output file.
+   *
+   * @param key an {@link OrcFileKeyWrapper}, possibly wrapped in a CombineHiveKey
+   * @param value stripe to append
+   * @throws IOException when the input file is not compatible with the first file or
+   *         any step fails; the mapper is marked as failed and closed before rethrowing
+   */
+  @Override
+  public void map(Object key, OrcFileValueWrapper value,
+      OutputCollector<Object, Object> output, Reporter reporter) throws IOException {
+    try {
+
+      OrcFileKeyWrapper k = null;
+      if (key instanceof CombineHiveKey) {
+        k = (OrcFileKeyWrapper) ((CombineHiveKey) key).getKey();
+      } else {
+        k = (OrcFileKeyWrapper) key;
+      }
+      Path inputPath = k.getInputPath();
+
+      fixTmpPathAlterTable(inputPath.getParent());
+
+      // First stripe of a new input file (prevPath is null for the very first one):
+      // release the stream of the previous file and remember the new path so that the
+      // reader is created once per file rather than once per stripe.
+      if (!inputPath.equals(prevPath)) {
+        closeInputStream();
+        reader = OrcFile.createReader(fs, inputPath);
+        prevPath = inputPath;
+      }
+
+      // store the orc configuration from the first file. All other files should
+      // match this configuration before merging
+      if (outWriter == null) {
+        compression = k.getCompression();
+        compressBuffSize = k.getCompressBufferSize();
+        version = k.getVersionList();
+        columnCount = k.getTypes().get(0).getSubtypesCount();
+        rowIndexStride = k.getRowIndexStride();
+
+        // block size and stripe size will be from config
+        outWriter = OrcFile.createWriter(outPath,
+            OrcFile.writerOptions(jc).compress(compression)
+                .inspector(reader.getObjectInspector()));
+      }
+
+      // check compatibility with subsequent files
+      if ((k.getTypes().get(0).getSubtypesCount() != columnCount)) {
+        throw new IOException("ORCFileMerge failed because the input files are not compatible."
+            + " Column counts does not match.");
+      }
+
+      if (!k.getCompression().equals(compression)) {
+        throw new IOException("ORCFileMerge failed because the input files are not compatible."
+            + " Compression codec does not match.");
+      }
+
+      if (k.getCompressBufferSize() != compressBuffSize) {
+        throw new IOException("ORCFileMerge failed because the input files are not compatible."
+            + " Compression buffer size does not match.");
+      }
+
+      if (!k.getVersionList().equals(version)) {
+        throw new IOException("ORCFileMerge failed because the input files are not compatible."
+            + " Version does not match.");
+      }
+
+      if (k.getRowIndexStride() != rowIndexStride) {
+        throw new IOException("ORCFileMerge failed because the input files are not compatible."
+            + " Row index stride does not match.");
+      }
+
+      // open the current file once; later stripes of the same file reuse the stream
+      if (fdis == null) {
+        fdis = fs.open(inputPath);
+      }
+
+      // initialize buffer to read the entire stripe
+      int stripeLength = (int) value.getStripeInformation().getLength();
+      buffer = new byte[stripeLength];
+      fdis.readFully(value.getStripeInformation().getOffset(), buffer, 0, stripeLength);
+
+      // append the stripe buffer to the new ORC file
+      ((WriterImpl) outWriter).appendStripe(buffer, value.getStripeInformation(),
+          value.getStripeStatistics());
+
+      LOG.info("Merged stripe from file " + inputPath + " [ offset : "
+          + value.getStripeInformation().getOffset() + " length: "
+          + value.getStripeInformation().getLength() + " ]");
+
+      // add user metadata to footer in case of any
+      if (value.isLastStripeInFile()) {
+        ((WriterImpl) outWriter).appendUserMetadata(value.getUserMetadata());
+      }
+    } catch (Throwable e) {
+      this.exception = true;
+      try {
+        close();
+      } catch (IOException closeFailure) {
+        // do not let a cleanup problem hide the failure that brought us here
+        LOG.warn("Failed to close mapper after merge failure", closeFailure);
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes the input stream and, if an output file was started, the writer followed by
+   * the parent mapper. Nothing else is done when no writer was ever created.
+   */
+  @Override
+  public void close() throws IOException {
+    closeInputStream();
+
+    // close writer
+    if (outWriter == null) {
+      return;
+    }
+
+    outWriter.close();
+    outWriter = null;
+
+    super.close();
+  }
+
+  /** Closes the stream on the current input file, if one is open. */
+  private void closeInputStream() throws IOException {
+    if (fdis != null) {
+      try {
+        fdis.close();
+      } finally {
+        fdis = null;
+      }
+    }
+  }
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Input format of the ORC stripe level merge. It hands out an
+ * {@link OrcFileStripeMergeRecordReader} for the file split it is given.
+ */
+public class OrcFileStripeMergeInputFormat extends MergeInputFormat {
+
+  /**
+   * Reports the split as the task status and creates the stripe reader for it.
+   *
+   * @param split the split to read; it is cast to a FileSplit
+   * @param job the job configuration passed on to the record reader
+   * @param reporter receives the textual form of the split as status
+   * @return a record reader over the stripes of the split
+   * @throws IOException if the record reader cannot be created
+   */
+  @Override
+  public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    final String splitDescription = split.toString();
+    reporter.setStatus(splitDescription);
+
+    final FileSplit fileSplit = (FileSplit) split;
+    return new OrcFileStripeMergeRecordReader(job, fileSplit);
+  }
+
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Record reader of the ORC stripe level merge. Every record describes one
+ * stripe of the file whose start offset lies inside the split: the key carries
+ * the file level properties (path, compression, compression buffer size,
+ * version, row index stride and types) and the value carries the stripe
+ * information together with the statistics of that stripe.
+ */
+public class OrcFileStripeMergeRecordReader implements
+    RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> {
+
+  private final Reader reader;
+  private final Path path;
+  protected Iterator<StripeInformation> iter;
+  protected List<OrcProto.StripeStatistics> stripeStatistics;
+  // position, within the file, of the stripe that iter returns next
+  private int stripeIdx;
+  // byte range [start, end) of the split handled by this reader
+  private long start;
+  private long end;
+
+  /**
+   * Opens the ORC file of the split and positions the reader before the first
+   * stripe of the file.
+   *
+   * @param conf configuration used to resolve the file system and reader options
+   * @param split the part of the file this reader is responsible for
+   * @throws IOException if the file system or the ORC reader cannot be obtained
+   */
+  public OrcFileStripeMergeRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    path = split.getPath();
+    start = split.getStart();
+    end = start + split.getLength();
+    FileSystem fs = path.getFileSystem(conf);
+    this.reader = OrcFile.createReader(path,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    this.iter = reader.getStripes().iterator();
+    this.stripeIdx = 0;
+    this.stripeStatistics = ((ReaderImpl) reader).getOrcProtoStripeStatistics();
+  }
+
+  public Class<?> getKeyClass() {
+    return OrcFileKeyWrapper.class;
+  }
+
+  public Class<?> getValueClass() {
+    return OrcFileValueWrapper.class;
+  }
+
+  public OrcFileKeyWrapper createKey() {
+    return new OrcFileKeyWrapper();
+  }
+
+  public OrcFileValueWrapper createValue() {
+    return new OrcFileValueWrapper();
+  }
+
+  @Override
+  public boolean next(OrcFileKeyWrapper key, OrcFileValueWrapper value)
+      throws IOException {
+    return nextStripe(key, value);
+  }
+
+  /**
+   * Advances to the next stripe that starts inside the split and fills the
+   * wrappers with its description.
+   *
+   * @param keyWrapper receives the file level properties
+   * @param valueWrapper receives the stripe information and stripe statistics;
+   *          for the last stripe of the file it is also flagged as such and
+   *          given the user metadata of the file
+   * @return true if a stripe was found, false when the file has no more stripes
+   * @throws IOException declared for subclasses; not thrown by the code here
+   */
+  protected boolean nextStripe(OrcFileKeyWrapper keyWrapper,
+      OrcFileValueWrapper valueWrapper) throws IOException {
+    while (iter.hasNext()) {
+      StripeInformation si = iter.next();
+
+      // The statistics list is indexed by the position of the stripe in the
+      // file, so the index has to advance for skipped stripes as well.
+      // Otherwise a split that does not begin at the first stripe would pair
+      // its stripes with the statistics of earlier stripes.
+      int currentIdx = stripeIdx++;
+
+      // if stripe offset is outside the split boundary then ignore the current
+      // stripe as it will be handled by some other mapper.
+      if (si.getOffset() < start || si.getOffset() >= end) {
+        continue;
+      }
+
+      valueWrapper.setStripeStatistics(stripeStatistics.get(currentIdx));
+      valueWrapper.setStripeInformation(si);
+      if (!iter.hasNext()) {
+        valueWrapper.setLastStripeInFile(true);
+        valueWrapper.setUserMetadata(((ReaderImpl) reader).getOrcProtoUserMetadata());
+      } else {
+        // NOTE(review): the value object may be reused by the caller across
+        // records, so clear what an earlier record may have left behind.
+        valueWrapper.setLastStripeInFile(false);
+        valueWrapper.setUserMetadata(null);
+      }
+      keyWrapper.setInputPath(path);
+      keyWrapper.setCompression(reader.getCompression());
+      keyWrapper.setCompressBufferSize(reader.getCompressionSize());
+      keyWrapper.setVersionList(((ReaderImpl) reader).getFileMetaInfo().versionList);
+      keyWrapper.setRowIndexStride(reader.getRowIndexStride());
+      keyWrapper.setTypes(reader.getTypes());
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Progress is not tracked by this reader.
+   * @return always 0.0
+   */
+  public float getProgress() throws IOException {
+    return 0.0f;
+  }
+
+  /**
+   * The position is not tracked by this reader.
+   * @return always 0
+   */
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  // intentionally a no-op: stripes are always iterated from the file start
+  protected void seek(long pos) throws IOException {
+  }
+
+  public long getStart() {
+    return 0;
+  }
+
+  // nothing to release here: this class keeps no open stream of its own
+  public void close() throws IOException {
+  }
+
+}
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java?rev=1614793&view=auto
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
(added)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Value for OrcFileMergeMapper. Contains stripe related information for the
+ * current orc file that is being merged.
+ */
+public class OrcFileValueWrapper implements
+    WritableComparable<OrcFileValueWrapper> {
+
+  // description of the stripe carried by this value
+  protected StripeInformation stripeInformation;
+  protected OrcProto.StripeStatistics stripeStatistics;
+  // user metadata of the file, handed over together with its last stripe
+  protected List<OrcProto.UserMetadataItem> userMetadata;
+  protected boolean lastStripeInFile;
+
+  public List<OrcProto.UserMetadataItem> getUserMetadata() {
+    return this.userMetadata;
+  }
+
+  public void setUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
+    this.userMetadata = userMetadata;
+  }
+
+  public boolean isLastStripeInFile() {
+    return this.lastStripeInFile;
+  }
+
+  public void setLastStripeInFile(boolean lastStripeInFile) {
+    this.lastStripeInFile = lastStripeInFile;
+  }
+
+  public OrcProto.StripeStatistics getStripeStatistics() {
+    return this.stripeStatistics;
+  }
+
+  public void setStripeStatistics(OrcProto.StripeStatistics stripeStatistics) {
+    this.stripeStatistics = stripeStatistics;
+  }
+
+  public StripeInformation getStripeInformation() {
+    return this.stripeInformation;
+  }
+
+  public void setStripeInformation(StripeInformation stripeInformation) {
+    this.stripeInformation = stripeInformation;
+  }
+
+  /** Serialization is not implemented for this value. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  /** Deserialization is not implemented for this value. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  /**
+   * Orders values by the start offset of their stripe.
+   *
+   * @param o the value to compare with
+   * @return -1, 0 or 1 when this stripe starts before, at or after the other
+   */
+  @Override
+  public int compareTo(OrcFileValueWrapper o) {
+    final long ownOffset = stripeInformation.getOffset();
+    final long otherOffset = o.getStripeInformation().getOffset();
+    if (ownOffset == otherOffset) {
+      return 0;
+    }
+    return ownOffset < otherOffset ? -1 : 1;
+  }
+
+}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Wed
Jul 30 23:50:11 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -60,6 +61,7 @@ final class ReaderImpl implements Reader
private final ObjectInspector inspector;
private long deserializedSize = -1;
private final Configuration conf;
+ private final List<Integer> versionList;
//serialized footer - Keeping this around for use by getFileMetaInfo()
// will help avoid cpu cycles spend in deserializing at cost of increased
@@ -306,6 +308,7 @@ final class ReaderImpl implements Reader
this.metadata = rInfo.metadata;
this.footer = rInfo.footer;
this.inspector = rInfo.inspector;
+ this.versionList = footerMetaData.versionList;
}
@@ -387,7 +390,8 @@ final class ReaderImpl implements Reader
ps.getCompression().toString(),
(int) ps.getCompressionBlockSize(),
(int) ps.getMetadataLength(),
- buffer
+ buffer,
+ ps.getVersionList()
);
}
@@ -446,18 +450,26 @@ final class ReaderImpl implements Reader
final int bufferSize;
final int metadataSize;
final ByteBuffer footerBuffer;
+ final List<Integer> versionList;
+
+ FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+ ByteBuffer footerBuffer) {
+ this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+ }
+
FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer){
+ ByteBuffer footerBuffer, List<Integer> versionList){
this.compressionType = compressionType;
this.bufferSize = bufferSize;
this.metadataSize = metadataSize;
this.footerBuffer = footerBuffer;
+ this.versionList = versionList;
}
}
public FileMetaInfo getFileMetaInfo(){
return new FileMetaInfo(compressionKind.toString(), bufferSize,
- metadataSize, footerByteBuffer);
+ metadataSize, footerByteBuffer, versionList);
}
@@ -629,4 +641,11 @@ final class ReaderImpl implements Reader
return new Metadata(metadata);
}
+  /**
+   * @return the stripe statistics list of the file metadata, as protobuf objects
+   */
+  List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+    final List<OrcProto.StripeStatistics> perStripe = metadata.getStripeStatsList();
+    return perStripe;
+  }
+
+  /**
+   * @return the metadata list stored in the file footer, as protobuf objects
+   */
+  public List<UserMetadataItem> getOrcProtoUserMetadata() {
+    final List<UserMetadataItem> items = footer.getMetadataList();
+    return items;
+  }
}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed
Jul 30 23:50:11 2014
@@ -28,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,8 +41,8 @@ import org.apache.hadoop.hive.ql.io.orc.
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -73,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
@@ -2218,4 +2217,78 @@ class WriterImpl implements Writer, Memo
}
return rawWriter.getPos();
}
+
+  /**
+   * Appends a stripe that fills the whole given array; shorthand for the
+   * offset/length variant with offset 0 and the array length.
+   */
+  void appendStripe(byte[] stripe, StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    final int wholeLength = stripe.length;
+    appendStripe(stripe, 0, wholeLength, stripeInfo, stripeStatistics);
+  }
+
+  /**
+   * Appends an already encoded stripe to the file as raw bytes and records
+   * its statistics and directory entry.
+   *
+   * @param stripe buffer holding the raw bytes of the stripe
+   * @param offset position in {@code stripe} of the first byte to write
+   * @param length number of bytes to write
+   * @param stripeInfo supplies the index, data and footer lengths of the stripe
+   * @param stripeStatistics statistics of the stripe; the value count of
+   *          column 0 is used as the number of rows in the stripe
+   * @throws IOException if writing to the underlying stream fails
+   */
+  void appendStripe(byte[] stripe, int offset, int length,
+      StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    getStream();
+    long start = rawWriter.getPos();
+
+    long stripeLen = length;
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      // the stripe begins after the padding
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+
+    // write only the requested slice; writing the whole array would ignore
+    // offset and length and disagree with the stripe length used above
+    rawWriter.write(stripe, offset, length);
+    rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
+    rowCount += rowsInStripe;
+
+    // since we have already written the stripe, just update stripe statistics
+    treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
+
+    // update file level statistics
+    updateFileStatistics(stripeStatistics);
+
+    // update stripe information
+    OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
+        .newBuilder()
+        .setOffset(start)
+        .setNumberOfRows(rowsInStripe)
+        .setIndexLength(stripeInfo.getIndexLength())
+        .setDataLength(stripeInfo.getDataLength())
+        .setFooterLength(stripeInfo.getFooterLength())
+        .build();
+    stripes.add(dirEntry);
+
+    // reset it after writing the stripe
+    rowsInStripe = 0;
+  }
+
+  /**
+   * Folds the column statistics of an appended stripe into the file level
+   * statistics kept by the tree writers.
+   *
+   * @param stripeStatistics statistics of the stripe that was just appended
+   */
+  private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) {
+    List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
+    mergeFileStatistics(treeWriter, cs, 0);
+  }
+
+  /**
+   * Merges the statistics of column {@code columnId} into the given writer and
+   * then visits its children in order, so that nested columns are covered too.
+   * Relies on ORC numbering columns in pre-order of the type tree; for a flat
+   * schema this pairs child i of the root with statistics i + 1.
+   *
+   * @param writer writer whose file statistics are updated
+   * @param cs column statistics of the stripe, indexed by column id
+   * @param columnId column id of {@code writer}
+   * @return the first column id after the subtree of {@code writer}
+   */
+  private int mergeFileStatistics(TreeWriter writer,
+      List<OrcProto.ColumnStatistics> cs, int columnId) {
+    writer.fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(columnId)));
+    int nextColumnId = columnId + 1;
+    TreeWriter[] children = writer.getChildrenWriters();
+    if (children != null) {
+      for (TreeWriter child : children) {
+        nextColumnId = mergeFileStatistics(child, cs, nextColumnId);
+      }
+    }
+    return nextColumnId;
+  }
+
+  /**
+   * Copies the given metadata items into the user metadata of this writer,
+   * keyed by item name. A null list is ignored.
+   *
+   * @param userMetadata items to add, may be null
+   */
+  void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+    if (userMetadata == null) {
+      return;
+    }
+    for (UserMetadataItem entry : userMetadata) {
+      this.userMetadata.put(entry.getName(), entry.getValue());
+    }
+  }
}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
Wed Jul 30 23:50:11 2014
@@ -19,22 +19,22 @@
package org.apache.hadoop.hive.ql.io.rcfile.merge;
import java.io.IOException;
-import org.apache.hadoop.mapred.FileInputFormat;
+
+import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-@SuppressWarnings({ "deprecation", "unchecked" })
-public class RCFileBlockMergeInputFormat extends FileInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeInputFormat {
@Override
- public RecordReader getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
+ public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
reporter.setStatus(split.toString());
-
return new RCFileBlockMergeRecordReader(job, (FileSplit) split);
}