Author: prasanthj
Date: Wed Jul 30 23:50:11 2014
New Revision: 1614793

URL: http://svn.apache.org/r1614793
Log:
HIVE-7509: Fast stripe level merging for ORC (Prasanth J, reviewed by Gunther 
Hagleitner)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
    hive/trunk/ql/src/test/queries/clientnegative/orc_merge1.q
    hive/trunk/ql/src/test/queries/clientnegative/orc_merge2.q
    hive/trunk/ql/src/test/queries/clientnegative/orc_merge3.q
    hive/trunk/ql/src/test/queries/clientnegative/orc_merge4.q
    hive/trunk/ql/src/test/queries/clientnegative/orc_merge5.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_merge_2_orc.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_merge_orc.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_merge_stats_orc.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge2.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge3.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge4.q
    hive/trunk/ql/src/test/results/clientnegative/orc_merge1.q.out
    hive/trunk/ql/src/test/results/clientnegative/orc_merge2.q.out
    hive/trunk/ql/src/test/results/clientnegative/orc_merge3.q.out
    hive/trunk/ql/src/test/results/clientnegative/orc_merge4.q.out
    hive/trunk/ql/src/test/results/clientnegative/orc_merge5.q.out
    hive/trunk/ql/src/test/results/clientpositive/alter_merge_2_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/alter_merge_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/alter_merge_stats_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge2.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge4.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_2_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_orc.q.out
    
hive/trunk/ql/src/test/results/clientpositive/tez/alter_merge_stats_orc.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge2.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge4.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/itests/qtest/testconfiguration.properties
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
    
hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_14.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_16.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_9.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed 
Jul 30 23:50:11 2014
@@ -783,6 +783,14 @@ public class HiveConf extends Configurat
     HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
     HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
         
"org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat", ""),
+    HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
+        "When hive.merge.mapfiles or hive.merge.mapredfiles is enabled while 
writing a\n" +
+        " table with ORC file format, enabling this config will do stripe 
level fast merge\n" +
+        " for small ORC files. Note that enabling this config will not honor 
padding tolerance\n" +
+        " config (hive.exec.orc.block.padding.tolerance)."),
+    HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
+        "org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat", 
+       "Input file format to use for ORC stripe level merging (for internal 
use only)"),
     HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
         "hive.merge.current.job.has.dynamic.partitions", false, ""),
 

Modified: hive/trunk/itests/qtest/testconfiguration.properties
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/qtest/testconfiguration.properties?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/itests/qtest/testconfiguration.properties (original)
+++ hive/trunk/itests/qtest/testconfiguration.properties Wed Jul 30 23:50:11 
2014
@@ -1,5 +1,5 @@
 
minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q,empty_dir_in_table.q,temp_table_external.q
 
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
 
minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
-minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_t
 
able.q,vectorized_ptf.q,optimize_nullscan.q,vector_cast_constant.q,vector_string_concat.q
+minitez.query.files.shared=orc_merge1.q,orc_merge2.q,orc_merge3.q,orc_merge4.q,alter_merge_orc.q,alter_merge_2_orc.q,alter_merge_stats_orc.q,cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transf
 
orm_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q,temp_table.q,vectorized_ptf.q,optimize_nullscan.q,vector_cast_constant.q,vector_string_concat.q
 
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,
 
exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwr
 
ite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Jul 
30 23:50:11 2014
@@ -88,9 +88,9 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
@@ -550,12 +550,13 @@ public class DDLTask extends Task<DDLWor
       throws HiveException {
     // merge work only needs input and output.
     MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
-        mergeFilesDesc.getOutputDir());
+        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
     mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
     mergeWork.resolveConcatenateMerge(db.getConf());
     mergeWork.setMapperCannotSpanPartns(true);
+    mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
     DriverContext driverCxt = new DriverContext();
-    BlockMergeTask taskExec = new BlockMergeTask();
+    MergeTask taskExec = new MergeTask();
     taskExec.initialize(db.getConf(), null, driverCxt);
     taskExec.setWork(mergeWork);
     taskExec.setQueryPlan(this.getQueryPlan());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Jul 
30 23:50:11 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -294,7 +294,7 @@ public class MoveTask extends Task<MoveW
           while (task.getParentTasks() != null && task.getParentTasks().size() 
== 1) {
             task = (Task)task.getParentTasks().get(0);
             // If it was a merge task or a local map reduce task, nothing can 
be inferred
-            if (task instanceof BlockMergeTask || task instanceof 
MapredLocalTask) {
+            if (task instanceof MergeTask || task instanceof MapredLocalTask) {
               break;
             }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Wed 
Jul 30 23:50:11 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -93,7 +93,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, 
StatsNoJobTask.class));
     taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, 
ColumnStatsTask.class));
     taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
-        BlockMergeTask.class));
+        MergeTask.class));
     taskvec.add(new 
TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
         DependencyCollectionTask.class));
     taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed 
Jul 30 23:50:11 2014
@@ -120,7 +120,8 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
@@ -346,7 +347,8 @@ public final class Utilities {
         if(MAP_PLAN_NAME.equals(name)){
           if 
(ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
             gWork = deserializePlan(in, MapWork.class, conf);
-          } else 
if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+          } else 
if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
+              
OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, MergeWork.class, conf);
           } else 
