Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Sat Jul 26 23:45:46 2014 @@ -144,6 +144,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.common.util.AnnotationUtils; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; @@ -1613,14 +1614,14 @@ public final class FunctionRegistry { // the deterministic annotation declares return false; } - UDFType genericUDFType = genericUDF.getClass().getAnnotation(UDFType.class); + UDFType genericUDFType = AnnotationUtils.getAnnotation(genericUDF.getClass(), UDFType.class); if (genericUDFType != null && genericUDFType.deterministic() == false) { return false; } if (genericUDF instanceof GenericUDFBridge) { GenericUDFBridge bridge = (GenericUDFBridge) (genericUDF); - UDFType bridgeUDFType = bridge.getUdfClass().getAnnotation(UDFType.class); + UDFType bridgeUDFType = AnnotationUtils.getAnnotation(bridge.getUdfClass(), UDFType.class); if (bridgeUDFType != null && bridgeUDFType.deterministic() == false) { return false; } @@ -1638,14 +1639,14 @@ public final class FunctionRegistry { * Returns whether a GenericUDF is stateful or not. */ public static boolean isStateful(GenericUDF genericUDF) { - UDFType genericUDFType = genericUDF.getClass().getAnnotation(UDFType.class); + UDFType genericUDFType = AnnotationUtils.getAnnotation(genericUDF.getClass(), UDFType.class); if (genericUDFType != null && genericUDFType.stateful()) { return true; } if (genericUDF instanceof GenericUDFBridge) { GenericUDFBridge bridge = (GenericUDFBridge) genericUDF; - UDFType bridgeUDFType = bridge.getUdfClass().getAnnotation(UDFType.class); + UDFType bridgeUDFType = AnnotationUtils.getAnnotation(bridge.getUdfClass(), UDFType.class); if (bridgeUDFType != null && bridgeUDFType.stateful()) { return true; } @@ -1884,7 +1885,7 @@ public final class FunctionRegistry { /** * Both UDF and UDAF functions can imply order for analytical functions * - * @param name + * @param functionName * name of function * @return true if a GenericUDF or GenericUDAF exists for this name and implyOrder is true, false * otherwise. @@ -1894,7 +1895,8 @@ public final class FunctionRegistry { FunctionInfo info = getFunctionInfo(functionName); if (info != null) { if (info.isGenericUDF()) { - UDFType type = info.getGenericUDF().getClass().getAnnotation(UDFType.class); + UDFType type = + AnnotationUtils.getAnnotation(info.getGenericUDF().getClass(), UDFType.class); if (type != null) { return type.impliesOrder(); } @@ -1961,7 +1963,8 @@ public final class FunctionRegistry { FunctionInfo info = getFunctionInfo(name); GenericUDAFResolver res = info.getGenericUDAFResolver(); if (res != null){ - WindowFunctionDescription desc = res.getClass().getAnnotation(WindowFunctionDescription.class); + WindowFunctionDescription desc = + AnnotationUtils.getAnnotation(res.getClass(), WindowFunctionDescription.class); if (desc != null){ return desc.rankingFunction(); }
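This file (and several others below) replaces every direct Class.getAnnotation(...) call with AnnotationUtils.getAnnotation(...). The utility itself is not part of this diff; the likely motivation is that Class.getAnnotation resolves annotations lazily under a class-level lock in older JDKs, so a process-wide cache avoids contention when many threads look up UDF annotations. A minimal sketch of what such a cached lookup might look like, purely as an assumption (class name, key scheme, and structure are illustrative, not the actual org.apache.hive.common.util.AnnotationUtils source):

// Hypothetical sketch of a cached annotation lookup; NOT the real
// org.apache.hive.common.util.AnnotationUtils implementation.
import java.lang.annotation.Annotation;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class AnnotationUtilsSketch {
  // Sentinel for "annotation absent", since ConcurrentHashMap cannot store nulls.
  private static final Object NULL = new Object();
  // Cache keyed by (class name, annotation type name).
  private static final ConcurrentMap<String, Object> CACHE =
      new ConcurrentHashMap<String, Object>();

  private AnnotationUtilsSketch() {
  }

  @SuppressWarnings("unchecked")
  public static <A extends Annotation> A getAnnotation(Class<?> clazz, Class<A> annotationType) {
    String key = clazz.getName() + "#" + annotationType.getName();
    Object cached = CACHE.get(key);
    if (cached == null) {
      // One slow, potentially lock-contended lookup per (class, annotation) pair;
      // all later calls are served from the cache.
      A resolved = clazz.getAnnotation(annotationType);
      cached = (resolved == null) ? NULL : resolved;
      CACHE.putIfAbsent(key, cached);
    }
    return cached == NULL ? null : (A) cached;
  }
}

Under this assumption, the call sites changed in this commit, such as AnnotationUtils.getAnnotation(genericUDF.getClass(), UDFType.class), stay drop-in compatible with the getAnnotation calls they replace.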
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sat Jul 26 23:45:46 2014 @@ -131,7 +131,7 @@ public class MapJoinOperator extends Abs int bigPos = conf.getPosBigTable(); List<ObjectInspector> valueOI = new ArrayList<ObjectInspector>(); for (int i = 0; i < valueIndex.length; i++) { - if (valueIndex[i] >= 0) { + if (valueIndex[i] >= 0 && !joinKeysObjectInspectors[bigPos].isEmpty()) { valueOI.add(joinKeysObjectInspectors[bigPos].get(valueIndex[i])); } else { valueOI.add(inspectors.get(i)); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Jul 26 23:45:46 2014 @@ -272,6 +272,7 @@ public class ReduceSinkOperator extends // TODO: this is fishy - we init object inspectors based on first tag. We // should either init for each tag, or if rowInspector doesn't really // matter, then we can create this in ctor and get rid of firstRow. + LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, distinctColIndices, conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Sat Jul 26 23:45:46 2014 @@ -32,8 +32,7 @@ import org.apache.hadoop.hive.serde2.obj /** * Select operator implementation. 
*/ -public class SelectOperator extends Operator<SelectDesc> implements - Serializable { +public class SelectOperator extends Operator<SelectDesc> implements Serializable { private static final long serialVersionUID = 1L; protected transient ExprNodeEvaluator[] eval; @@ -60,10 +59,9 @@ public class SelectOperator extends Oper } } output = new Object[eval.length]; - LOG.info("SELECT " - + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); - outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf - .getOutputColumnNames(), inputObjInspectors[0]); + LOG.info("SELECT " + ((StructObjectInspector) inputObjInspectors[0]).getTypeName()); + outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(), + inputObjInspectors[0]); initializeChildren(hconf); } @@ -81,8 +79,7 @@ public class SelectOperator extends Oper } catch (HiveException e) { throw e; } catch (RuntimeException e) { - throw new HiveException("Error evaluating " - + conf.getColList().get(i).getExprString(), e); + throw new HiveException("Error evaluating " + conf.getColList().get(i).getExprString(), e); } forward(output, outputObjInspector); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sat Jul 26 23:45:46 2014 @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.QueryPl import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; @@ -365,6 +367,10 @@ public abstract class Task<T extends Ser return work; } + public Collection<MapWork> getMapWork() { + return Collections.<MapWork>emptyList(); + } + public void setId(String id) { this.id = id; } @@ -389,7 +395,7 @@ public abstract class Task<T extends Ser return false; } - public Operator<? extends OperatorDesc> getReducer() { + public Operator<? 
extends OperatorDesc> getReducer(MapWork work) { return null; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jul 26 23:45:46 2014 @@ -268,6 +268,10 @@ public final class Utilities { return w; } + public static void cacheMapWork(Configuration conf, MapWork work, Path hiveScratchDir) { + cacheBaseWork(conf, MAP_PLAN_NAME, work, hiveScratchDir); + } + public static void setMapWork(Configuration conf, MapWork work) { setBaseWork(conf, MAP_PLAN_NAME, work); } @@ -284,6 +288,17 @@ public final class Utilities { return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); } + public static void cacheBaseWork(Configuration conf, String name, BaseWork work, + Path hiveScratchDir) { + try { + setPlanPath(conf, hiveScratchDir); + setBaseWork(conf, name, work); + } catch (IOException e) { + LOG.error("Failed to cache plan", e); + throw new RuntimeException(e); + } + } + /** * Pushes work into the global work map */ @@ -2332,13 +2347,15 @@ public final class Utilities { public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) throws Exception { - ContentSummary cs = ctx.getCS(dirPath); - if (cs != null) { - LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: " - + cs.getFileCount() + " num directories: " + cs.getDirectoryCount()); - return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1); - } else { - LOG.info("Content Summary not cached for " + dirPath); + if (ctx != null) { + ContentSummary cs = ctx.getCS(dirPath); + if (cs != null) { + LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: " + + cs.getFileCount() + " num directories: " + cs.getDirectoryCount()); + return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1); + } else { + LOG.info("Content Summary not cached for " + dirPath); + } } return isEmptyPath(job, dirPath); } @@ -2986,7 +3003,13 @@ public final class Utilities { * so we don't want to depend on scratch dir and context. */ public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception { - List<Path> paths = getInputPaths(job, work, null, null); + String scratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR); + + // we usually don't want to create dummy files for tez, however the metadata only + // optimization relies on it. 
+ List<Path> paths = getInputPaths(job, work, new Path(scratchDir), null, + !work.isUseOneNullRowInputFormat()); + return paths; } @@ -3004,8 +3027,8 @@ public final class Utilities { * @return List of paths to process for the given MapWork * @throws Exception */ - public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, Context ctx) - throws Exception { + public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, + Context ctx, boolean skipDummy) throws Exception { int sequenceNumber = 0; Set<Path> pathsProcessed = new HashSet<Path>(); @@ -3030,7 +3053,7 @@ public final class Utilities { pathsProcessed.add(path); LOG.info("Adding input file " + path); - if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + if (!skipDummy && isEmptyPath(job, path, ctx)) { path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir, alias, sequenceNumber++); @@ -3048,8 +3071,7 @@ public final class Utilities { // T2) x; // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 // rows) - if (path == null - && !HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + if (path == null && !skipDummy) { path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias, sequenceNumber++); pathsToAdd.add(path); @@ -3100,7 +3122,8 @@ public final class Utilities { PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath); boolean nonNative = partDesc.getTableDesc().isNonNative(); boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class; - Properties props = partDesc.getProperties(); + Properties props = SerDeUtils.createOverlayedProperties( + partDesc.getTableDesc().getProperties(), partDesc.getProperties()); Class<? extends HiveOutputFormat> outFileFormat = partDesc.getOutputFileFormatClass(); if (nonNative) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Sat Jul 26 23:45:46 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; +import org.apache.hive.common.util.AnnotationUtils; @SuppressWarnings("deprecation") public class WindowFunctionInfo implements CommonFunctionInfo @@ -33,7 +34,8 @@ public class WindowFunctionInfo implemen assert fInfo.isGenericUDAF(); this.fInfo = fInfo; Class<? 
extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass(); - WindowFunctionDescription def = wfnCls.getAnnotation(WindowFunctionDescription.class); + WindowFunctionDescription def = + AnnotationUtils.getAnnotation(wfnCls, WindowFunctionDescription.class); if ( def != null) { supportsWindow = def.supportsWindow(); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Sat Jul 26 23:45:46 2014 @@ -355,8 +355,8 @@ public class ExecDriver extends Task<Map //upload archive file to hdfs Path hdfsFilePath =Utilities.generateTarPath(hdfsPath, stageId); short replication = (short) job.getInt("mapred.submit.replication", 10); - hdfs.setReplication(hdfsFilePath, replication); hdfs.copyFromLocalFile(archivePath, hdfsFilePath); + hdfs.setReplication(hdfsFilePath, replication); LOG.info("Upload 1 archive file from" + archivePath + " to: " + hdfsFilePath); //add the archive file to distributed cache @@ -366,7 +366,7 @@ public class ExecDriver extends Task<Map } } work.configureJobConf(job); - List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx); + List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx, false); Utilities.setInputPaths(job, inputPaths); Utilities.setMapRedWork(job, work, ctx.getMRTmpPath()); @@ -789,6 +789,11 @@ public class ExecDriver extends Task<Map } @Override + public Collection<MapWork> getMapWork() { + return Collections.<MapWork>singleton(getWork().getMapWork()); + } + + @Override public boolean isMapRedTask() { return true; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Sat Jul 26 23:45:46 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.DriverC import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -462,8 +463,11 @@ public class MapRedTask extends ExecDriv } @Override - public Operator<? extends OperatorDesc> getReducer() { - return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer(); + public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) { + if (getWork().getMapWork() == mapWork) { + return getWork().getReduceWork() == null ? 
null : getWork().getReduceWork().getReducer(); + } + return null; } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Jul 26 23:45:46 2014 @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; @@ -195,6 +196,10 @@ public class DagUtils { inpFormat = BucketizedHiveInputFormat.class.getName(); } + if (mapWork.isUseOneNullRowInputFormat()) { + inpFormat = CombineHiveInputFormat.class.getName(); + } + conf.set("mapred.mapper.class", ExecMapper.class.getName()); conf.set("mapred.input.format.class", inpFormat); @@ -413,7 +418,7 @@ public class DagUtils { Path tezDir = getTezDir(mrScratchDir); // set up the operator plan - Utilities.setMapWork(conf, mapWork, mrScratchDir, false); + Utilities.cacheMapWork(conf, mapWork, mrScratchDir); // create the directories FileSinkOperators need Utilities.createTmpDirs(conf, mapWork); @@ -441,6 +446,7 @@ public class DagUtils { } } } + if (vertexHasCustomInput) { useTezGroupedSplits = false; // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat @@ -459,7 +465,8 @@ public class DagUtils { } } - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) { + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION) + && !mapWork.isUseOneNullRowInputFormat()) { // if we're generating the splits in the AM, we just need to set // the correct plugin. amSplitGeneratorClass = HiveSplitGenerator.class; @@ -470,6 +477,9 @@ public class DagUtils { numTasks = inputSplitInfo.getNumTasks(); } + // set up the operator plan + Utilities.setMapWork(conf, mapWork, mrScratchDir, false); + byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf); map = new Vertex(mapWork.getName(), new ProcessorDescriptor(MapTezProcessor.class.getName()). 
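The Task.java change earlier in this commit generalizes the task API from a single implicit map stage to many: getMapWork() now returns a Collection of MapWork, and getReducer(MapWork) resolves the reducer relative to the map work it feeds. The TezTask section that follows implements both for multi-vertex plans. A hedged usage sketch of the new shape (the wrapper class and the visitReducers method are assumptions for illustration, not code from this commit):

// Illustrative caller of the reworked Task API; only getMapWork() and
// getReducer(MapWork) come from this commit, the rest is scaffolding.
import java.util.Collection;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public class ReducerWalkSketch {
  public static void visitReducers(Task<?> task) {
    // One task may now own several MapWork instances (Tez); an MR task
    // yields a singleton, and the base Task yields an empty collection.
    Collection<MapWork> mapWorks = task.getMapWork();
    for (MapWork mapWork : mapWorks) {
      // The reducer is looked up per MapWork instead of via the old
      // zero-argument getReducer() that assumed a single map stage.
      Operator<? extends OperatorDesc> reducer = task.getReducer(mapWork);
      if (reducer != null) {
        System.out.println(mapWork.getName() + " -> " + reducer.getName());
      }
    }
  }
}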
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Sat Jul 26 23:45:46 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -36,6 +37,9 @@ import org.apache.hadoop.hive.ql.exec.Ta import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; @@ -354,4 +358,42 @@ public class TezTask extends Task<TezWor public String getName() { return "TEZ"; } + + @Override + public Collection<MapWork> getMapWork() { + List<MapWork> result = new LinkedList<MapWork>(); + TezWork work = getWork(); + + // framework expects MapWork instances that have no physical parents (i.e.: union parent is + // fine, broadcast parent isn't) + for (BaseWork w: work.getAllWorkUnsorted()) { + if (w instanceof MapWork) { + List<BaseWork> parents = work.getParents(w); + boolean candidate = true; + for (BaseWork parent: parents) { + if (!(parent instanceof UnionWork)) { + candidate = false; + } + } + if (candidate) { + result.add((MapWork)w); + } + } + } + return result; + } + + @Override + public Operator<? 
extends OperatorDesc> getReducer(MapWork mapWork) { + List<BaseWork> children = getWork().getChildren(mapWork); + if (children.size() != 1) { + return null; + } + + if (!(children.get(0) instanceof ReduceWork)) { + return null; + } + + return ((ReduceWork)children.get(0)).getReducer(); + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Sat Jul 26 23:45:46 2014 @@ -393,7 +393,7 @@ public class VectorColumnAssignFactory { else { BytesWritable bw = (BytesWritable) val; byte[] bytes = bw.getBytes(); - assignBytes(bytes, 0, bytes.length, destIndex); + assignBytes(bytes, 0, bw.getLength(), destIndex); } } }.init(outputBatch, (BytesColumnVector) destCol); @@ -408,7 +408,7 @@ public class VectorColumnAssignFactory { else { Text bw = (Text) val; byte[] bytes = bw.getBytes(); - assignBytes(bytes, 0, bytes.length, destIndex); + assignBytes(bytes, 0, bw.getLength(), destIndex); } } }.init(outputBatch, (BytesColumnVector) destCol); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java Sat Jul 26 23:45:46 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.v import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.AnnotationUtils; /** * Describes a vector expression and encapsulates the {@link Mode}, number of arguments, @@ -219,7 +220,8 @@ public class VectorExpressionDescriptor } public Class<?> getVectorExpressionClass(Class<?> udf, Descriptor descriptor) throws HiveException { - VectorizedExpressions annotation = udf.getAnnotation(VectorizedExpressions.class); + VectorizedExpressions annotation = + AnnotationUtils.getAnnotation(udf, VectorizedExpressions.class); if (annotation == null || annotation.value() == null) { return null; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sat Jul 26 23:45:46 2014 @@ -100,6 +100,7 @@ import org.apache.hadoop.hive.serde2.typ import 
org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** * Context class for vectorization execution. @@ -393,13 +394,30 @@ public class VectorizationContext { List<ExprNodeDesc> childrenWithCasts = new ArrayList<ExprNodeDesc>(); boolean atleastOneCastNeeded = false; - for (ExprNodeDesc child : children) { - ExprNodeDesc castExpression = getImplicitCastExpression(genericUDF, child, commonType); - if (castExpression != null) { - atleastOneCastNeeded = true; - childrenWithCasts.add(castExpression); - } else { - childrenWithCasts.add(child); + if (genericUDF instanceof GenericUDFElt) { + int i = 0; + for (ExprNodeDesc child : children) { + TypeInfo castType = commonType; + if (i++ == 0) { + castType = isIntFamily(child.getTypeString()) ? child.getTypeInfo() : TypeInfoFactory.intTypeInfo; + } + ExprNodeDesc castExpression = getImplicitCastExpression(genericUDF, child, castType); + if (castExpression != null) { + atleastOneCastNeeded = true; + childrenWithCasts.add(castExpression); + } else { + childrenWithCasts.add(child); + } + } + } else { + for (ExprNodeDesc child : children) { + ExprNodeDesc castExpression = getImplicitCastExpression(genericUDF, child, commonType); + if (castExpression != null) { + atleastOneCastNeeded = true; + childrenWithCasts.add(castExpression); + } else { + childrenWithCasts.add(child); + } } } if (atleastOneCastNeeded) { @@ -484,7 +502,7 @@ public class VectorizationContext { } else { // Casts to exact types including long to double etc. are needed in some special cases. - if (udf instanceof GenericUDFCoalesce) { + if (udf instanceof GenericUDFCoalesce || udf instanceof GenericUDFElt) { GenericUDF genericUdf = getGenericUDFForCast(castType); List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>(); children.add(child); @@ -699,13 +717,15 @@ public class VectorizationContext { private VectorExpression getConstantVectorExpression(Object constantValue, TypeInfo typeInfo, Mode mode) throws HiveException { - String type = typeInfo.getTypeName(); + String type = typeInfo.getTypeName(); String colVectorType = getNormalizedTypeName(type); int outCol = -1; if (mode == Mode.PROJECTION) { outCol = ocm.allocateOutputColumn(colVectorType); } - if (decimalTypePattern.matcher(type).matches()) { + if (constantValue == null) { + return new ConstantVectorExpression(outCol, type, true); + } else if (decimalTypePattern.matcher(type).matches()) { VectorExpression ve = new ConstantVectorExpression(outCol, (Decimal128) constantValue); ve.setOutputType(typeInfo.getTypeName()); return ve; @@ -896,6 +916,10 @@ public class VectorizationContext { // Coalesce is a special case because it can take variable number of arguments. return getCoalesceExpression(childExpr, returnType); + } else if (udf instanceof GenericUDFElt) { + + // Elt is a special case because, like coalesce, it can take a variable number of arguments. 
+ return getEltExpression(childExpr, returnType); } else if (udf instanceof GenericUDFBridge) { VectorExpression v = getGenericUDFBridgeVectorExpression((GenericUDFBridge) udf, childExpr, mode, returnType); @@ -948,6 +972,33 @@ public class VectorizationContext { } } + private VectorExpression getEltExpression(List<ExprNodeDesc> childExpr, TypeInfo returnType) + throws HiveException { + int[] inputColumns = new int[childExpr.size()]; + VectorExpression[] vectorChildren = null; + try { + vectorChildren = getVectorExpressions(childExpr, Mode.PROJECTION); + + int i = 0; + for (VectorExpression ve : vectorChildren) { + inputColumns[i++] = ve.getOutputColumn(); + } + + int outColumn = ocm.allocateOutputColumn(getNormalizedTypeName(returnType.getTypeName())); + VectorElt vectorElt = new VectorElt(inputColumns, outColumn); + vectorElt.setOutputType(returnType.getTypeName()); + vectorElt.setChildExpressions(vectorChildren); + return vectorElt; + } finally { + // Free the output columns of the child expressions. + if (vectorChildren != null) { + for (VectorExpression v : vectorChildren) { + ocm.freeOutputColumn(v.getOutputColumn()); + } + } + } + } + /** * Create a filter or boolean-valued expression for column IN ( <list-of-constants> ) */ @@ -1063,10 +1114,9 @@ public class VectorizationContext { ExprNodeDesc child = childExpr.get(0); String inputType = childExpr.get(0).getTypeString(); if (child instanceof ExprNodeConstantDesc) { - // Return a constant vector expression - Object constantValue = ((ExprNodeConstantDesc) child).getValue(); - Decimal128 decimalValue = castConstantToDecimal(constantValue, child.getTypeInfo()); - return getConstantVectorExpression(decimalValue, returnType, Mode.PROJECTION); + // Don't do constant folding here. Wait until the optimizer is changed to do it. + // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. 
+ return null; } if (isIntFamily(inputType)) { return createVectorExpression(CastLongToDecimal.class, childExpr, Mode.PROJECTION, returnType); @@ -1083,49 +1133,15 @@ public class VectorizationContext { throw new HiveException("Unhandled cast input type: " + inputType); } - private Decimal128 castConstantToDecimal(Object scalar, TypeInfo type) throws HiveException { - PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; - String typename = type.getTypeName(); - Decimal128 d = new Decimal128(); - int scale = HiveDecimalUtils.getScaleForType(ptinfo); - switch (ptinfo.getPrimitiveCategory()) { - case FLOAT: - float floatVal = ((Float) scalar).floatValue(); - d.update(floatVal, (short) scale); - break; - case DOUBLE: - double doubleVal = ((Double) scalar).doubleValue(); - d.update(doubleVal, (short) scale); - break; - case BYTE: - byte byteVal = ((Byte) scalar).byteValue(); - d.update(byteVal, (short) scale); - break; - case SHORT: - short shortVal = ((Short) scalar).shortValue(); - d.update(shortVal, (short) scale); - break; - case INT: - int intVal = ((Integer) scalar).intValue(); - d.update(intVal, (short) scale); - break; - case LONG: - long longVal = ((Long) scalar).longValue(); - d.update(longVal, (short) scale); - break; - case DECIMAL: - HiveDecimal decimalVal = (HiveDecimal) scalar; - d.update(decimalVal.unscaledValue(), (short) scale); - break; - default: - throw new HiveException("Unsupported type "+typename+" for cast to Decimal128"); - } - return d; - } - private VectorExpression getCastToString(List<ExprNodeDesc> childExpr, TypeInfo returnType) throws HiveException { + ExprNodeDesc child = childExpr.get(0); String inputType = childExpr.get(0).getTypeString(); + if (child instanceof ExprNodeConstantDesc) { + // Don't do constant folding here. Wait until the optimizer is changed to do it. + // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. + return null; + } if (inputType.equals("boolean")) { // Boolean must come before the integer family. It's a special case. return createVectorExpression(CastBooleanToStringViaLongToString.class, childExpr, Mode.PROJECTION, null); @@ -1145,7 +1161,13 @@ public class VectorizationContext { private VectorExpression getCastToDoubleExpression(Class<?> udf, List<ExprNodeDesc> childExpr, TypeInfo returnType) throws HiveException { + ExprNodeDesc child = childExpr.get(0); String inputType = childExpr.get(0).getTypeString(); + if (child instanceof ExprNodeConstantDesc) { + // Don't do constant folding here. Wait until the optimizer is changed to do it. + // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. + return null; + } if (isIntFamily(inputType)) { return createVectorExpression(CastLongToDouble.class, childExpr, Mode.PROJECTION, returnType); } else if (inputType.equals("timestamp")) { @@ -1163,7 +1185,13 @@ public class VectorizationContext { private VectorExpression getCastToBoolean(List<ExprNodeDesc> childExpr) throws HiveException { + ExprNodeDesc child = childExpr.get(0); String inputType = childExpr.get(0).getTypeString(); + if (child instanceof ExprNodeConstantDesc) { + // Don't do constant folding here. Wait until the optimizer is changed to do it. + // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. + return null; + } // Long and double are handled using descriptors, string needs to be specially handled. 
if (inputType.equals("string")) { // string casts to false if it is 0 characters long, otherwise true @@ -1184,7 +1212,13 @@ public class VectorizationContext { private VectorExpression getCastToLongExpression(List<ExprNodeDesc> childExpr) throws HiveException { + ExprNodeDesc child = childExpr.get(0); String inputType = childExpr.get(0).getTypeString(); + if (child instanceof ExprNodeConstantDesc) { + // Don't do constant folding here. Wait until the optimizer is changed to do it. + // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. + return null; + } // Float family, timestamp are handled via descriptor based lookup, int family needs // special handling. if (isIntFamily(inputType)) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java Sat Jul 26 23:45:46 2014 @@ -41,6 +41,7 @@ public class ConstantVectorExpression ex private double doubleValue = 0; private byte[] bytesValue = null; private Decimal128 decimalValue = null; + private boolean isNullValue = false; private Type type; private int bytesValueLength = 0; @@ -74,34 +75,58 @@ public class ConstantVectorExpression ex this(outputColumn, "decimal"); setDecimalValue(value); } - + + /* + * Support for null constant object + */ + public ConstantVectorExpression(int outputColumn, String typeString, boolean isNull) { + this(outputColumn, typeString); + isNullValue = isNull; + } + private void evaluateLong(VectorizedRowBatch vrg) { LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn]; cv.isRepeating = true; - cv.noNulls = true; - cv.vector[0] = longValue; + cv.noNulls = !isNullValue; + if (!isNullValue) { + cv.vector[0] = longValue; + } else { + cv.isNull[0] = true; + } } private void evaluateDouble(VectorizedRowBatch vrg) { DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn]; cv.isRepeating = true; - cv.noNulls = true; - cv.vector[0] = doubleValue; + cv.noNulls = !isNullValue; + if (!isNullValue) { + cv.vector[0] = doubleValue; + } else { + cv.isNull[0] = true; + } } private void evaluateBytes(VectorizedRowBatch vrg) { BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn]; cv.isRepeating = true; - cv.noNulls = true; + cv.noNulls = !isNullValue; cv.initBuffer(); - cv.setVal(0, bytesValue, 0, bytesValueLength); + if (!isNullValue) { + cv.setVal(0, bytesValue, 0, bytesValueLength); + } else { + cv.isNull[0] = true; + } } private void evaluateDecimal(VectorizedRowBatch vrg) { DecimalColumnVector dcv = (DecimalColumnVector) vrg.cols[outputColumn]; dcv.isRepeating = true; - dcv.noNulls = true; - dcv.vector[0].update(decimalValue); + dcv.noNulls = !isNullValue; + if (!isNullValue) { + dcv.vector[0].update(decimalValue); + } else { + dcv.isNull[0] = true; + } } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sat Jul 26 23:45:46 2014 @@ -478,12 +478,15 @@ public final class VectorExpressionWrite @Override public Object setValue(Object field, Decimal128 value) { + if (null == field) { + field = initValue(null); + } return ((SettableHiveDecimalObjectInspector) this.objectInspector).set(field, HiveDecimal.create(value.toBigDecimal())); } @Override - public Object initValue(Object ignored) throws HiveException { + public Object initValue(Object ignored) { return ((SettableHiveDecimalObjectInspector) this.objectInspector).create( HiveDecimal.create(BigDecimal.ZERO)); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java Sat Jul 26 23:45:46 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.hooks; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; @@ -110,7 +112,8 @@ public class ATSHook implements ExecuteW ExplainTask explain = new ExplainTask(); explain.initialize(hookContext.getConf(), plan, null); String query = plan.getQueryStr(); - JSONObject explainPlan = explain.getJSONPlan(null, null, plan.getRootTasks(), + List<Task<?>> rootTasks = plan.getRootTasks(); + JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks, plan.getFetchTask(), true, false, false); fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query, explainPlan, queryStartTime, user, numMrJobs, numTezJobs)); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java Sat Jul 26 23:45:46 2014 @@ -40,7 +40,7 @@ public class Entity implements Serializa * The type of the entity. 
*/ public static enum Type { - DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR + DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION } /** @@ -64,11 +64,17 @@ public class Entity implements Serializa private Partition p; /** - * The directory if this is a directory. + * The directory if this is a directory */ private String d; /** + * An object that is represented as a String + * Currently used for functions + */ + private String stringObject; + + /** * This is derived from t and p, but we need to serialize this field to make * sure Entity.hashCode() does not need to recursively read into t and p. */ @@ -136,6 +142,21 @@ public class Entity implements Serializa this.d = d; } + public String getFunctionName() { + if (typ == Type.FUNCTION) { + return stringObject; + } + return null; + } + + public void setFunctionName(String funcName) { + if (typ != Type.FUNCTION) { + throw new IllegalArgumentException( + "Set function can't be called on entity if the entity type is not " + Type.FUNCTION); + } + this.stringObject = funcName; + } + /** * Only used by serialization. */ @@ -210,6 +231,24 @@ public class Entity implements Serializa } /** + * Create an entity representing a object with given name, database namespace and type + * @param database - database namespace + * @param strObj - object name as string + * @param type - the entity type. this constructor only supports FUNCTION type currently + */ + public Entity(Database database, String strObj, Type type) { + if (type != Type.FUNCTION) { + throw new IllegalArgumentException("This constructor is supported only for type:" + + Type.FUNCTION); + } + this.database = database; + this.stringObject = strObj; + this.typ = type; + this.complete = true; + name = computeName(); + } + + /** * Get the parameter map of the Entity. */ public Map<String, String> getParameters() { @@ -293,6 +332,8 @@ public class Entity implements Serializa return t.getDbName() + "@" + t.getTableName() + "@" + p.getName(); case DUMMYPARTITION: return p.getName(); + case FUNCTION: + return stringObject; default: return d; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Sat Jul 26 23:45:46 2014 @@ -82,6 +82,19 @@ public class WriteEntity extends Entity } /** + * Constructor for objects represented as String. + * Currently applicable only for function names. + * @param db + * @param objName + * @param type + * @param writeType + */ + public WriteEntity(Database db, String objName, Type type, WriteType writeType) { + super(db, objName, type); + this.writeType = writeType; + } + + /** * Constructor for a partition. 
* * @param p Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java Sat Jul 26 23:45:46 2014 @@ -17,11 +17,34 @@ */ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.mapred.TextInputFormat; + +import com.google.common.annotations.VisibleForTesting; + public final class IOConstants { public static final String COLUMNS = "columns"; public static final String COLUMNS_TYPES = "columns.types"; public static final String MAPRED_TASK_ID = "mapred.task.id"; + public static final String TEXTFILE = "TEXTFILE"; + public static final String SEQUENCEFILE = "SEQUENCEFILE"; + public static final String RCFILE = "RCFILE"; + public static final String ORC = "ORC"; + public static final String ORCFILE = "ORCFILE"; + public static final String PARQUET = "PARQUET"; + public static final String PARQUETFILE = "PARQUETFILE"; + public static final String AVRO = "AVRO"; + public static final String AVROFILE = "AVROFILE"; + + @VisibleForTesting + public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; + + public static final String TEXTFILE_INPUT = TextInputFormat.class + .getName(); + @SuppressWarnings("deprecation") + public static final String TEXTFILE_OUTPUT = IgnoreKeyTextOutputFormat.class + .getName(); + private IOConstants() { // prevent instantiation } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Sat Jul 26 23:45:46 2014 @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.serde2.io. 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; class ColumnStatisticsImpl implements ColumnStatistics { @@ -335,8 +336,8 @@ class ColumnStatisticsImpl implements Co protected static final class StringStatisticsImpl extends ColumnStatisticsImpl implements StringColumnStatistics { - private String minimum = null; - private String maximum = null; + private Text minimum = null; + private Text maximum = null; private long sum = 0; StringStatisticsImpl() { @@ -346,10 +347,10 @@ class ColumnStatisticsImpl implements Co super(stats); OrcProto.StringStatistics str = stats.getStringStatistics(); if (str.hasMaximum()) { - maximum = str.getMaximum(); + maximum = new Text(str.getMaximum()); } if (str.hasMinimum()) { - minimum = str.getMinimum(); + minimum = new Text(str.getMinimum()); } if(str.hasSum()) { sum = str.getSum(); @@ -365,16 +366,15 @@ class ColumnStatisticsImpl implements Co } @Override - void updateString(String value) { + void updateString(Text value) { if (minimum == null) { - minimum = value; - maximum = value; + maximum = minimum = new Text(value); } else if (minimum.compareTo(value) > 0) { - minimum = value; + minimum = new Text(value); } else if (maximum.compareTo(value) < 0) { - maximum = value; + maximum = new Text(value); } - sum += value.length(); + sum += value.getLength(); } @Override @@ -382,13 +382,18 @@ class ColumnStatisticsImpl implements Co super.merge(other); StringStatisticsImpl str = (StringStatisticsImpl) other; if (minimum == null) { - minimum = str.minimum; - maximum = str.maximum; + if(str.minimum != null) { + maximum = new Text(str.getMaximum()); + minimum = new Text(str.getMinimum()); + } else { + /* both are empty */ + maximum = minimum = null; + } } else if (str.minimum != null) { if (minimum.compareTo(str.minimum) > 0) { - minimum = str.minimum; + minimum = new Text(str.getMinimum()); } else if (maximum.compareTo(str.maximum) < 0) { - maximum = str.maximum; + maximum = new Text(str.getMaximum()); } } sum += str.sum; @@ -400,8 +405,8 @@ class ColumnStatisticsImpl implements Co OrcProto.StringStatistics.Builder str = OrcProto.StringStatistics.newBuilder(); if (getNumberOfValues() != 0) { - str.setMinimum(minimum); - str.setMaximum(maximum); + str.setMinimum(getMinimum()); + str.setMaximum(getMaximum()); str.setSum(sum); } result.setStringStatistics(str); @@ -410,12 +415,12 @@ class ColumnStatisticsImpl implements Co @Override public String getMinimum() { - return minimum; + return minimum == null ? null : minimum.toString(); } @Override public String getMaximum() { - return maximum; + return maximum == null ? 
null : maximum.toString(); } @Override @@ -428,9 +433,9 @@ class ColumnStatisticsImpl implements Co StringBuilder buf = new StringBuilder(super.toString()); if (getNumberOfValues() != 0) { buf.append(" min: "); - buf.append(minimum); + buf.append(getMinimum()); buf.append(" max: "); - buf.append(maximum); + buf.append(getMaximum()); buf.append(" sum: "); buf.append(sum); } @@ -617,8 +622,8 @@ class ColumnStatisticsImpl implements Co private static final class DateStatisticsImpl extends ColumnStatisticsImpl implements DateColumnStatistics { - private DateWritable minimum = null; - private DateWritable maximum = null; + private Integer minimum = null; + private Integer maximum = null; DateStatisticsImpl() { } @@ -628,10 +633,10 @@ class ColumnStatisticsImpl implements Co OrcProto.DateStatistics dateStats = stats.getDateStatistics(); // min,max values serialized/deserialized as int (days since epoch) if (dateStats.hasMaximum()) { - maximum = new DateWritable(dateStats.getMaximum()); + maximum = dateStats.getMaximum(); } if (dateStats.hasMinimum()) { - minimum = new DateWritable(dateStats.getMinimum()); + minimum = dateStats.getMinimum(); } } @@ -645,12 +650,12 @@ class ColumnStatisticsImpl implements Co @Override void updateDate(DateWritable value) { if (minimum == null) { - minimum = value; - maximum = value; - } else if (minimum.compareTo(value) > 0) { - minimum = value; - } else if (maximum.compareTo(value) < 0) { - maximum = value; + minimum = value.getDays(); + maximum = value.getDays(); + } else if (minimum > value.getDays()) { + minimum = value.getDays(); + } else if (maximum < value.getDays()) { + maximum = value.getDays(); } } @@ -662,9 +667,9 @@ class ColumnStatisticsImpl implements Co minimum = dateStats.minimum; maximum = dateStats.maximum; } else if (dateStats.minimum != null) { - if (minimum.compareTo(dateStats.minimum) > 0) { + if (minimum > dateStats.minimum) { minimum = dateStats.minimum; - } else if (maximum.compareTo(dateStats.maximum) < 0) { + } else if (maximum < dateStats.maximum) { maximum = dateStats.maximum; } } @@ -676,21 +681,26 @@ class ColumnStatisticsImpl implements Co OrcProto.DateStatistics.Builder dateStats = OrcProto.DateStatistics.newBuilder(); if (getNumberOfValues() != 0) { - dateStats.setMinimum(minimum.getDays()); - dateStats.setMaximum(maximum.getDays()); + dateStats.setMinimum(minimum); + dateStats.setMaximum(maximum); } result.setDateStatistics(dateStats); return result; } + private transient final DateWritable minDate = new DateWritable(); + private transient final DateWritable maxDate = new DateWritable(); + @Override public DateWritable getMinimum() { - return minimum; + minDate.set(minimum); + return minDate; } @Override public DateWritable getMaximum() { - return maximum; + maxDate.set(maximum); + return maxDate; } @Override @@ -733,7 +743,7 @@ class ColumnStatisticsImpl implements Co throw new UnsupportedOperationException("Can't update double"); } - void updateString(String value) { + void updateString(Text value) { throw new UnsupportedOperationException("Can't update string"); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original) +++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Sat Jul 26 23:45:46 2014 @@ -20,7 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc import java.util.ArrayList; import java.util.List; +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry; @@ -103,7 +108,8 @@ public final class FileDump { buf.append(i); buf.append(": "); buf.append(encoding.getKind()); - if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY) { + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { buf.append("["); buf.append(encoding.getDictionarySize()); buf.append("]"); @@ -147,7 +153,28 @@ public final class FileDump { } } } + + FileSystem fs = path.getFileSystem(conf); + long fileLen = fs.getContentSummary(path).getLength(); + long paddedBytes = getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + DecimalFormat format = new DecimalFormat("##.##"); + System.out.println("\nFile length: " + fileLen + " bytes"); + System.out.println("Padding length: " + paddedBytes + " bytes"); + System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); rows.close(); } } + + private static long getTotalPaddingSize(Reader reader) throws IOException { + long paddedBytes = 0; + List<org.apache.hadoop.hive.ql.io.orc.StripeInformation> stripes = reader.getStripes(); + for (int i = 1; i < stripes.size(); i++) { + long prevStripeOffset = stripes.get(i - 1).getOffset(); + long prevStripeLen = stripes.get(i - 1).getLength(); + paddedBytes += stripes.get(i).getOffset() - (prevStripeOffset + prevStripeLen); + } + return paddedBytes; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1613740&r1=1613739&r2=1613740&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Sat Jul 26 23:45:46 2014 @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.*; + /** * Contains factory methods to read or write ORC files. 
 */
@@ -233,29 +235,13 @@ public final class OrcFile {
     WriterOptions(Configuration conf) {
       configuration = conf;
       memoryManagerValue = getMemoryManager(conf);
-      stripeSizeValue =
-          conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
-              HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal);
-      blockSizeValue =
-          conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.varname,
-              HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.defaultLongVal);
-      rowIndexStrideValue =
-          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE
-              .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.defaultIntVal);
-      bufferSizeValue =
-          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
-              HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.defaultIntVal);
-      blockPaddingValue =
-          conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
-              .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
-              .defaultBoolVal);
-      compressValue =
-          CompressionKind.valueOf(conf.get(HiveConf.ConfVars
-              .HIVE_ORC_DEFAULT_COMPRESS.varname,
-              HiveConf.ConfVars
-              .HIVE_ORC_DEFAULT_COMPRESS.defaultVal));
-      String versionName =
-          conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+      stripeSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE);
+      blockSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE);
+      rowIndexStrideValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE);
+      bufferSizeValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE);
+      blockPaddingValue = HiveConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING);
+      compressValue = CompressionKind.valueOf(HiveConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS));
+      String versionName = HiveConf.getVar(conf, HIVE_ORC_WRITE_FORMAT);
       if (versionName == null) {
         versionValue = Version.CURRENT;
       } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Jul 26 23:45:46 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -2404,6 +2405,10 @@ class RecordReaderImpl implements Record
         return Double.valueOf(predObj.toString());
       }
     } else if (statsObj instanceof String) {
+      // Ex: where d = date '1970-02-01' will be ExprNodeConstantDesc
+      if (predObj instanceof ExprNodeConstantDesc) {
+        return ((ExprNodeConstantDesc) predObj).getValue().toString();
+      }
       return predObj.toString();
     } else if (statsObj instanceof HiveDecimal) {
       if (predObj instanceof Long) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Sat Jul 26 23:45:46 2014
@@ -38,7 +38,11 @@ class StringRedBlackTree extends RedBlac

   public int add(String value) {
     newKey.set(value);
-    // if the key is new, add it to our byteArray and store the offset & length
+    return addNewKey();
+  }
+
+  private int addNewKey() {
+    // if the newKey is actually new, add it to our byteArray and store the offset & length
     if (add()) {
       int len = newKey.getLength();
       keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len));
@@ -46,6 +50,11 @@ class StringRedBlackTree extends RedBlac
     return lastAdd;
   }

+  public int add(Text value) {
+    newKey.set(value);
+    return addNewKey();
+  }
+
   @Override
   protected int compareValue(int position) {
     int start = keyOffsets.get(position);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Jul 26 23:45:46 2014
@@ -1032,20 +1032,20 @@ class WriterImpl implements Writer, Memo
     }

     /**
-     * Method to retrieve string values from the value object, which can be overridden
+     * Method to retrieve text values from the value object, which can be overridden
      * by subclasses.
      * @param obj value
-     * @return String value from obj
+     * @return Text text value from obj
      */
-    String getStringValue(Object obj) {
-      return ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
+    Text getTextValue(Object obj) {
+      return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
     }

     @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
-        String val = getStringValue(obj);
+        Text val = getTextValue(obj);
         rows.add(dictionary.add(val));
         indexStatistics.updateString(val);
       }
@@ -1194,9 +1194,9 @@ class WriterImpl implements Writer, Memo
      * Override base class implementation to support char values.
      */
     @Override
-    String getStringValue(Object obj) {
+    Text getTextValue(Object obj) {
       return (((HiveCharObjectInspector) inspector)
-          .getPrimitiveJavaObject(obj)).getValue();
+          .getPrimitiveWritableObject(obj)).getTextValue();
     }
   }

@@ -1216,9 +1216,9 @@ class WriterImpl implements Writer, Memo
      * Override base class implementation to support varchar values.
      */
     @Override
-    String getStringValue(Object obj) {
+    Text getTextValue(Object obj) {
       return (((HiveVarcharObjectInspector) inspector)
-          .getPrimitiveJavaObject(obj)).getValue();
+          .getPrimitiveWritableObject(obj)).getTextValue();
     }
   }

@@ -1938,7 +1938,7 @@ class WriterImpl implements Writer, Memo
       if (availRatio < paddingTolerance && addBlockPadding) {
         long padding = blockSize - (start % blockSize);
         byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
-        LOG.info(String.format("Padding ORC by %d bytes (<= %0.2f * %d)",
+        LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
             padding, availRatio, defaultStripeSize));
         start += padding;
         while (padding > 0) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java Sat Jul 26 23:45:46 2014
@@ -70,6 +70,9 @@ public class HiveLockObject {
       lockTime = elem[1];
       lockMode = elem[2];
       queryStr = elem[3];
+      if (elem.length >= 5) {
+        clientIp = elem[4];
+      }
     }

     public String getQueryId() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Sat Jul 26 23:45:46 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -296,45 +297,77 @@ public class ZooKeeperHiveLockManager im

   private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode,
       boolean keepAlive, boolean parentCreated) throws LockException {
-    int tryNum = 1;
+    int tryNum = 0;
     ZooKeeperHiveLock ret = null;
+    Set<String> conflictingLocks = new HashSet<String>();

     do {
+      tryNum++;
       try {
         if (tryNum > 1) {
           Thread.sleep(sleepTime);
           prepareRetry();
         }
-        ret = lockPrimitive(key, mode, keepAlive, parentCreated);
+        ret = lockPrimitive(key, mode, keepAlive, parentCreated, conflictingLocks);
         if (ret != null) {
           break;
         }
-        tryNum++;
       } catch (Exception e1) {
-        tryNum++;
        if (e1 instanceof KeeperException) {
          KeeperException e = (KeeperException) e1;
          switch (e.code()) {
          case CONNECTIONLOSS:
          case OPERATIONTIMEOUT:
-            LOG.warn("Possibly transient ZooKeeper exception: ", e);
-            break;
+            LOG.debug("Possibly transient ZooKeeper exception: ", e);
+            continue;
          default:
            LOG.error("Serious Zookeeper exception: ", e);
            break;
          }
        }
        if (tryNum >= numRetriesForLock) {
+          console.printError("Unable to acquire " + key.getData().getLockMode() +
+              ", " + mode + " lock " + key.getDisplayName() + " after " +
+              tryNum + " attempts.");
+          LOG.error("Exceeds maximum retries with errors: ", e1);
+          printConflictingLocks(key,mode,conflictingLocks);
+          conflictingLocks.clear();
          throw new LockException(e1);
        }
      }
    } while (tryNum < numRetriesForLock);
+    if (ret == null) {
+      console.printError("Unable to acquire " + key.getData().getLockMode() +
+          ", " + mode + " lock " + key.getDisplayName() + " after " +
+          tryNum + " attempts.");
+      printConflictingLocks(key,mode,conflictingLocks);
+    }
+    conflictingLocks.clear();
    return ret;
  }

+  private void printConflictingLocks(HiveLockObject key, HiveLockMode mode,
+      Set<String> conflictingLocks) {
+    if (!conflictingLocks.isEmpty()) {
+      HiveLockObjectData requestedLock = new HiveLockObjectData(key.getData().toString());
+      LOG.debug("Requested lock " + key.getDisplayName() +
+          ":: mode:" + requestedLock.getLockMode() + "," + mode +
+          "; query:" + requestedLock.getQueryStr());
+      for (String conflictingLock : conflictingLocks) {
+        HiveLockObjectData conflictingLockData = new HiveLockObjectData(conflictingLock);
+        LOG.debug("Conflicting lock to " + key.getDisplayName() +
+            ":: mode:" + conflictingLockData.getLockMode() +
+            ";query:" + conflictingLockData.getQueryStr() +
+            ";queryId:" + conflictingLockData.getQueryId() +
+            ";clientIp:" + conflictingLockData.getClientIp());
+      }
+    }
+  }
+
   private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
-      HiveLockMode mode, boolean keepAlive, boolean parentCreated)
+      HiveLockMode mode, boolean keepAlive, boolean parentCreated,
+      Set<String> conflictingLocks)
       throws KeeperException, InterruptedException {
     String res;
@@ -394,9 +427,19 @@ public class ZooKeeperHiveLockManager im
       }

       if ((childSeq >= 0) && (childSeq < seqNo)) {
-        zooKeeper.delete(res, -1);
-        console.printError("conflicting lock present for "
-            + key.getDisplayName() + " mode " + mode);
+        try {
+          zooKeeper.delete(res, -1);
+        } finally {
+          if (LOG.isDebugEnabled()) {
+            Stat stat = new Stat();
+            try {
+              String data = new String(zooKeeper.getData(child, false, stat));
+              conflictingLocks.add(data);
+            } catch (Exception e) {
+              //ignored
+            }
+          }
+        }
         return null;
       }
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sat Jul 26 23:45:46 2014
@@ -58,7 +58,6 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -213,7 +212,6 @@ public class Hive {
   /**
    * Hive
    *
-   * @param argFsRoot
    * @param c
    *
    */
@@ -402,6 +400,7 @@ public class Hive {
       if (newTbl.getParameters() != null) {
         newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
       }
+      newTbl.checkValidity();
       getMSC().alter_table(names[0], names[1], newTbl.getTTable());
     } catch (MetaException e) {
       throw new HiveException("Unable to alter table.", e);
@@ -469,6 +468,7 @@ public class Hive {
       if (newPart.getParameters() != null) {
         newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
       }
+      newPart.checkValidity();
       getMSC().alter_partition(dbName, tblName,
           newPart.getTPartition());
     } catch (MetaException e) {
@@ -681,6 +681,10 @@ public class Hive {
     if (baseTbl.getTableType() == TableType.VIRTUAL_VIEW.toString()) {
       throw new HiveException("tableName="+ tableName +" is a VIRTUAL VIEW. Index on VIRTUAL VIEW is not supported.");
     }
+    if (baseTbl.isTemporary()) {
+      throw new HiveException("tableName=" + tableName
+          + " is a TEMPORARY TABLE. Index on TEMPORARY TABLE is not supported.");
+    }

     if (indexTblName == null) {
       indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName);
@@ -1014,10 +1018,7 @@ public class Hive {
       }
     }

-    Table table = new Table(tTable);
-
-    table.checkValidity();
-    return table;
+    return new Table(tTable);
   }

   /**
@@ -1135,10 +1136,10 @@ public class Hive {
    * @return true on success
    * @throws HiveException
    */
-  public boolean revokePrivileges(PrivilegeBag privileges)
+  public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
       throws HiveException {
     try {
-      return getMSC().revoke_privileges(privileges);
+      return getMSC().revoke_privileges(privileges, grantOption);
     } catch (Exception e) {
       throw new HiveException(e);
     }
@@ -2068,9 +2069,9 @@ private void constructOneLBLocationMap(F
   }

   public boolean revokeRole(String roleName, String userName,
-      PrincipalType principalType) throws HiveException {
+      PrincipalType principalType, boolean grantOption) throws HiveException {
     try {
-      return getMSC().revoke_role(roleName, userName, principalType);
+      return getMSC().revoke_role(roleName, userName, principalType, grantOption);
     } catch (Exception e) {
       throw new HiveException(e);
     }
@@ -2486,7 +2487,7 @@ private void constructOneLBLocationMap(F
       }
     };
     return RetryingMetaStoreClient.getProxy(conf, hookLoader,
-        HiveMetaStoreClient.class.getName());
+        SessionHiveMetaStoreClient.class.getName());
   }

   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Sat Jul 26 23:45:46 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -46,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -504,8 +502,7 @@ public class Partition implements Serial

   public List<FieldSchema> getCols() {
     try {
-      if (Hive.get().getConf().getStringCollection(ConfVars.SERDESUSINGMETASTOREFORSCHEMA.varname)
-          .contains(tPartition.getSd().getSerdeInfo().getSerializationLib())) {
+      if (Table.hasMetastoreBasedSchema(Hive.get().getConf(), tPartition.getSd())) {
         return tPartition.getSd().getCols();
       }
       return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
@@ -644,4 +641,10 @@ public class Partition implements Serial
   public Map<List<String>, String> getSkewedColValueLocationMaps() {
     return tPartition.getSd().getSkewedInfo().getSkewedColValueLocationMaps();
   }
+
+  public void checkValidity() throws HiveException {
+    if (!tPartition.getSd().equals(table.getSd())) {
+      Table.validateColumns(getCols(), table.getPartCols());
+    }
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1613740&r1=1613739&r2=1613740&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sat Jul 26 23:45:46 2014
@@ -23,18 +23,19 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;

+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.ProtectMode;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hive.serde.serd
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
@@ -137,7 +137,7 @@ public class Table implements Serializab
   }

   /**
-   * Initialize an emtpy table.
+   * Initialize an empty table.
    */
   public static org.apache.hadoop.hive.metastore.api.Table
   getEmptyTable(String databaseName, String tableName) {
@@ -210,37 +210,11 @@ public class Table implements Serializab
       assert(getViewExpandedText() == null);
     }

-    Iterator<FieldSchema> iterCols = getCols().iterator();
-    List<String> colNames = new ArrayList<String>();
-    while (iterCols.hasNext()) {
-      String colName = iterCols.next().getName();
-      if (!MetaStoreUtils.validateColumnName(colName)) {
-        throw new HiveException("Invalid column name '" + colName
-            + "' in the table definition");
-      }
-      Iterator<String> iter = colNames.iterator();
-      while (iter.hasNext()) {
-        String oldColName = iter.next();
-        if (colName.equalsIgnoreCase(oldColName)) {
-          throw new HiveException("Duplicate column name " + colName
-              + " in the table definition.");
-        }
-      }
-      colNames.add(colName.toLowerCase());
-    }
+    validateColumns(getCols(), getPartCols());
+  }

-    if (getPartCols() != null) {
-      // there is no overlap between columns and partitioning columns
-      Iterator<FieldSchema> partColsIter = getPartCols().iterator();
-      while (partColsIter.hasNext()) {
-        String partCol = partColsIter.next().getName();
-        if (colNames.contains(partCol.toLowerCase())) {
-          throw new HiveException("Partition column name " + partCol
-              + " conflicts with table columns.");
-        }
-      }
-    }
-    return;
+  public StorageDescriptor getSd() {
+    return tTable.getSd();
   }

   public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
@@ -623,15 +597,15 @@ public class Table implements Serializab

   public List<FieldSchema> getCols() {
+    String serializationLib = getSerializationLib();
     try {
-      if (null == getSerializationLib() || Hive.get().getConf().getStringCollection(
-          ConfVars.SERDESUSINGMETASTOREFORSCHEMA.varname).contains(getSerializationLib())) {
+      if (hasMetastoreBasedSchema(Hive.get().getConf(), serializationLib)) {
         return tTable.getSd().getCols();
       } else {
         return Hive.getFieldsFromDeserializer(getTableName(), getDeserializer());
       }
     } catch (HiveException e) {
-      LOG.error("Unable to get field from serde: " + getSerializationLib(), e);
+      LOG.error("Unable to get field from serde: " + serializationLib, e);
     }
     return new ArrayList<FieldSchema>();
   }
@@ -996,4 +970,48 @@ public class Table implements Serializab
       throw new RuntimeException("Cannot get path ", e);
     }
   }
+
+  public boolean isTemporary() {
+    return tTable.isTemporary();
+  }
+
+  public static boolean hasMetastoreBasedSchema(HiveConf conf, StorageDescriptor serde) {
+    return hasMetastoreBasedSchema(conf, serde.getSerdeInfo().getSerializationLib());
+  }
+
+  public static boolean hasMetastoreBasedSchema(HiveConf conf, String serdeLib) {
+    return StringUtils.isEmpty(serdeLib) ||
+        conf.getStringCollection(ConfVars.SERDESUSINGMETASTOREFORSCHEMA.varname).contains(serdeLib);
+  }
+
+  public static void validateColumns(List<FieldSchema> columns, List<FieldSchema> partCols)
+      throws HiveException {
+    List<String> colNames = new ArrayList<String>();
+    for (FieldSchema partCol: columns) {
+      String colName = normalize(partCol.getName());
+      if (colNames.contains(colName)) {
+        throw new HiveException("Duplicate column name " + colName
+            + " in the table definition.");
+      }
+      colNames.add(colName);
+    }
+    if (partCols != null) {
+      // there is no overlap between columns and partitioning columns
+      for (FieldSchema partCol: partCols) {
+        String colName = normalize(partCol.getName());
+        if (colNames.contains(colName)) {
+          throw new HiveException("Partition column name " + colName
+              + " conflicts with table columns.");
+        }
+      }
+    }
+  }
+
+  private static String normalize(String colName) throws HiveException {
+    if (!MetaStoreUtils.validateColumnName(colName)) {
+      throw new HiveException("Invalid column name '" + colName
+          + "' in the table definition");
+    }
+    return colName.toLowerCase();
+  }
 };
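
A note on the new FileDump output: ORC lays stripes out back to back, so any block padding shows up as a gap between the end of one stripe and the offset of the next, and getTotalPaddingSize is just the sum of those gaps. The sketch below reproduces that arithmetic outside Hive; the Stripe class and the sample offsets are stand-ins for ORC's StripeInformation, not part of this commit.

    import java.text.DecimalFormat;
    import java.util.Arrays;
    import java.util.List;

    public class PaddingRatioSketch {
      // Simplified stand-in for org.apache.hadoop.hive.ql.io.orc.StripeInformation.
      static class Stripe {
        final long offset, length;
        Stripe(long offset, long length) { this.offset = offset; this.length = length; }
      }

      // Padding is whatever lies between the end of one stripe and the start of the next.
      static long totalPadding(List<Stripe> stripes) {
        long padded = 0;
        for (int i = 1; i < stripes.size(); i++) {
          Stripe prev = stripes.get(i - 1);
          padded += stripes.get(i).offset - (prev.offset + prev.length);
        }
        return padded;
      }

      public static void main(String[] args) {
        // Two stripes with a hypothetical 1024-byte gap forced by block padding.
        List<Stripe> stripes = Arrays.asList(new Stripe(3, 60000), new Stripe(61027, 55000));
        long fileLen = 120000;
        double percent = ((double) totalPadding(stripes) / (double) fileLen) * 100;
        System.out.println("Padding ratio: " + new DecimalFormat("##.##").format(percent) + "%");
      }
    }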
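
The OrcFile.java cleanup replaces each hand-rolled conf.getLong(varname, default) pair with HiveConf's typed getters, which read both the key name and the default from the ConfVars constant itself; the static import then shortens the call sites further. A toy model of the pattern follows, with an illustrative key and default rather than Hive's real values.

    import java.util.HashMap;
    import java.util.Map;

    public class ConfVarsSketch {
      // Each constant carries its key and its default, as HiveConf.ConfVars does.
      enum ConfVars {
        HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size", 268435456L);

        final String varname;
        final long defaultLongVal;
        ConfVars(String varname, long defaultLongVal) {
          this.varname = varname;
          this.defaultLongVal = defaultLongVal;
        }
      }

      static final Map<String, String> conf = new HashMap<>();

      // One typed lookup replaces the varname/default pair every caller had to spell out.
      static long getLongVar(ConfVars var) {
        String v = conf.get(var.varname);
        return v == null ? var.defaultLongVal : Long.parseLong(v);
      }

      public static void main(String[] args) {
        System.out.println(getLongVar(ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE)); // default
        conf.put("hive.exec.orc.default.stripe.size", "134217728");
        System.out.println(getLongVar(ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE)); // override
      }
    }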
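
The WriterImpl and StringRedBlackTree changes move the string-dictionary path from java.lang.String to org.apache.hadoop.io.Text: getPrimitiveWritableObject hands back the row's Text, and the new add(Text) overload copies its UTF-8 bytes straight into the dictionary, skipping the decode-to-String and re-encode that getPrimitiveJavaObject forced on every row. A small illustration of the two shapes; Text is hadoop-common's real class, the value is made up.

    import org.apache.hadoop.io.Text;

    public class TextPathSketch {
      public static void main(String[] args) {
        Text incoming = new Text("some row value");

        // Old shape: Text -> String -> Text, a UTF-8 decode plus a re-encode per row.
        String s = incoming.toString();
        Text viaString = new Text();
        viaString.set(s);

        // New shape: Text -> Text, a single byte copy with no codec work.
        Text direct = new Text();
        direct.set(incoming);

        System.out.println(viaString.equals(direct)); // true; only the cost differs
      }
    }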
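
The one-character change in WriterImpl's padding log line is a genuine bug fix, not cosmetics: "%0.2f" is an illegal java.util.Formatter spec because the '0' flag requires an explicit width, so the old LOG.info would throw instead of logging whenever padding was applied. A quick demonstration:

    public class FormatFixSketch {
      public static void main(String[] args) {
        // The corrected spec: two decimal places, no zero-pad flag.
        System.out.println(String.format(
            "Padding ORC by %d bytes (<= %.2f * %d)", 1024, 0.95, 268435456));
        try {
          String.format("%0.2f", 0.95); // old spec: '0' flag without a width
        } catch (java.util.IllegalFormatException e) {
          System.out.println("rejected: " + e);
        }
      }
    }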
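
In ZooKeeperHiveLockManager, the rewritten lock() loop increments the attempt counter once at the top instead of separately on the success and failure paths, turns transient CONNECTIONLOSS/OPERATIONTIMEOUT errors into a continue, and threads a conflictingLocks set through lockPrimitive so the final failure report can name the current holders. A stripped-down skeleton of that control flow; MAX_TRIES, tryAcquire and the sample holder string are hypothetical.

    import java.util.HashSet;
    import java.util.Set;

    public class LockRetrySketch {
      static final int MAX_TRIES = 3;

      // Pretend the lock is always held by someone else and record who.
      static Object tryAcquire(Set<String> conflicts) {
        conflicts.add("mode:EXPLICIT;query:select ...");
        return null;
      }

      public static void main(String[] args) throws InterruptedException {
        Set<String> conflicts = new HashSet<>();
        Object lock = null;
        int tryNum = 0;
        do {
          tryNum++;            // counted once per attempt, at the top
          if (tryNum > 1) {
            Thread.sleep(10);  // back off before re-attempting
          }
          lock = tryAcquire(conflicts);
          if (lock != null) {
            break;
          }
        } while (tryNum < MAX_TRIES);
        if (lock == null) {
          System.err.println("Unable to acquire lock after " + tryNum
              + " attempts; conflicting holders: " + conflicts);
        }
      }
    }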
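
HiveLockObject's constructor change is a small backward-compatibility guard: lock data written by older clients carries four fields, newer writers append clientIp as a fifth, and the elem.length >= 5 check reads the extra field only when it exists. A toy parse of the idea; the ':' delimiter and the sample strings are illustrative, not necessarily HiveLockObjectData's real encoding.

    public class LockDataParseSketch {
      public static void main(String[] args) {
        String[] samples = {
            "query1:1406412346:EXPLICIT:select 1",            // old 4-field data
            "query2:1406412399:EXPLICIT:select 2:10.0.0.7"};  // new data with clientIp
        for (String data : samples) {
          String[] elem = data.split(":");
          String clientIp = (elem.length >= 5) ? elem[4] : null;
          System.out.println(elem[0] + " clientIp=" + clientIp);
        }
      }
    }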
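
Finally, Table.java folds the old inline checkValidity loops into a static validateColumns(columns, partCols), which is what lets the new Partition.checkValidity reuse the same rules. The logic reduces to: normalize each name, reject duplicates, then reject partition columns that collide with data columns. A self-contained sketch over plain strings, with FieldSchema and HiveException swapped for String and IllegalArgumentException:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ValidateColumnsSketch {
      static void validateColumns(List<String> cols, List<String> partCols) {
        List<String> seen = new ArrayList<>();
        for (String col : cols) {
          String name = col.toLowerCase();       // normalize, as Table.normalize does
          if (seen.contains(name)) {
            throw new IllegalArgumentException("Duplicate column name " + name);
          }
          seen.add(name);
        }
        if (partCols != null) {
          for (String partCol : partCols) {      // no overlap with partition columns
            if (seen.contains(partCol.toLowerCase())) {
              throw new IllegalArgumentException(
                  "Partition column name " + partCol + " conflicts with table columns.");
            }
          }
        }
      }

      public static void main(String[] args) {
        validateColumns(Arrays.asList("id", "name"), Arrays.asList("ds")); // passes
        try {
          validateColumns(Arrays.asList("id", "ID"), null);                // duplicate
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }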
