Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java Sat Sep 13 00:39:26 2014 @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.merge; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper; +import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook; +import org.apache.hadoop.hive.ql.exec.mr.Throttle; +import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RunningJob; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Task for fast merging of ORC and RC files. + */ +public class MergeFileTask extends Task<MergeFileWork> implements Serializable, + HadoopJobExecHook { + + private transient JobConf job; + private HadoopJobExecHelper jobExecHelper; + private boolean success = true; + + @Override + public void initialize(HiveConf conf, QueryPlan queryPlan, + DriverContext driverContext) { + super.initialize(conf, queryPlan, driverContext); + job = new JobConf(conf, MergeFileTask.class); + jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this); + } + + @Override + public boolean requireLock() { + return true; + } + + /** + * start a new map-reduce job to do the merge, almost the same as ExecDriver. 
+ */ + @Override + public int execute(DriverContext driverContext) { + + Context ctx = driverContext.getCtx(); + boolean ctxCreated = false; + RunningJob rj = null; + int returnVal = 0; + + try { + if (ctx == null) { + ctx = new Context(job); + ctxCreated = true; + } + + ShimLoader.getHadoopShims().prepareJobOutput(job); + job.setInputFormat(work.getInputformatClass()); + job.setOutputFormat(HiveOutputFormatImpl.class); + job.setMapperClass(MergeFileMapper.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setNumReduceTasks(0); + + // create the temp directories + Path outputPath = work.getOutputDir(); + Path tempOutPath = Utilities.toTempPath(outputPath); + FileSystem fs = tempOutPath.getFileSystem(job); + if (!fs.exists(tempOutPath)) { + fs.mkdirs(tempOutPath); + } + + // set job name + boolean noName = StringUtils.isEmpty(HiveConf.getVar(job, + HiveConf.ConfVars.HADOOPJOBNAME)); + + String jobName = null; + if (noName && this.getQueryPlan() != null) { + int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); + jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(), + maxlen - 6); + } + + if (noName) { + // This is for a special case to ensure unit tests pass + HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, + jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt()); + } + + // add input path + addInputPaths(job, work); + + // serialize work + Utilities.setMapWork(job, work, ctx.getMRTmpPath(), true); + + // remove pwd from conf file so that job tracker doesn't show these logs + String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD); + if (pwd != null) { + HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE"); + } + + // submit the job + JobClient jc = new JobClient(job); + + String addedJars = Utilities.getResourceFiles(job, + SessionState.ResourceType.JAR); + if (!addedJars.isEmpty()) { + job.set("tmpjars", addedJars); + } + + // make this client wait if job tracker is not behaving well. + Throttle.checkJobTracker(job, LOG); + + // Finally SUBMIT the JOB! + rj = jc.submitJob(job); + + returnVal = jobExecHelper.progress(rj, jc, null); + success = (returnVal == 0); + + } catch (Exception e) { + String mesg = " with exception '" + Utilities.getNameMessage(e) + "'"; + if (rj != null) { + mesg = "Ended Job = " + rj.getJobID() + mesg; + } else { + mesg = "Job Submission failed" + mesg; + } + + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + console.printError(mesg, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + + success = false; + returnVal = 1; + } finally { + try { + if (ctxCreated) { + ctx.clear(); + } + if (rj != null) { + if (returnVal != 0) { + rj.killJob(); + } + HadoopJobExecHelper.runningJobs.remove(rj); + jobID = rj.getID().toString(); + } + // get the list of Dynamic partition paths + if (rj != null) { + if (work.getAliasToWork() != null) { + for (Operator<?
extends OperatorDesc> op : work.getAliasToWork() + .values()) { + op.jobClose(job, success); + } + } + } + } catch (Exception e) { + // jobClose needs to execute successfully otherwise fail task + if (success) { + success = false; + returnVal = 3; + String mesg = "Job Commit failed with exception '" + + Utilities.getNameMessage(e) + "'"; + console.printError(mesg, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + } + } + + return returnVal; + } + + private void addInputPaths(JobConf job, MergeFileWork work) { + for (Path path : work.getInputPaths()) { + FileInputFormat.addInputPath(job, path); + } + } + + @Override + public String getName() { + return "MergeFileTask"; + } + + @Override + public StageType getType() { + return StageType.MAPRED; + } + + @Override + public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { + return false; + } + + @Override + public void logPlanProgress(SessionState ss) throws IOException { + // no op + } +}
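The task above executes a MergeFileWork (added below) as a map-only MapReduce job. The following sketch shows roughly how the two classes fit together when merging the files of a single ORC partition. It is illustrative only: the class name, the warehouse paths, and the assumption that the caller already holds a HiveConf and a DriverContext are not part of this commit, and the real planner (see GenMapRedUtils.createMergeTask below) additionally attaches an ORC/RCFile merge operator to the work before the task runs.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;

public class MergeFileTaskSketch {
  // The caller is assumed to already hold a HiveConf and a DriverContext
  // (normally the Driver owns both); the input and output paths are made up.
  static int mergeOrcPartition(HiveConf conf, DriverContext driverContext) {
    List<Path> inputs = Arrays.asList(
        new Path("/warehouse/t/ds=1/000000_0"),
        new Path("/warehouse/t/ds=1/000001_0"));
    Path output = new Path("/warehouse/t/ds=1");

    // Naming OrcInputFormat as the source format makes MergeFileWork pick
    // OrcFileStripeMergeInputFormat as its internal, stripe-level format;
    // RCFileInputFormat would select RCFileBlockMergeInputFormat instead.
    MergeFileWork work = new MergeFileWork(inputs, output,
        OrcInputFormat.class.getName());

    // NOTE: GenMapRedUtils.createMergeTask also populates pathToAliases and
    // attaches an OrcFileMergeDesc-based operator via work.setAliasToWork();
    // that wiring is omitted from this sketch.
    MergeFileTask task = new MergeFileTask();
    task.setWork(work);                          // inherited from Task<MergeFileWork>
    task.initialize(conf, null, driverContext);  // builds the JobConf and exec helper
    return task.execute(driverContext);          // submits the map-only merge job
  }
}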
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java Sat Sep 13 00:39:26 2014 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.merge; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.mapred.InputFormat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +@Explain(displayName = "Merge File Operator") +public class MergeFileWork extends MapWork { + + private static final Log LOG = LogFactory.getLog(MergeFileWork.class); + private List<Path> inputPaths; + private Path outputDir; + private boolean hasDynamicPartitions; + private boolean isListBucketingAlterTableConcatenate; + private ListBucketingCtx listBucketingCtx; + + // source table input format + private String srcTblInputFormat; + + // internal input format used by CombineHiveInputFormat + private Class<? 
extends InputFormat> internalInputFormat; + + public MergeFileWork(List<Path> inputPaths, Path outputDir, + String srcTblInputFormat) { + this(inputPaths, outputDir, false, srcTblInputFormat); + } + + public MergeFileWork(List<Path> inputPaths, Path outputDir, + boolean hasDynamicPartitions, + String srcTblInputFormat) { + this.inputPaths = inputPaths; + this.outputDir = outputDir; + this.hasDynamicPartitions = hasDynamicPartitions; + this.srcTblInputFormat = srcTblInputFormat; + PartitionDesc partDesc = new PartitionDesc(); + if (srcTblInputFormat.equals(OrcInputFormat.class.getName())) { + this.internalInputFormat = OrcFileStripeMergeInputFormat.class; + } else if (srcTblInputFormat.equals(RCFileInputFormat.class.getName())) { + this.internalInputFormat = RCFileBlockMergeInputFormat.class; + } + partDesc.setInputFileFormatClass(internalInputFormat); + if (this.getPathToPartitionInfo() == null) { + this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>()); + } + for (Path path : this.inputPaths) { + this.getPathToPartitionInfo().put(path.toString(), partDesc); + } + this.isListBucketingAlterTableConcatenate = false; + } + + public List<Path> getInputPaths() { + return inputPaths; + } + + public void setInputPaths(List<Path> inputPaths) { + this.inputPaths = inputPaths; + } + + public Path getOutputDir() { + return outputDir; + } + + public void setOutputDir(Path outputDir) { + this.outputDir = outputDir; + } + + @Override + public Long getMinSplitSize() { + return null; + } + + @Override + public String getInputformat() { + return getInputformatClass().getName(); + } + + public Class<? extends InputFormat> getInputformatClass() { + return CombineHiveInputFormat.class; + } + + @Override + public boolean isGatheringStats() { + return false; + } + + public boolean hasDynamicPartitions() { + return this.hasDynamicPartitions; + } + + public void setHasDynamicPartitions(boolean hasDynamicPartitions) { + this.hasDynamicPartitions = hasDynamicPartitions; + } + + @Override + public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, + Path path, + TableDesc tblDesc, + ArrayList<String> aliases, + PartitionDesc partDesc) { + super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, + aliases, partDesc); + // set internal input format for all partition descriptors + partDesc.setInputFileFormatClass(internalInputFormat); + // Add the DP path to the list of input paths + inputPaths.add(path); + } + + /** + * alter table ... concatenate + * <p/> + * If it is skewed table, use subdirectories in inputpaths. + */ + public void resolveConcatenateMerge(HiveConf conf) { + isListBucketingAlterTableConcatenate = + ((listBucketingCtx == null) ? false : listBucketingCtx + .isSkewedStoredAsDir()); + LOG.info("isListBucketingAlterTableConcatenate : " + + isListBucketingAlterTableConcatenate); + if (isListBucketingAlterTableConcatenate) { + // use sub-dir as inputpath. + assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) : + "alter table ... 
concatenate should only have one" + + " directory inside inputpaths"; + Path dirPath = inputPaths.get(0); + try { + FileSystem inpFs = dirPath.getFileSystem(conf); + FileStatus[] status = + HiveStatsUtils.getFileStatusRecurse(dirPath, listBucketingCtx + .getSkewedColNames().size(), inpFs); + List<Path> newInputPath = new ArrayList<Path>(); + boolean succeed = true; + for (int i = 0; i < status.length; ++i) { + if (status[i].isDir()) { + // Add the lb path to the list of input paths + newInputPath.add(status[i].getPath()); + } else { + // find file instead of dir. dont change inputpath + succeed = false; + } + } + assert (succeed || ((!succeed) && newInputPath.isEmpty())) : + "This partition has " + + " inconsistent file structure: " + + + "it is stored-as-subdir and expected all files in the same depth" + + " of subdirectories."; + if (succeed) { + inputPaths.clear(); + inputPaths.addAll(newInputPath); + } + } catch (IOException e) { + String msg = + "Fail to get filesystem for directory name : " + dirPath.toUri(); + throw new RuntimeException(msg, e); + } + + } + } + + /** + * @return the listBucketingCtx + */ + public ListBucketingCtx getListBucketingCtx() { + return listBucketingCtx; + } + + /** + * @param listBucketingCtx the listBucketingCtx to set + */ + public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) { + this.listBucketingCtx = listBucketingCtx; + } + + /** + * @return the isListBucketingAlterTableConcatenate + */ + public boolean isListBucketingAlterTableConcatenate() { + return isListBucketingAlterTableConcatenate; + } + + @Explain(displayName = "input format") + public String getSourceTableInputFormat() { + return srcTblInputFormat; + } + + public void setSourceTableInputFormat(String srcTblInputFormat) { + this.srcTblInputFormat = srcTblInputFormat; + } + + @Explain(displayName = "merge level") + public String getMergeLevel() { + if (srcTblInputFormat != null) { + if (srcTblInputFormat.equals(OrcInputFormat.class.getName())) { + return "stripe"; + } else if (srcTblInputFormat.equals(RCFileInputFormat.class.getName())) { + return "block"; + } + } + return null; + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java Sat Sep 13 00:39:26 2014 @@ -18,16 +18,16 @@ package org.apache.hadoop.hive.ql.io.orc; -import java.io.IOException; - -import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat; +import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -public class OrcFileStripeMergeInputFormat extends MergeInputFormat { +import java.io.IOException; + +public class OrcFileStripeMergeInputFormat extends MergeFileInputFormat { @Override public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader( Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Sat Sep 13 00:39:26 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; /** * The interface for writing ORC files. @@ -72,4 +73,30 @@ public interface Writer { * @return the offset that would be a valid end location for an ORC file */ long writeIntermediateFooter() throws IOException; + + /** + * Fast stripe append to ORC file. This interface is used for fast ORC file + * merge with other ORC files. When merging, the file to be merged should pass + * stripe in binary form along with stripe information and stripe statistics. + * After appending last stripe of a file, use appendUserMetadata() to append + * any user metadata. + * @param stripe - stripe as byte array + * @param offset - offset within byte array + * @param length - length of stripe within byte array + * @param stripeInfo - stripe information + * @param stripeStatistics - stripe statistics (Protobuf objects can be + * merged directly) + * @throws IOException + */ + public void appendStripe(byte[] stripe, int offset, int length, + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException; + + /** + * When fast stripe append is used for merging ORC stripes, after appending + * the last stripe from a file, this interface must be used to merge any + * user metadata. + * @param userMetadata - user metadata + */ + public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Sep 13 00:39:26 2014 @@ -29,6 +29,10 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -74,10 +78,17 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static com.google.common.base.Preconditions.checkArgument; /** * An ORC file writer. 
The file is divided into stripes, which is the natural @@ -2316,17 +2327,19 @@ class WriterImpl implements Writer, Memo return rawWriter.getPos(); } - void appendStripe(byte[] stripe, StripeInformation stripeInfo, - OrcProto.StripeStatistics stripeStatistics) throws IOException { - appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics); - } - - void appendStripe(byte[] stripe, int offset, int length, + @Override + public void appendStripe(byte[] stripe, int offset, int length, StripeInformation stripeInfo, OrcProto.StripeStatistics stripeStatistics) throws IOException { + checkArgument(stripe != null, "Stripe must not be null"); + checkArgument(length <= stripe.length, + "Specified length must not be greater than the specified array length"); + checkArgument(stripeInfo != null, "Stripe information must not be null"); + checkArgument(stripeStatistics != null, + "Stripe statistics must not be null"); + getStream(); long start = rawWriter.getPos(); - long stripeLen = length; long availBlockSpace = blockSize - (start % blockSize); @@ -2382,7 +2395,8 @@ class WriterImpl implements Writer, Memo } } - void appendUserMetadata(List<UserMetadataItem> userMetadata) { + @Override + public void appendUserMetadata(List<UserMetadataItem> userMetadata) { if (userMetadata != null) { for (UserMetadataItem item : userMetadata) { this.userMetadata.put(item.getName(), item.getValue()); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java Sat Sep 13 00:39:26 2014 @@ -20,14 +20,14 @@ package org.apache.hadoop.hive.ql.io.rcf import java.io.IOException; -import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat; +import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -public class RCFileBlockMergeInputFormat extends MergeInputFormat { +public class RCFileBlockMergeInputFormat extends MergeFileInputFormat { @Override public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper> Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sat Sep 13 00:39:26 2014 @@ -18,20 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry;
-import java.util.Properties; -import java.util.Set; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -53,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator; +import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; @@ -65,8 +53,10 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.merge.MergeWork; +import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; +import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx; @@ -88,6 +78,7 @@ import org.apache.hadoop.hive.ql.plan.Co import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.FileMergeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; @@ -96,8 +87,10 @@ import org.apache.hadoop.hive.ql.plan.Ma import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.StatsWork; @@ -106,6 +99,22 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; /** * General utility common functions for the Processor to convert operator into @@ -1250,33 +1259,20 @@ public final class GenMapRedUtils { (conf.getBoolVar(ConfVars.HIVEMERGEORCFILESTRIPELEVEL) && fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class))) { - // Check if InputFormatClass is valid - final String inputFormatClass; - if (fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) 
{ - inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL); + cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName, + dpCtx != null && dpCtx.getNumDPCols() > 0); + if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)); + cplan.setName("Tez Merge File Work"); + ((TezWork) work).add(cplan); } else { - inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL); - } - try { - Class c = Class.forName(inputFormatClass); - - if(fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)) { - LOG.info("OrcFile format - Using stripe level merge"); - } else { - LOG.info("RCFile format- Using block level merge"); - } - cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName, - dpCtx != null && dpCtx.getNumDPCols() > 0); work = cplan; - } catch (ClassNotFoundException e) { - String msg = "Illegal input format class: " + inputFormatClass; - throw new SemanticException(msg); } } else { cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc); if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)); - cplan.setName("Merge"); + cplan.setName("Tez Merge File Work"); ((TezWork)work).add(cplan); } else { work = new MapredWork(); @@ -1489,6 +1485,7 @@ public final class GenMapRedUtils { * * @param fsInputDesc * @param finalName + * @param inputFormatClass * @return MergeWork if table is stored as RCFile or ORCFile, * null otherwise */ @@ -1498,38 +1495,62 @@ public final class GenMapRedUtils { Path inputDir = fsInputDesc.getFinalDirName(); TableDesc tblDesc = fsInputDesc.getTableInfo(); - if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class) || - tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) { - ArrayList<Path> inputDirs = new ArrayList<Path>(1); - ArrayList<String> inputDirstr = new ArrayList<String>(1); - if (!hasDynamicPartitions - && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { - inputDirs.add(inputDir); - inputDirstr.add(inputDir.toString()); - } - - MergeWork work = new MergeWork(inputDirs, finalName, - hasDynamicPartitions, fsInputDesc.getDynPartCtx(), - tblDesc.getInputFileFormatClass()); - LinkedHashMap<String, ArrayList<String>> pathToAliases = - new LinkedHashMap<String, ArrayList<String>>(); - pathToAliases.put(inputDir.toString(), (ArrayList<String>) inputDirstr.clone()); - work.setMapperCannotSpanPartns(true); - work.setPathToAliases(pathToAliases); - work.setAliasToWork( - new LinkedHashMap<String, Operator<? extends OperatorDesc>>()); - if (hasDynamicPartitions - || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { - work.getPathToPartitionInfo().put(inputDir.toString(), - new PartitionDesc(tblDesc, null)); - } - work.setListBucketingCtx(fsInputDesc.getLbCtx()); + List<Path> inputDirs = new ArrayList<Path>(1); + ArrayList<String> inputDirstr = new ArrayList<String>(1); + // this will be populated by MergeFileWork.resolveDynamicPartitionStoredAsSubDirsMerge + // in case of dynamic partitioning and list bucketing + if (!hasDynamicPartitions && + !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { + inputDirs.add(inputDir); + } + inputDirstr.add(inputDir.toString()); + + // internal input format class for CombineHiveInputFormat + final Class<? 
extends InputFormat> internalIFClass; + if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) { + internalIFClass = RCFileBlockMergeInputFormat.class; + } else if (tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) { + internalIFClass = OrcFileStripeMergeInputFormat.class; + } else { + throw new SemanticException("createMergeTask called on a table with file" + + " format other than RCFile or ORCFile"); + } - return work; + // create the merge file work + MergeFileWork work = new MergeFileWork(inputDirs, finalName, + hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName()); + LinkedHashMap<String, ArrayList<String>> pathToAliases = + new LinkedHashMap<String, ArrayList<String>>(); + pathToAliases.put(inputDir.toString(), inputDirstr); + work.setMapperCannotSpanPartns(true); + work.setPathToAliases(pathToAliases); + PartitionDesc pDesc = new PartitionDesc(tblDesc, null); + pDesc.setInputFileFormatClass(internalIFClass); + work.getPathToPartitionInfo().put(inputDir.toString(), pDesc); + work.setListBucketingCtx(fsInputDesc.getLbCtx()); + + // create alias to work which contains the merge operator + LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = + new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); + Operator<? extends OperatorDesc> mergeOp = null; + final FileMergeDesc fmd; + if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) { + fmd = new RCFileMergeDesc(); + } else { + fmd = new OrcFileMergeDesc(); } + fmd.setDpCtx(fsInputDesc.getDynPartCtx()); + fmd.setOutputPath(finalName); + fmd.setHasDynamicPartitions(work.hasDynamicPartitions()); + fmd.setListBucketingAlterTableConcatenate(work.isListBucketingAlterTableConcatenate()); + int lbLevel = work.getListBucketingCtx() == null ? 0 : + work.getListBucketingCtx().calculateListBucketingLevel(); + fmd.setListBucketingDepth(lbLevel); + mergeOp = OperatorFactory.get(fmd); + aliasToWork.put(inputDir.toString(), mergeOp); + work.setAliasToWork(aliasToWork); - throw new SemanticException("createMergeTask called on a table with file" - + " format other than RCFile or ORCFile"); + return work; } /** Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Sat Sep 13 00:39:26 2014 @@ -404,6 +404,9 @@ public class TezCompiler extends TaskCom } private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) { + if (op == null) { + return; + } if (op.isUseBucketizedHiveInputFormat()) { work.setUseBucketizedHiveInputFormat(true); return; Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java Sat Sep 13 00:39:26 2014 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.fs.Path; + +/** + * + */ +public class FileMergeDesc extends AbstractOperatorDesc { + private DynamicPartitionCtx dpCtx; + private Path outputPath; + private int listBucketingDepth; + private boolean hasDynamicPartitions; + private boolean isListBucketingAlterTableConcatenate; + + public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) { + this.dpCtx = dynPartCtx; + this.outputPath = outputDir; + } + + public DynamicPartitionCtx getDpCtx() { + return dpCtx; + } + + public void setDpCtx(DynamicPartitionCtx dpCtx) { + this.dpCtx = dpCtx; + } + + public Path getOutputPath() { + return outputPath; + } + + public void setOutputPath(Path outputPath) { + this.outputPath = outputPath; + } + + public int getListBucketingDepth() { + return listBucketingDepth; + } + + public void setListBucketingDepth(int listBucketingDepth) { + this.listBucketingDepth = listBucketingDepth; + } + + public boolean hasDynamicPartitions() { + return hasDynamicPartitions; + } + + public void setHasDynamicPartitions(boolean hasDynamicPartitions) { + this.hasDynamicPartitions = hasDynamicPartitions; + } + + public boolean isListBucketingAlterTableConcatenate() { + return isListBucketingAlterTableConcatenate; + } + + public void setListBucketingAlterTableConcatenate(boolean isListBucketingAlterTableConcatenate) { + this.isListBucketingAlterTableConcatenate = isListBucketingAlterTableConcatenate; + } +} Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java Sat Sep 13 00:39:26 2014 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.fs.Path; + +/** + * ORC fast file merge operator descriptor. + */ +@Explain(displayName = "ORC File Merge Operator") +public class OrcFileMergeDesc extends FileMergeDesc { + + public OrcFileMergeDesc() { + this(null, null); + } + + public OrcFileMergeDesc(DynamicPartitionCtx dpCtx, Path outPath) { + super(dpCtx, outPath); + } +} Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java Sat Sep 13 00:39:26 2014 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.fs.Path; + +/** + * Descriptor for Fast file merge RC file operator. 
+ */ +@Explain(displayName = "RCFile Merge Operator") +public class RCFileMergeDesc extends FileMergeDesc { + + public RCFileMergeDesc() { + this(null, null); + } + + public RCFileMergeDesc(DynamicPartitionCtx dpCtx, Path outPath) { + super(dpCtx, outPath); + } + +} Modified: hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q Sat Sep 13 00:39:26 2014 @@ -69,7 +69,6 @@ show partitions list_bucketing_dynamic_p desc formatted list_bucketing_dynamic_part partition (ds='2008-04-08', hr='a1'); desc formatted list_bucketing_dynamic_part partition (ds='2008-04-08', hr='b1'); -set hive.merge.current.job.concatenate.list.bucketing=true; -- concatenate the partition and it will merge files alter table list_bucketing_dynamic_part partition (ds='2008-04-08', hr='b1') concatenate; Modified: hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q (original) +++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q Sat Sep 13 00:39:26 2014 @@ -1,51 +1,87 @@ set hive.merge.orcfile.stripe.level=false; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; +set hive.optimize.sort.dynamic.partition=false; +set mapred.min.split.size=1000; +set mapred.max.split.size=2000; +set tez.grouping.min-size=1000; +set tez.grouping.max-size=2000; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; DROP TABLE orcfile_merge1; DROP TABLE orcfile_merge1b; +DROP TABLE orcfile_merge1c; CREATE TABLE orcfile_merge1 (key INT, value STRING) PARTITIONED BY (ds STRING, part STRING) STORED AS ORC; CREATE TABLE orcfile_merge1b (key INT, value STRING) PARTITIONED BY (ds STRING, part STRING) STORED AS ORC; +CREATE TABLE orcfile_merge1c (key INT, value STRING) + PARTITIONED BY (ds STRING, part STRING) STORED AS ORC; --- Use non stipe-level merge +-- merge disabled EXPLAIN INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part) - SELECT key, value, PMOD(HASH(key), 100) as part + SELECT key, value, PMOD(HASH(key), 2) as part FROM src; INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part) - SELECT key, value, PMOD(HASH(key), 100) as part + SELECT key, value, PMOD(HASH(key), 2) as part FROM src; -DESC FORMATTED orcfile_merge1 partition (ds='1', part='50'); +DESC FORMATTED orcfile_merge1 partition (ds='1', part='0'); -set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; +-- auto-merge slow way EXPLAIN INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part) - SELECT key, value, PMOD(HASH(key), 100) as part + SELECT key, value, PMOD(HASH(key), 2) as part FROM src; INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part) - SELECT key, value, PMOD(HASH(key), 100) as part + SELECT key, value, PMOD(HASH(key), 2) as part + FROM src; + +DESC FORMATTED orcfile_merge1b partition (ds='1', part='0'); 
+ +set hive.merge.orcfile.stripe.level=true; +-- auto-merge fast way +EXPLAIN + INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part) + SELECT key, value, PMOD(HASH(key), 2) as part + FROM src; + +INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part) + SELECT key, value, PMOD(HASH(key), 2) as part FROM src; -DESC FORMATTED orcfile_merge1 partition (ds='1', part='50'); +DESC FORMATTED orcfile_merge1c partition (ds='1', part='0'); +set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Verify SELECT SUM(HASH(c)) FROM ( SELECT TRANSFORM(*) USING 'tr \t _' AS (c) FROM orcfile_merge1 WHERE ds='1' ) t; -set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; - SELECT SUM(HASH(c)) FROM ( SELECT TRANSFORM(*) USING 'tr \t _' AS (c) FROM orcfile_merge1b WHERE ds='1' ) t; +SELECT SUM(HASH(c)) FROM ( + SELECT TRANSFORM(*) USING 'tr \t _' AS (c) + FROM orcfile_merge1c WHERE ds='1' +) t; + +select count(*) from orcfile_merge1; +select count(*) from orcfile_merge1b; +select count(*) from orcfile_merge1c; + DROP TABLE orcfile_merge1; DROP TABLE orcfile_merge1b; +DROP TABLE orcfile_merge1c; Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q Sat Sep 13 00:39:26 2014 @@ -0,0 +1,61 @@ +-- SORT_QUERY_RESULTS + +create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc; +create table orc_merge5b (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc; + +load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +SET mapred.min.split.size=1000; +SET mapred.max.split.size=50000; +SET hive.optimize.index.filter=true; +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; +set hive.compute.splits.in.am=true; +set tez.grouping.min-size=1000; +set tez.grouping.max-size=50000; + +-- 3 mappers +explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; + +-- 3 files total +analyze table orc_merge5b compute statistics noscan; +desc formatted orc_merge5b; +select * from orc_merge5b; + +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; + +-- 3 mappers +explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; + +-- 1 file after merging +analyze table orc_merge5b compute statistics noscan; +desc formatted orc_merge5b; +select * from orc_merge5b; + +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +analyze table orc_merge5b compute 
statistics noscan; +desc formatted orc_merge5b; +select * from orc_merge5b; + +set hive.merge.orcfile.stripe.level=true; +explain alter table orc_merge5b concatenate; +alter table orc_merge5b concatenate; + +-- 1 file after merging +analyze table orc_merge5b compute statistics noscan; +desc formatted orc_merge5b; +select * from orc_merge5b; + Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q Sat Sep 13 00:39:26 2014 @@ -0,0 +1,78 @@ +-- SORT_QUERY_RESULTS + +-- orc file merge tests for static partitions +create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc; +create table orc_merge5a (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) partitioned by (year string, hour int) stored as orc; + +load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +SET mapred.min.split.size=1000; +SET mapred.max.split.size=50000; +SET hive.optimize.index.filter=true; +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; +set hive.compute.splits.in.am=true; +set tez.grouping.min-size=1000; +set tez.grouping.max-size=50000; + +-- 3 mappers +explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; + +-- 3 files total +analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan; +analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan; +desc formatted orc_merge5a partition(year="2000",hour=24); +desc formatted orc_merge5a partition(year="2001",hour=24); +show partitions orc_merge5a; +select * from orc_merge5a; + +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; + +-- 3 mappers +explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; + +-- 1 file after merging +analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan; +analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan; +desc formatted orc_merge5a partition(year="2000",hour=24); +desc formatted orc_merge5a partition(year="2001",hour=24); +show partitions orc_merge5a; +select * from orc_merge5a; + +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set 
hive.merge.mapredfiles=false; + +insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13; +analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan; +analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan; +desc formatted orc_merge5a partition(year="2000",hour=24); +desc formatted orc_merge5a partition(year="2001",hour=24); +show partitions orc_merge5a; +select * from orc_merge5a; + +set hive.merge.orcfile.stripe.level=true; +explain alter table orc_merge5a partition(year="2000",hour=24) concatenate; +alter table orc_merge5a partition(year="2000",hour=24) concatenate; +alter table orc_merge5a partition(year="2001",hour=24) concatenate; + +-- 1 file after merging +analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan; +analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan; +desc formatted orc_merge5a partition(year="2000",hour=24); +desc formatted orc_merge5a partition(year="2001",hour=24); +show partitions orc_merge5a; +select * from orc_merge5a; + Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q?rev=1624688&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q Sat Sep 13 00:39:26 2014 @@ -0,0 +1,82 @@ +-- SORT_QUERY_RESULTS + +-- orc merge file tests for dynamic partition case + +create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc; +create table orc_merge5a (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) partitioned by (st double) stored as orc; + +load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +SET mapred.min.split.size=1000; +SET mapred.max.split.size=50000; +SET hive.optimize.index.filter=true; +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; +set hive.compute.splits.in.am=true; +set tez.grouping.min-size=1000; +set tez.grouping.max-size=50000; +set hive.exec.dynamic.partition=true; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.optimize.sort.dynamic.partition=false; + +-- 3 mappers +explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; + +-- 3 files total +analyze table orc_merge5a partition(st=80.0) compute statistics noscan; +analyze table orc_merge5a partition(st=0.8) compute statistics noscan; +desc formatted orc_merge5a partition(st=80.0); +desc formatted orc_merge5a partition(st=0.8); +show partitions orc_merge5a; +select * from orc_merge5a where userid<=13; + +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; 
+set hive.merge.mapredfiles=true; + +-- 3 mappers +explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; + +-- 1 file after merging +analyze table orc_merge5a partition(st=80.0) compute statistics noscan; +analyze table orc_merge5a partition(st=0.8) compute statistics noscan; +desc formatted orc_merge5a partition(st=80.0); +desc formatted orc_merge5a partition(st=0.8); +show partitions orc_merge5a; +select * from orc_merge5a where userid<=13; + +set hive.merge.orcfile.stripe.level=false; +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5; +analyze table orc_merge5a partition(st=80.0) compute statistics noscan; +analyze table orc_merge5a partition(st=0.8) compute statistics noscan; +desc formatted orc_merge5a partition(st=80.0); +desc formatted orc_merge5a partition(st=0.8); +show partitions orc_merge5a; +select * from orc_merge5a where userid<=13; + +set hive.merge.orcfile.stripe.level=true; +explain alter table orc_merge5a partition(st=80.0) concatenate; +alter table orc_merge5a partition(st=80.0) concatenate; +alter table orc_merge5a partition(st=0.8) concatenate; + +-- 1 file after merging +analyze table orc_merge5a partition(st=80.0) compute statistics noscan; +analyze table orc_merge5a partition(st=0.8) compute statistics noscan; +desc formatted orc_merge5a partition(st=80.0); +desc formatted orc_merge5a partition(st=0.8); +show partitions orc_merge5a; +select * from orc_merge5a where userid<=13; + Modified: hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out Sat Sep 13 00:39:26 2014 @@ -566,12 +566,16 @@ STAGE PLANS: Stats-Aggr Operator Stage: Stage-4 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat Stage: Stage-6 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out Sat Sep 13 00:39:26 2014 differ Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out Sat Sep 13 00:39:26 2014 differ Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out Sat Sep 13 00:39:26 2014 differ Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out Sat Sep 13 00:39:26 2014 differ Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out Sat Sep 13 00:39:26 2014 differ Modified: hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out Sat Sep 13 00:39:26 2014 @@ -202,12 +202,16 @@ STAGE PLANS: Stats-Aggr Operator Stage: Stage-3 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat Stage: Stage-5 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat Modified: hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out Sat Sep 13 00:39:26 2014 @@ -176,12 +176,16 @@ STAGE PLANS: Stats-Aggr Operator Stage: Stage-3 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: 
org.apache.hadoop.hive.ql.io.RCFileInputFormat Stage: Stage-5 - Merge Work + Merge File Operator + Map Operator Tree: + RCFile Merge Operator merge level: block input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat Modified: hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out Sat Sep 13 00:39:26 2014 @@ -111,12 +111,16 @@ STAGE PLANS: Stats-Aggr Operator Stage: Stage-3 - Merge Work + Merge File Operator + Map Operator Tree: + ORC File Merge Operator merge level: stripe input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat Stage: Stage-5 - Merge Work + Merge File Operator + Map Operator Tree: + ORC File Merge Operator merge level: stripe input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -270,12 +274,16 @@ STAGE PLANS: Stats-Aggr Operator Stage: Stage-3 - Merge Work + Merge File Operator + Map Operator Tree: + ORC File Merge Operator merge level: stripe input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat Stage: Stage-5 - Merge Work + Merge File Operator + Map Operator Tree: + ORC File Merge Operator merge level: stripe input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
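The Writer interface changes above define the fast merge contract: each source stripe is copied verbatim, and the source file's user metadata is replayed after its last stripe. A minimal sketch of the consuming side follows, assuming the raw stripe bytes, StripeInformation, and stripe statistics have already been pulled out of the source reader (in this patch they arrive as OrcFileKeyWrapper/OrcFileValueWrapper records from OrcFileStripeMergeInputFormat); the SourceStripe holder and appendFile helper are illustrative and not part of the commit.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.orc.Writer;

public class StripeAppendSketch {
  /** One stripe of a source file, already extracted from its reader. */
  public static class SourceStripe {
    byte[] data;                       // raw stripe bytes
    StripeInformation info;            // offsets and lengths of the stripe
    OrcProto.StripeStatistics stats;   // protobuf stats, merged as-is
  }

  // Append every stripe of one source file to the target writer, then carry
  // over that file's user metadata, as the new Writer javadoc prescribes.
  static void appendFile(Writer target, List<SourceStripe> stripes,
      List<OrcProto.UserMetadataItem> userMetadata) throws IOException {
    for (SourceStripe s : stripes) {
      target.appendStripe(s.data, 0, s.data.length, s.info, s.stats);
    }
    // Must be called after the last stripe of each source file.
    target.appendUserMetadata(userMetadata);
  }
}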