if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, ColumnTruncateWork.class, conf);

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java 
(added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class MergeInputFormat extends FileInputFormat {
+
+  @Override
+  public abstract RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException;
+
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java 
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+
+public class MergeMapper extends MapReduceBase {
+  protected JobConf jc;
+  protected Path finalPath;
+  protected FileSystem fs;
+  protected boolean exception = false;
+  protected boolean autoDelete = false;
+  protected Path outPath;
+  protected boolean hasDynamicPartitions = false;
+  protected boolean isListBucketingDML = false;
+  protected boolean isListBucketingAlterTableConcatenate = false;
+  //used as depth for dir-calculation and if it is list bucketing case.
+  protected int listBucketingDepth;
+  protected boolean tmpPathFixedConcatenate = false;
+  protected boolean tmpPathFixed = false;
+  protected Path tmpPath;
+  protected Path taskTmpPath;
+  protected Path dpPath;
+
+  public final static Log LOG = LogFactory.getLog("MergeMapper");
+
+  @Override
+  public void configure(JobConf job) {
+    jc = job;
+    hasDynamicPartitions = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+    isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING);
+    listBucketingDepth = HiveConf.getIntVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
+
+    Path specPath = MergeOutputFormat.getMergeOutputPath(job);
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+    updatePaths(tmpPath, taskTmpPath);
+    try {
+      fs = specPath.getFileSystem(job);
+      autoDelete = fs.deleteOnExit(outPath);
+    } catch (IOException e) {
+      this.exception = true;
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void updatePaths(Path tmpPath, Path taskTmpPath) {
+    String taskId = Utilities.getTaskId(jc);
+    this.tmpPath = tmpPath;
+    this.taskTmpPath = taskTmpPath;
+    finalPath = new Path(tmpPath, taskId);
+    outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
+  }
+
+
+  /**
+   * Validates that each input path belongs to the same partition since each 
mapper merges the input
+   * to a single output directory
+   * @param inputPath
+   * @throws HiveException
+   */
+  protected void checkPartitionsMatch(Path inputPath) throws HiveException {
+    if (!dpPath.equals(inputPath)) {
+      // Temp partition input path does not match the existing temp path
+      String msg = "Multiple partitions for one block merge mapper: " + dpPath 
+ " NOT EQUAL TO "
+          + inputPath;
+      LOG.error(msg);
+      throw new HiveException(msg);
+    }
+  }
+
+  /**
+   * Fixes tmpPath to point to the correct partition. Before this is called, 
tmpPath will default to
+   * the root tmp table dir. fixTmpPath(..) works for DP + LB + multiple skewed 
values + merge.
+   * Reason: 1. fixTmpPath(..) compares the depths of inputPath and tmpPath, finds the path 
difference and puts it
+   * into newPath. Then it adds newPath to the existing this.tmpPath and 
this.taskTmpPath. 2. The path
+   * difference between inputPath and tmpPath can be DP or DP+LB. It will 
automatically handle it.
+   * 3. For example, if inputpath is 
<prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+   * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME tmppath is <prefix>/_tmp.-ext-10000 
newpath will be
+   * 
hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME 
Then,
+   * this.tmpPath and this.taskTmpPath will be updated correctly. We have 
list_bucket_dml_6.q cover
+   * this case: DP + LB + multiple skewed values + merge.
+   * @param inputPath
+   * @throws HiveException
+   * @throws IOException
+   */
+  protected void fixTmpPath(Path inputPath) throws HiveException, IOException {
+    dpPath = inputPath;
+    Path newPath = new Path(".");
+    int inputDepth = inputPath.depth();
+    int tmpDepth = tmpPath.depth();
+
+    // Build the path from bottom up
+    while (inputPath != null && inputDepth > tmpDepth) {
+      newPath = new Path(inputPath.getName(), newPath);
+      inputDepth--;
+      inputPath = inputPath.getParent();
+    }
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
+  }
+
+  /**
+   * Fixes tmpPath to point to the correct list bucketing sub-directories. 
Before this is called,
+   * tmpPath will default to the root tmp table dir. Reasons for adding a new method 
instead of changing
+   * fixTmpPath(): Reason 1: the logic is slightly different. fixTmpPath(..) needs 
2 variables in order
+   * to decide the path delta, which is held in variable newPath: 1. inputPath.depth() 
2. tmpPath.depth().
+   * fixTmpPathConcatenate needs 2 variables too, but one of them is different 
from fixTmpPath(..): 1.
+   * inputPath.depth() 2. listBucketingDepth. Reason 2: less risk. The existing 
logic is somewhat
+   * non-trivial around map() and fixTmpPath(). In order to ensure minimum impact 
on the existing flow, we
+   * try to avoid changing the existing code/flow and instead add new code for the new 
feature.
+   * @param inputPath
+   * @throws HiveException
+   * @throws IOException
+   */
+  protected void fixTmpPathConcatenate(Path inputPath) throws HiveException, 
IOException {
+    dpPath = inputPath;
+    Path newPath = new Path(".");
+
+    int depth = listBucketingDepth;
+    // Build the path from bottom up. pick up list bucketing subdirectories
+    while ((inputPath != null) && (depth > 0)) {
+      newPath = new Path(inputPath.getName(), newPath);
+      inputPath = inputPath.getParent();
+      depth--;
+    }
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!exception) {
+      FileStatus fss = fs.getFileStatus(outPath);
+      LOG.info("renamed path " + outPath + " to " + finalPath + " . File size 
is " + fss.getLen());
+      if (!fs.rename(outPath, finalPath)) {
+        throw new IOException("Unable to rename output to " + finalPath);
+      }
+    } else {
+      if (!autoDelete) {
+        fs.delete(outPath, true);
+      }
+    }
+  }
+
+  protected void fixTmpPathAlterTable(Path path) throws IOException, 
HiveException {
+    /**
+     * 1. boolean isListBucketingAlterTableConcatenate will be true only if it 
is alter table ...
+     * concatenate on stored-as-dir, so it will handle list bucketing alter 
table merge in the if
+     * clause with the help of fixTmpPathConcatenate. 2. If it is DML,
+     * isListBucketingAlterTableConcatenate will be false so that it will be 
handled by the else
+     * clause. In this else clause, we have another if check. 2.1 The if check 
will make sure that for DP or
+     * LB, we will fix the path with the help of fixTmpPath(..), since both have 
sub-directories. It
+     * includes SP + LB. 2.2 For only SP without LB, we don't fix the path.
+     */
+
+    // Fix temp path for alter table ... concatenate
+    if (isListBucketingAlterTableConcatenate) {
+      if (this.tmpPathFixedConcatenate) {
+        checkPartitionsMatch(path);
+      } else {
+        fixTmpPathConcatenate(path);
+        tmpPathFixedConcatenate = true;
+      }
+    } else {
+      if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+        if (tmpPathFixed) {
+          checkPartitionsMatch(path);
+        } else {
+          // We haven't fixed the TMP path for this mapper yet
+          fixTmpPath(path);
+          tmpPathFixed = true;
+        }
+      }
+    }
+  }
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
 (added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
 Wed Jul 30 23:50:11 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+public abstract class MergeOutputFormat extends
+    FileOutputFormat<Object, Object> {
+
+  public static void setMergeOutputPath(JobConf job, Path path) {
+    job.set("hive.merge.output.dir", path.toString());
+  }
+
+  public static Path getMergeOutputPath(JobConf conf) {
+    String name = conf.get("hive.merge.output.dir");
+    return name == null ? null: new Path(name);
+  }
+
+  public abstract RecordWriter<Object, Object> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException;
+}
\ No newline at end of file

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java 
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.LogManager;
+
+public class MergeTask extends Task<MergeWork> implements Serializable,
+    HadoopJobExecHook {
+
+  private static final long serialVersionUID = 1L;
+
+  public static String BACKUP_PREFIX = "_backup.";
+
+  protected transient JobConf job;
+  protected HadoopJobExecHelper jobExecHelper;
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan,
+      DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    job = new JobConf(conf, MergeTask.class);
+    jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
+  }
+
+  @Override
+  public boolean requireLock() {
+    return true;
+  }
+
+  boolean success = true;
+
+  @Override
+  /**
+   * start a new map-reduce job to do the merge, almost the same as ExecDriver.
+   */
+  public int execute(DriverContext driverContext) {
+    HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT,
+        CombineHiveInputFormat.class.getName());
+    success = true;
+    ShimLoader.getHadoopShims().prepareJobOutput(job);
+    job.setOutputFormat(HiveOutputFormatImpl.class);
+    Class<? extends Mapper> mapperClass = 
work.getMapperClass(work.getSourceTableInputFormat());
+    LOG.info("Using " + mapperClass.getCanonicalName() + " mapper class.");
+    job.setMapperClass(mapperClass);
+
+    Context ctx = driverContext.getCtx();
+    boolean ctxCreated = false;
+    try {
+      if (ctx == null) {
+        ctx = new Context(job);
+        ctxCreated = true;
+      }
+    }catch (IOException e) {
+      e.printStackTrace();
+      console.printError("Error launching map-reduce job", "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      return 5;
+    }
+
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    if(work.getNumMapTasks() != null) {
+      job.setNumMapTasks(work.getNumMapTasks());
+    }
+
+    // zero reducers
+    job.setNumReduceTasks(0);
+
+    if (work.getMinSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work
+          .getMinSplitSize().longValue());
+    }
+
+    if (work.getInputformat() != null) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work
+          .getInputformat());
+    }
+
+    String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
+    if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
+      inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+    }
+
+    LOG.info("Using " + inpFormat);
+
+    try {
+      job.setInputFormat((Class<? extends InputFormat>) (Class
+          .forName(inpFormat)));
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
+    Path outputPath = this.work.getOutputDir();
+    Path tempOutPath = Utilities.toTempPath(outputPath);
+    try {
+      FileSystem fs = tempOutPath.getFileSystem(job);
+      if (!fs.exists(tempOutPath)) {
+        fs.mkdirs(tempOutPath);
+      }
+    } catch (IOException e) {
+      console.printError("Can't make path " + outputPath + " : " + 
e.getMessage());
+      return 6;
+    }
+
+    MergeOutputFormat.setMergeOutputPath(job, outputPath);
+
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    HiveConf.setBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
+        work.hasDynamicPartitions());
+
+    HiveConf.setBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING,
+        work.isListBucketingAlterTableConcatenate());
+
+    HiveConf.setIntVar(
+        job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH,
+        ((work.getListBucketingCtx() == null) ? 0 : work.getListBucketingCtx()
+            .calculateListBucketingLevel()));
+
+    int returnVal = 0;
+    RunningJob rj = null;
+    boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
+        HiveConf.ConfVars.HADOOPJOBNAME));
+
+    String jobName = null;
+    if (noName && this.getQueryPlan() != null) {
+      int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+      jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(),
+          maxlen - 6);
+    }
+
+    if (noName) {
+      // This is for a special case to ensure unit tests pass
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME,
+          jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+    }
+
+    try {
+      addInputPaths(job, work);
+
+      Utilities.setMapWork(job, work, ctx.getMRTmpPath(), true);
+
+      // remove the pwd from conf file so that job tracker doesn't show this
+      // logs
+      String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
+      if (pwd != null) {
+        HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
+      }
+      JobClient jc = new JobClient(job);
+
+      String addedJars = Utilities.getResourceFiles(job, 
SessionState.ResourceType.JAR);
+      if (!addedJars.isEmpty()) {
+        job.set("tmpjars", addedJars);
+      }
+
+      // make this client wait if job trcker is not behaving well.
+      Throttle.checkJobTracker(job, LOG);
+
+      // Finally SUBMIT the JOB!
+      rj = jc.submitJob(job);
+
+      returnVal = jobExecHelper.progress(rj, jc, null);
+      success = (returnVal == 0);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
+      if (rj != null) {
+        mesg = "Ended Job = " + rj.getJobID() + mesg;
+      } else {
+        mesg = "Job Submission failed" + mesg;
+      }
+
+      // Has to use full name to make sure it does not conflict with
+      // org.apache.commons.lang.StringUtils
+      console.printError(mesg, "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+
+      success = false;
+      returnVal = 1;
+    } finally {
+      try {
+        if (ctxCreated) {
+          ctx.clear();
+        }
+        if (rj != null) {
+          if (returnVal != 0) {
+            rj.killJob();
+          }
+          HadoopJobExecHelper.runningJobs.remove(rj);
+          jobID = rj.getID().toString();
+        }
+        closeJob(outputPath, success, job, console, work.getDynPartCtx(), 
null);
+      } catch (Exception e) {
+      }
+    }
+
+    return (returnVal);
+  }
+
+  private Path backupOutputPath(FileSystem fs, Path outpath, JobConf job)
+      throws IOException, HiveException {
+    if (fs.exists(outpath)) {
+      Path backupPath = new Path(outpath.getParent(), BACKUP_PREFIX
+          + outpath.getName());
+      Utilities.rename(fs, outpath, backupPath);
+      return backupPath;
+    } else {
+      return null;
+    }
+  }
+
+  private void closeJob(Path outputPath, boolean success, JobConf job,
+      LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter
+      ) throws HiveException, IOException {
+    FileSystem fs = outputPath.getFileSystem(job);
+    Path backupPath = backupOutputPath(fs, outputPath, job);
+    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, 
null,
+      reporter);
+    fs.delete(backupPath, true);
+  }
+
+  private void addInputPaths(JobConf job, MergeWork work) {
+    for (Path path : work.getInputPaths()) {
+      FileInputFormat.addInputPath(job, path);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "MergeTask";
+  }
+
+  public static String INPUT_SEPERATOR = ":";
+
+  public static void main(String[] args) {
+    String inputPathStr = null;
+    String outputDir = null;
+    String jobConfFileName = null;
+    String format = null;
+
+    try {
+      for (int i = 0; i < args.length; i++) {
+        if (args[i].equals("-input")) {
+          inputPathStr = args[++i];
+        } else if (args[i].equals("-jobconffile")) {
+          jobConfFileName = args[++i];
+        } else if (args[i].equals("-outputDir")) {
+          outputDir = args[++i];
+        } else if (args[i].equals("-format")) {
+          format = args[++i];
+        }
+      }
+    } catch (IndexOutOfBoundsException e) {
+      System.err.println("Missing argument to option");
+      printUsage();
+    }
+
+    if (inputPathStr == null || outputDir == null
+        || outputDir.trim().equals("")) {
+      printUsage();
+    }
+
+    List<Path> inputPaths = new ArrayList<Path>();
+    String[] paths = inputPathStr.split(INPUT_SEPERATOR);
+    if (paths == null || paths.length == 0) {
+      printUsage();
+    }
+
+    FileSystem fs = null;
+    JobConf conf = new JobConf(MergeTask.class);
+    for (String path : paths) {
+      try {
+        Path pathObj = new Path(path);
+        if (fs == null) {
+          fs = FileSystem.get(pathObj.toUri(), conf);
+        }
+        FileStatus fstatus = fs.getFileStatus(pathObj);
+        if (fstatus.isDir()) {
+          FileStatus[] fileStatus = fs.listStatus(pathObj);
+          for (FileStatus st : fileStatus) {
+            inputPaths.add(st.getPath());
+          }
+        } else {
+          inputPaths.add(fstatus.getPath());
+        }
+      } catch (IOException e) {
+        e.printStackTrace(System.err);
+      }
+    }
+
+    if (jobConfFileName != null) {
+      conf.addResource(new Path(jobConfFileName));
+    }
+    HiveConf hiveConf = new HiveConf(conf, MergeTask.class);
+
+    Log LOG = LogFactory.getLog(MergeTask.class.getName());
+    boolean isSilent = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVESESSIONSILENT);
+    LogHelper console = new LogHelper(LOG, isSilent);
+
+    // print out the location of the log file for the user so
+    // that it's easy to find reason for local mode execution failures
+    for (Appender appender : Collections
+        .list((Enumeration<Appender>) LogManager.getRootLogger()
+            .getAllAppenders())) {
+      if (appender instanceof FileAppender) {
+        console.printInfo("Execution log at: "
+            + ((FileAppender) appender).getFile());
+      }
+    }
+
+    MergeWork mergeWork = null;
+    if (format.equals("rcfile")) {
+      mergeWork = new MergeWork(inputPaths, new Path(outputDir), 
RCFileInputFormat.class);
+    } else if (format.equals("orcfile")) {
+      mergeWork = new MergeWork(inputPaths, new Path(outputDir), 
OrcInputFormat.class);
+    }
+
+    DriverContext driverCxt = new DriverContext();
+    MergeTask taskExec = new MergeTask();
+    taskExec.initialize(hiveConf, null, driverCxt);
+    taskExec.setWork(mergeWork);
+    int ret = taskExec.execute(driverCxt);
+
+    if (ret != 0) {
+      System.exit(2);
+    }
+
+  }
+
+  private static void printUsage() {
+    System.err.println("MergeTask -format <rcfile or orcfile> -input <colon 
seperated input paths>  "
+        + "-outputDir outputDir [-jobconffile <job conf file>] ");
+    System.exit(1);
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.MAPRED;
+  }
+
+  @Override
+  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+    return false;
+  }
+
+  @Override
+  public void logPlanProgress(SessionState ss) throws IOException {
+    // no op
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java?rev=1614793&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java 
(added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.Mapper;
+
+/**
+ * Plan for a file merge job. Holds the input paths, the output directory, the
+ * dynamic partition / list bucketing contexts and the input format of the
+ * source table, from which the merge-specific input format and mapper class
+ * are derived (RCFile and ORC are the two formats handled here).
+ */
+@Explain(displayName = "Merge Work")
+public class MergeWork extends MapWork implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private transient List<Path> inputPaths;
+  private transient Path outputDir;
+  private boolean hasDynamicPartitions;
+  private DynamicPartitionCtx dynPartCtx;
+  private boolean isListBucketingAlterTableConcatenate;
+  private ListBucketingCtx listBucketingCtx;
+  private Class<? extends InputFormat> srcTblInputFormat;
+
+  public MergeWork() {
+  }
+
+  public MergeWork(List<Path> inputPaths, Path outputDir,
+      Class<? extends InputFormat> srcTblInputFormat) {
+    this(inputPaths, outputDir, false, null, srcTblInputFormat);
+  }
+
+  /**
+   * Creates the work and registers every input path in the path-to-partition
+   * map with a partition descriptor that uses the merge input format matching
+   * the source table's input format.
+   *
+   * @param inputPaths paths to merge
+   * @param outputDir directory receiving the merged files
+   * @param hasDynamicPartitions whether the merge covers dynamic partitions
+   * @param dynPartCtx dynamic partition context, may be {@code null}
+   * @param srcTblInputFormat input format of the source table
+   */
+  public MergeWork(List<Path> inputPaths, Path outputDir,
+      boolean hasDynamicPartitions, DynamicPartitionCtx dynPartCtx,
+      Class<? extends InputFormat> srcTblInputFormat) {
+    super();
+    this.inputPaths = inputPaths;
+    this.outputDir = outputDir;
+    this.hasDynamicPartitions = hasDynamicPartitions;
+    this.dynPartCtx = dynPartCtx;
+    this.srcTblInputFormat = srcTblInputFormat;
+    PartitionDesc partDesc = new PartitionDesc();
+    // constant-first comparison: tolerates a null source input format
+    if (OrcInputFormat.class.equals(srcTblInputFormat)) {
+      partDesc.setInputFileFormatClass(OrcFileStripeMergeInputFormat.class);
+    } else if (RCFileInputFormat.class.equals(srcTblInputFormat)) {
+      partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
+    }
+    if (this.getPathToPartitionInfo() == null) {
+      this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
+    }
+    for (Path path : this.inputPaths) {
+      this.getPathToPartitionInfo().put(path.toString(), partDesc);
+    }
+  }
+
+  public List<Path> getInputPaths() {
+    return inputPaths;
+  }
+
+  public void setInputPaths(List<Path> inputPaths) {
+    this.inputPaths = inputPaths;
+  }
+
+  public Path getOutputDir() {
+    return outputDir;
+  }
+
+  public void setOutputDir(Path outputDir) {
+    this.outputDir = outputDir;
+  }
+
+  /**
+   * Selects the merge mapper for a source table input format.
+   *
+   * @param klass input format of the source table
+   * @return the RCFile or ORC merge mapper, or {@code null} for any other format
+   */
+  public Class<? extends Mapper> getMapperClass(Class<? extends InputFormat> klass) {
+    if (RCFileInputFormat.class.equals(klass)) {
+      return RCFileMergeMapper.class;
+    } else if (OrcInputFormat.class.equals(klass)) {
+      return OrcFileMergeMapper.class;
+    }
+    return null;
+  }
+
+  @Override
+  public Long getMinSplitSize() {
+    return null;
+  }
+
+  @Override
+  public String getInputformat() {
+    return CombineHiveInputFormat.class.getName();
+  }
+
+  @Override
+  public boolean isGatheringStats() {
+    return false;
+  }
+
+  public boolean hasDynamicPartitions() {
+    return this.hasDynamicPartitions;
+  }
+
+  public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+    this.hasDynamicPartitions = hasDynamicPartitions;
+  }
+
+  /**
+   * Switches the partition descriptor to the configured block-level (RCFile)
+   * or stripe-level (ORC) merge input format, delegates to the parent
+   * implementation and finally adds the path to the list of input paths.
+   *
+   * @throws RuntimeException if the table's input format is neither RCFile
+   *           nor ORC, or the configured merge input format class cannot be
+   *           loaded
+   */
+  @Override
+  public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path,
+      TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+
+    String inputFormatClass = null;
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+    } else if (tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
+      inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL);
+    }
+
+    // Fail with a meaningful message rather than letting Class.forName(null)
+    // throw a bare NullPointerException.
+    if (inputFormatClass == null) {
+      throw new RuntimeException("No merge input format available for table input format "
+          + tblDesc.getInputFileFormatClass());
+    }
+
+    try {
+      partDesc.setInputFileFormatClass((Class <? extends InputFormat>)
+          Class.forName(inputFormatClass));
+    } catch (ClassNotFoundException e) {
+      String msg = "Merge input format class not found";
+      // keep the cause so the missing class name is not lost
+      throw new RuntimeException(msg, e);
+    }
+    super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc);
+
+    // Add the DP path to the list of input paths. The list is still null when
+    // the work was created through the no-arg constructor.
+    if (inputPaths == null) {
+      inputPaths = new ArrayList<Path>();
+    }
+    inputPaths.add(path);
+  }
+
+  /**
+   * alter table ... concatenate
+   *
+   * If it is skewed table, use subdirectories in inputpaths.
+   */
+  public void resolveConcatenateMerge(HiveConf conf) {
+    isListBucketingAlterTableConcatenate = ((listBucketingCtx == null) ? false : listBucketingCtx
+        .isSkewedStoredAsDir());
+    if (isListBucketingAlterTableConcatenate) {
+      // use sub-dir as inputpath.
+      assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) :
+        "alter table ... concatenate should only have one directory inside inputpaths";
+      Path dirPath = inputPaths.get(0);
+      try {
+        FileSystem inpFs = dirPath.getFileSystem(conf);
+        FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(dirPath, listBucketingCtx
+            .getSkewedColNames().size(), inpFs);
+        List<Path> newInputPath = new ArrayList<Path>();
+        boolean succeed = true;
+        for (FileStatus entry : status) {
+          if (entry.isDir()) {
+            // Add the lb path to the list of input paths
+            newInputPath.add(entry.getPath());
+          } else {
+            // found a file instead of a dir: don't change inputpath
+            succeed = false;
+          }
+        }
+        assert (succeed || newInputPath.isEmpty()) : "This partition has "
+            + " inconsistent file structure: "
+            + "it is stored-as-subdir and expected all files in the same depth of subdirectories.";
+        if (succeed) {
+          inputPaths.clear();
+          inputPaths.addAll(newInputPath);
+        }
+      } catch (IOException e) {
+        String msg = "Fail to get filesystem for directory name : " + dirPath.toUri();
+        throw new RuntimeException(msg, e);
+      }
+
+    }
+  }
+
+  public DynamicPartitionCtx getDynPartCtx() {
+    return dynPartCtx;
+  }
+
+  public void setDynPartCtx(DynamicPartitionCtx dynPartCtx) {
+    this.dynPartCtx = dynPartCtx;
+  }
+
+  /**
+   * @return the listBucketingCtx
+   */
+  public ListBucketingCtx getListBucketingCtx() {
+    return listBucketingCtx;
+  }
+
+  /**
+   * @param listBucketingCtx the listBucketingCtx to set
+   */
+  public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) {
+    this.listBucketingCtx = listBucketingCtx;
+  }
+
+  /**
+   * @return the isListBucketingAlterTableConcatenate
+   */
+  public boolean isListBucketingAlterTableConcatenate() {
+    return isListBucketingAlterTableConcatenate;
+  }
+
+  public Class<? extends InputFormat> getSourceTableInputFormat() {
+    return srcTblInputFormat;
+  }
+
+  /**
+   * @return canonical name of the source table input format, or {@code null}
+   *         if it has not been set
+   */
+  @Explain(displayName = "input format")
+  public String getStringifiedInputFormat() {
+    // null-safe, like getMergeLevel()
+    return srcTblInputFormat == null ? null : srcTblInputFormat.getCanonicalName();
+  }
+
+  /**
+   * @return "stripe" for ORC, "block" for RCFile, {@code null} otherwise
+   */
+  @Explain(displayName = "merge level")
+  public String getMergeLevel() {
+    if (OrcInputFormat.class.equals(srcTblInputFormat)) {
+      return "stripe";
+    } else if (RCFileInputFormat.class.equals(srcTblInputFormat)) {
+      return "block";
+    }
+    return null;
+  }
+
+  public void setSourceTableInputFormat(Class<? extends InputFormat> srcTblInputFormat) {
+    this.srcTblInputFormat = srcTblInputFormat;
+  }
+
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java 
(added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Key for OrcFileMergeMapper task. Contains orc file related information that
+ * should match before merging two orc files.
+ */
+public class OrcFileKeyWrapper implements 
WritableComparable<OrcFileKeyWrapper> {
+
+  protected Path inputPath;
+  protected CompressionKind compression;
+  protected long compressBufferSize;
+  protected List<OrcProto.Type> types;
+  protected int rowIndexStride;
+  protected List<Integer> versionList;
+
+  public List<Integer> getVersionList() {
+    return versionList;
+  }
+
+  public void setVersionList(List<Integer> versionList) {
+    this.versionList = versionList;
+  }
+
+  public int getRowIndexStride() {
+    return rowIndexStride;
+  }
+
+  public void setRowIndexStride(int rowIndexStride) {
+    this.rowIndexStride = rowIndexStride;
+  }
+
+  public long getCompressBufferSize() {
+    return compressBufferSize;
+  }
+
+  public void setCompressBufferSize(long compressBufferSize) {
+    this.compressBufferSize = compressBufferSize;
+  }
+
+  public CompressionKind getCompression() {
+    return compression;
+  }
+
+  public void setCompression(CompressionKind compression) {
+    this.compression = compression;
+  }
+
+  public List<OrcProto.Type> getTypes() {
+    return types;
+  }
+
+  public void setTypes(List<OrcProto.Type> types) {
+    this.types = types;
+  }
+
+  public Path getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(Path inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  @Override
+  public int compareTo(OrcFileKeyWrapper o) {
+    return inputPath.compareTo(o.inputPath);
+  }
+
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java 
(added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java 
Wed Jul 30 23:50:11 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeMapper;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map task for fast, stripe-level merging of ORC files.
+ *
+ * Each call to {@link #map} receives one stripe of an input file (described by
+ * an OrcFileKeyWrapper / OrcFileValueWrapper pair), reads the raw stripe bytes
+ * and appends them to a single output ORC file. All input files must agree on
+ * column count, compression codec, compression buffer size, file version and
+ * row index stride; otherwise the merge fails with an IOException.
+ */
+public class OrcFileMergeMapper extends MergeMapper implements
+    Mapper<Object, OrcFileValueWrapper, Object, Object> {
+
+  private static final String INCOMPATIBLE_PREFIX =
+      "ORCFileMerge failed because the input files are not compatible.";
+
+  // These parameters are captured from the first file and must match for all
+  // orc files involved in merging
+  CompressionKind compression = null;
+  long compressBuffSize = 0;
+  List<Integer> version;
+  int columnCount = 0;
+  int rowIndexStride = 0;
+
+  Writer outWriter;
+  private byte[] buffer;
+  // path of the file whose stripes are currently being merged
+  Path prevPath;
+  private Reader reader;
+  // stream on prevPath; opened lazily and reused for every stripe of that file
+  private FSDataInputStream fdis;
+  public final static Log LOG = LogFactory.getLog("OrcFileMergeMapper");
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+
+    outWriter = null;
+    buffer = null;
+    prevPath = null;
+    reader = null;
+    fdis = null;
+  }
+
+  /**
+   * Appends the stripe described by key/value to the merged output file.
+   *
+   * @param key OrcFileKeyWrapper, possibly wrapped in a CombineHiveKey
+   * @param value stripe information, statistics and (for the last stripe of a
+   *          file) the user metadata of the input file
+   * @param output not used
+   * @param reporter not used
+   * @throws IOException if the input file is incompatible with the files merged
+   *           so far or any other failure occurs; the mapper is closed first
+   */
+  @Override
+  public void map(Object key, OrcFileValueWrapper value,
+      OutputCollector<Object, Object> output, Reporter reporter) throws IOException {
+    try {
+
+      OrcFileKeyWrapper k = null;
+      if (key instanceof CombineHiveKey) {
+        k = (OrcFileKeyWrapper) ((CombineHiveKey) key).getKey();
+      } else {
+        k = (OrcFileKeyWrapper) key;
+      }
+      Path inputPath = k.getInputPath();
+
+      fixTmpPathAlterTable(inputPath.getParent());
+
+      // first stripe of a new input file: drop the stream of the previous file
+      // and remember the new path so that the reader and the stream are set up
+      // once per file instead of once per stripe
+      if (!inputPath.equals(prevPath)) {
+        closeInputStream();
+        reader = OrcFile.createReader(fs, inputPath);
+        prevPath = inputPath;
+      }
+
+      // store the orc configuration from the first file. All other files should
+      // match this configuration before merging
+      if (outWriter == null) {
+        compression = k.getCompression();
+        compressBuffSize = k.getCompressBufferSize();
+        version = k.getVersionList();
+        columnCount = k.getTypes().get(0).getSubtypesCount();
+        rowIndexStride = k.getRowIndexStride();
+
+        // block size and stripe size will be from config
+        outWriter = OrcFile.createWriter(outPath,
+            OrcFile.writerOptions(jc).compress(compression)
+                .inspector(reader.getObjectInspector()));
+      }
+
+      // check compatibility with subsequent files
+      checkCompatibility(k);
+
+      // initialize buffer to read the entire stripe
+      StripeInformation stripe = value.getStripeInformation();
+      long stripeLength = stripe.getLength();
+      if (stripeLength > Integer.MAX_VALUE) {
+        // a plain (int) cast would silently truncate and corrupt the output
+        throw new IOException("Stripe at offset " + stripe.getOffset() + " of "
+            + inputPath + " is too large to merge: " + stripeLength + " bytes");
+      }
+      buffer = new byte[(int) stripeLength];
+      if (fdis == null) {
+        fdis = fs.open(inputPath);
+      }
+      // positional read: does not depend on the current stream position
+      fdis.readFully(stripe.getOffset(), buffer, 0, buffer.length);
+
+      // append the stripe buffer to the new ORC file
+      ((WriterImpl) outWriter).appendStripe(buffer, stripe,
+          value.getStripeStatistics());
+
+      LOG.info("Merged stripe from file " + inputPath + " [ offset : "
+          + stripe.getOffset() + " length: "
+          + stripe.getLength() + " ]");
+
+      // add user metadata to footer in case of any
+      if (value.isLastStripeInFile()) {
+        ((WriterImpl) outWriter).appendUserMetadata(value.getUserMetadata());
+      }
+    } catch (Throwable e) {
+      this.exception = true;
+      close();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Verifies that the file described by k matches the configuration captured
+   * from the first merged file.
+   */
+  private void checkCompatibility(OrcFileKeyWrapper k) throws IOException {
+    if (k.getTypes().get(0).getSubtypesCount() != columnCount) {
+      throw new IOException(INCOMPATIBLE_PREFIX + " Column counts does not match.");
+    }
+
+    if (!k.getCompression().equals(compression)) {
+      throw new IOException(INCOMPATIBLE_PREFIX + " Compression codec does not match.");
+    }
+
+    if (k.getCompressBufferSize() != compressBuffSize) {
+      throw new IOException(INCOMPATIBLE_PREFIX
+          + " Compression buffer size does not match.");
+    }
+
+    if (!k.getVersionList().equals(version)) {
+      throw new IOException(INCOMPATIBLE_PREFIX + " Version does not match.");
+    }
+
+    if (k.getRowIndexStride() != rowIndexStride) {
+      throw new IOException(INCOMPATIBLE_PREFIX + " Row index stride does not match.");
+    }
+  }
+
+  /** Closes the stream on the current input file, if one is open. */
+  private void closeInputStream() throws IOException {
+    if (fdis != null) {
+      try {
+        fdis.close();
+      } finally {
+        fdis = null;
+      }
+    }
+  }
+
+  /**
+   * Closes the input stream and the output writer, then delegates to the
+   * superclass. When no writer was ever created (or it was already closed) the
+   * superclass close is not invoked.
+   */
+  @Override
+  public void close() throws IOException {
+    closeInputStream();
+
+    // close writer
+    if (outWriter == null) {
+      return;
+    }
+
+    outWriter.close();
+    outWriter = null;
+
+    super.close();
+  }
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
 (added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
 Wed Jul 30 23:50:11 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Input format for stripe-level ORC merging. Hands out an
+ * OrcFileStripeMergeRecordReader for each file split.
+ */
+public class OrcFileStripeMergeInputFormat extends MergeInputFormat {
+
+  /**
+   * Reports the split as the task status and creates the record reader for it.
+   *
+   * @param split split to read; must be a FileSplit
+   * @param job job configuration passed on to the record reader
+   * @param reporter receives the split description as status
+   * @return record reader over the stripes of the split
+   * @throws IOException if the record reader cannot be created
+   */
+  @Override
+  public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
+    final String splitDescription = split.toString();
+    reporter.setStatus(splitDescription);
+
+    final FileSplit fileSplit = (FileSplit) split;
+    return new OrcFileStripeMergeRecordReader(job, fileSplit);
+  }
+
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
 (added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
 Wed Jul 30 23:50:11 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class OrcFileStripeMergeRecordReader implements
+    RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> {
+
+  private final Reader reader;
+  private final Path path;
+  protected Iterator<StripeInformation> iter;
+  protected List<OrcProto.StripeStatistics> stripeStatistics;
+  private int stripeIdx;
+  private long start;
+  private long end;
+
+  public OrcFileStripeMergeRecordReader(Configuration conf, FileSplit split) 
throws IOException {
+    path = split.getPath();
+    start = split.getStart();
+    end = start + split.getLength();
+    FileSystem fs = path.getFileSystem(conf);
+    this.reader = OrcFile.createReader(path, 
OrcFile.readerOptions(conf).filesystem(fs));
+    this.iter = reader.getStripes().iterator();
+    this.stripeIdx = 0;
+    this.stripeStatistics = ((ReaderImpl) 
reader).getOrcProtoStripeStatistics();
+  }
+
+  public Class<?> getKeyClass() {
+    return OrcFileKeyWrapper.class;
+  }
+
+  public Class<?> getValueClass() {
+    return OrcFileValueWrapper.class;
+  }
+
+  public OrcFileKeyWrapper createKey() {
+    return new OrcFileKeyWrapper();
+  }
+
+  public OrcFileValueWrapper createValue() {
+    return new OrcFileValueWrapper();
+  }
+
+  @Override
+  public boolean next(OrcFileKeyWrapper key, OrcFileValueWrapper value) throws 
IOException {
+    return nextStripe(key, value);
+  }
+
+  protected boolean nextStripe(OrcFileKeyWrapper keyWrapper, 
OrcFileValueWrapper valueWrapper)
+      throws IOException {
+    while (iter.hasNext()) {
+      StripeInformation si = iter.next();
+
+      // if stripe offset is outside the split boundary then ignore the current
+      // stripe as it will be handled by some other mapper.
+      if (si.getOffset() >= start && si.getOffset() < end) {
+        valueWrapper.setStripeStatistics(stripeStatistics.get(stripeIdx++));
+        valueWrapper.setStripeInformation(si);
+        if (!iter.hasNext()) {
+          valueWrapper.setLastStripeInFile(true);
+          valueWrapper.setUserMetadata(((ReaderImpl) 
reader).getOrcProtoUserMetadata());
+        }
+        keyWrapper.setInputPath(path);
+        keyWrapper.setCompression(reader.getCompression());
+        keyWrapper.setCompressBufferSize(reader.getCompressionSize());
+        keyWrapper.setVersionList(((ReaderImpl) 
reader).getFileMetaInfo().versionList);
+        keyWrapper.setRowIndexStride(reader.getRowIndexStride());
+        keyWrapper.setTypes(reader.getTypes());
+      } else {
+        continue;
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Default progress will be based on number of files processed.
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    return 0.0f;
+  }
+
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  protected void seek(long pos) throws IOException {
+  }
+
+  public long getStart() {
+    return 0;
+  }
+
+  public void close() throws IOException {
+  }
+
+}

Added: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java?rev=1614793&view=auto
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
 (added)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileValueWrapper.java
 Wed Jul 30 23:50:11 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Value handed to OrcFileMergeMapper. Describes one stripe of the ORC file that
+ * is currently being merged: its stripe information, its statistics and, when
+ * flagged as the last stripe of the file, the file's user metadata.
+ */
+public class OrcFileValueWrapper implements WritableComparable<OrcFileValueWrapper> {
+
+  protected StripeInformation stripeInformation;
+  protected OrcProto.StripeStatistics stripeStatistics;
+  protected List<OrcProto.UserMetadataItem> userMetadata;
+  protected boolean lastStripeInFile;
+
+  /** @return the user metadata items stored in this value, possibly null */
+  public List<OrcProto.UserMetadataItem> getUserMetadata() {
+    return this.userMetadata;
+  }
+
+  /** @param userMetadata user metadata items to store (reference is kept) */
+  public void setUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
+    this.userMetadata = userMetadata;
+  }
+
+  /** @return true if this stripe was flagged as the last one of its file */
+  public boolean isLastStripeInFile() {
+    return this.lastStripeInFile;
+  }
+
+  /** @param lastStripeInFile whether this stripe is the last one of its file */
+  public void setLastStripeInFile(boolean lastStripeInFile) {
+    this.lastStripeInFile = lastStripeInFile;
+  }
+
+  /** @return the statistics of this stripe */
+  public OrcProto.StripeStatistics getStripeStatistics() {
+    return this.stripeStatistics;
+  }
+
+  /** @param stripeStatistics statistics of this stripe */
+  public void setStripeStatistics(OrcProto.StripeStatistics stripeStatistics) {
+    this.stripeStatistics = stripeStatistics;
+  }
+
+  /** @return offset and length information of this stripe */
+  public StripeInformation getStripeInformation() {
+    return this.stripeInformation;
+  }
+
+  /** @param stripeInformation offset and length information of this stripe */
+  public void setStripeInformation(StripeInformation stripeInformation) {
+    this.stripeInformation = stripeInformation;
+  }
+
+  /** Serialization is not implemented; always throws RuntimeException. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    final String reason = "Not supported.";
+    throw new RuntimeException(reason);
+  }
+
+  /** Deserialization is not implemented; always throws RuntimeException. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final String reason = "Not supported.";
+    throw new RuntimeException(reason);
+  }
+
+  /**
+   * Orders values by the offset of their stripe.
+   *
+   * @return -1, 0 or 1 if this stripe starts before, at or after the other one
+   */
+  @Override
+  public int compareTo(OrcFileValueWrapper o) {
+    final long mine = stripeInformation.getOffset();
+    final long theirs = o.getStripeInformation().getOffset();
+    if (mine == theirs) {
+      return 0;
+    }
+    return mine < theirs ? -1 : 1;
+  }
+
+}

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Wed 
Jul 30 23:50:11 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -60,6 +61,7 @@ final class ReaderImpl implements Reader
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
   private final Configuration conf;
+  private final List<Integer> versionList;
 
   //serialized footer - Keeping this around for use by getFileMetaInfo()
   // will help avoid cpu cycles spend in deserializing at cost of increased
@@ -306,6 +308,7 @@ final class ReaderImpl implements Reader
     this.metadata = rInfo.metadata;
     this.footer = rInfo.footer;
     this.inspector = rInfo.inspector;
+    this.versionList = footerMetaData.versionList;
   }
 
 
@@ -387,7 +390,8 @@ final class ReaderImpl implements Reader
         ps.getCompression().toString(),
         (int) ps.getCompressionBlockSize(),
         (int) ps.getMetadataLength(),
-        buffer
+        buffer,
+        ps.getVersionList()
         );
   }
 
@@ -446,18 +450,26 @@ final class ReaderImpl implements Reader
     final int bufferSize;
     final int metadataSize;
     final ByteBuffer footerBuffer;
+    final List<Integer> versionList;
+
+    FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+        ByteBuffer footerBuffer) {
+      this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+    }
+
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-                 ByteBuffer footerBuffer){
+                 ByteBuffer footerBuffer, List<Integer> versionList){
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
       this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
+      this.versionList = versionList;
     }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer);
+        metadataSize, footerByteBuffer, versionList);
   }
 
 
@@ -629,4 +641,11 @@ final class ReaderImpl implements Reader
     return new Metadata(metadata);
   }
 
+  /**
+   * @return the protobuf stripe statistics list held by this reader's metadata
+   */
+  List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+    return this.metadata.getStripeStatsList();
+  }
+
+  /**
+   * @return the protobuf user metadata items held by this reader's footer
+   */
+  public List<UserMetadataItem> getOrcProtoUserMetadata() {
+    return this.footer.getMetadataList();
+  }
 }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed 
Jul 30 23:50:11 2014
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,8 +41,8 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -73,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
@@ -2218,4 +2217,78 @@ class WriterImpl implements Writer, Memo
     }
     return rawWriter.getPos();
   }
+
+  void appendStripe(byte[] stripe, StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics);
+  }
+
+  /**
+   * Appends an already serialized stripe to the file without re-encoding it,
+   * then records its statistics and directory entry.
+   *
+   * @param stripe array containing the raw stripe bytes
+   * @param offset index of the first stripe byte in the array
+   * @param length number of stripe bytes to write
+   * @param stripeInfo supplies the index, data and footer lengths of the stripe
+   * @param stripeStatistics statistics of the stripe; the value count of column 0
+   *          is used as the stripe's row count
+   * @throws IOException if writing fails
+   */
+  void appendStripe(byte[] stripe, int offset, int length,
+      StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    getStream();
+    long start = rawWriter.getPos();
+
+    long stripeLen = length;
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+
+    // write exactly the requested range; writing the whole array would ignore
+    // offset/length and disagree with the length used for padding above
+    rawWriter.write(stripe, offset, length);
+    rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
+    rowCount += rowsInStripe;
+
+    // since we have already written the stripe, just update stripe statistics
+    treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
+
+    // update file level statistics
+    updateFileStatistics(stripeStatistics);
+
+    // update stripe information
+    OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
+        .newBuilder()
+        .setOffset(start)
+        .setNumberOfRows(rowsInStripe)
+        .setIndexLength(stripeInfo.getIndexLength())
+        .setDataLength(stripeInfo.getDataLength())
+        .setFooterLength(stripeInfo.getFooterLength())
+        .build();
+    stripes.add(dirEntry);
+
+    // reset it after writing the stripe
+    rowsInStripe = 0;
+  }
+
+  /**
+   * Merges the column statistics of an appended stripe into the file level
+   * statistics of every tree writer, including writers of nested columns.
+   *
+   * @param stripeStatistics statistics of the stripe that was appended
+   */
+  private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) {
+    List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
+    mergeFileStatistics(treeWriter, cs, 0);
+  }
+
+  /**
+   * Merges cs[columnId] into the given writer and then recurses into its
+   * children, handing out consecutive column ids in pre-order.
+   * NOTE(review): relies on ORC column ids being assigned by pre-order traversal
+   * of the type tree, so that the statistics list lines up with this walk.
+   *
+   * @return the first column id not consumed by this writer's subtree
+   */
+  private int mergeFileStatistics(TreeWriter writer,
+      List<OrcProto.ColumnStatistics> cs, int columnId) {
+    writer.fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(columnId)));
+    int nextId = columnId + 1;
+    TreeWriter[] childWriters = writer.getChildrenWriters();
+    if (childWriters != null) {
+      for (int i = 0; i < childWriters.length; i++) {
+        nextId = mergeFileStatistics(childWriters[i], cs, nextId);
+      }
+    }
+    return nextId;
+  }
+
+  /**
+   * Copies the given user metadata items into this writer's user metadata,
+   * keyed by item name. A null list is ignored.
+   *
+   * @param userMetadata items to add, may be null
+   */
+  void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+    if (userMetadata == null) {
+      return;
+    }
+    for (UserMetadataItem entry : userMetadata) {
+      this.userMetadata.put(entry.getName(), entry.getValue());
+    }
+  }
 }

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1614793&r1=1614792&r2=1614793&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
 Wed Jul 30 23:50:11 2014
@@ -19,22 +19,22 @@
 package org.apache.hadoop.hive.ql.io.rcfile.merge;
 
 import java.io.IOException;
-import org.apache.hadoop.mapred.FileInputFormat;
+
+import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-@SuppressWarnings({ "deprecation", "unchecked" })
-public class RCFileBlockMergeInputFormat extends FileInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeInputFormat {
 
   @Override
-  public RecordReader getRecordReader(InputSplit split, JobConf job,
-      Reporter reporter) throws IOException {
+  public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
 
     reporter.setStatus(split.toString());
-
     return new RCFileBlockMergeRecordReader(job, (FileSplit) split);
   }
 


Reply via email to