Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Nov 26 08:19:25 2013 @@ -46,7 +46,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; /** @@ -59,24 +59,13 @@ public abstract class Operator<T extends private static final long serialVersionUID = 1L; + public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES"; + public static final String HIVECOUNTERFATAL = "FATAL_ERROR"; + private transient Configuration configuration; protected List<Operator<? extends OperatorDesc>> childOperators; protected List<Operator<? extends OperatorDesc>> parentOperators; protected String operatorId; - /** - * List of counter names associated with the operator. It contains the - * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN - * Individual operators can add to this list via addToCounterNames methods. - */ - protected ArrayList<String> counterNames; - - /** - * Each operator has its own map of its counter names to disjoint - * ProgressCounter - it is populated at compile time and is read in at - * run-time while extracting the operator specific counts. - */ - protected HashMap<String, ProgressCounter> counterNameToEnum; - private transient ExecMapperContext execContext; private static AtomicInteger seqId; @@ -98,13 +87,10 @@ public abstract class Operator<T extends // to children. Note: close() being called and its state being CLOSE is // difference since close() could be called but state is not CLOSE if // one of its parent is not in state CLOSE.. - }; + } protected transient State state = State.UNINIT; - protected static transient boolean fatalError = false; // fatalError is shared acorss - // all operators - static { seqId = new AtomicInteger(0); } @@ -115,6 +101,7 @@ public abstract class Operator<T extends id = String.valueOf(seqId.getAndIncrement()); childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + initOperatorId(); } public static void resetId() { @@ -197,10 +184,10 @@ public abstract class Operator<T extends } public boolean getDone() { - return done || fatalError; + return done; } - public void setDone(boolean done) { + protected final void setDone(boolean done) { this.done = done; } @@ -218,6 +205,8 @@ public abstract class Operator<T extends // non-bean .. protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>(); + @SuppressWarnings("rawtypes") + protected transient OutputCollector out; protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled(); protected transient String alias; @@ -263,6 +252,20 @@ public abstract class Operator<T extends } } + @SuppressWarnings("rawtypes") + public void setOutputCollector(OutputCollector out) { + this.out = out; + + // the collector is same across all operators + if (childOperators == null) { + return; + } + + for (Operator<? extends OperatorDesc> op : childOperators) { + op.setOutputCollector(out); + } + } + /** * Store the alias this operator is working on behalf of. */ @@ -318,6 +321,7 @@ public abstract class Operator<T extends * ignored. * @throws HiveException */ + @SuppressWarnings("unchecked") public void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException { if (state == State.INIT) { @@ -475,38 +479,6 @@ public abstract class Operator<T extends */ public abstract void processOp(Object row, int tag) throws HiveException; - /** - * Process the row. - * - * @param row - * The object representing the row. - * @param tag - * The tag of the row usually means which parent this row comes from. - * Rows with the same tag should have exactly the same rowInspector - * all the time. - */ - public void process(Object row, int tag) throws HiveException { - if (fatalError) { - return; - } - - if (counterNameToEnum != null) { - inputRows++; - if ((inputRows % 1000) == 0) { - incrCounter(numInputRowsCntr, inputRows); - incrCounter(timeTakenCntr, totalTime); - inputRows = 0; - totalTime = 0; - } - - beginTime = System.currentTimeMillis(); - processOp(row, tag); - totalTime += (System.currentTimeMillis() - beginTime); - } else { - processOp(row, tag); - } - } - protected final void defaultStartGroup() throws HiveException { LOG.debug("Starting group"); @@ -514,10 +486,6 @@ public abstract class Operator<T extends return; } - if (fatalError) { - return; - } - LOG.debug("Starting group for children:"); for (Operator<? extends OperatorDesc> op : childOperators) { op.startGroup(); @@ -533,10 +501,6 @@ public abstract class Operator<T extends return; } - if (fatalError) { - return; - } - LOG.debug("Ending group for children:"); for (Operator<? extends OperatorDesc> op : childOperators) { op.endGroup(); @@ -607,14 +571,6 @@ public abstract class Operator<T extends reporter = null; - if (counterNameToEnum != null) { - incrCounter(numInputRowsCntr, inputRows); - incrCounter(numOutputRowsCntr, outputRows); - incrCounter(timeTakenCntr, totalTime); - } - - LOG.info(id + " forwarded " + cntr + " rows"); - try { logStats(); if (childOperators == null) { @@ -679,10 +635,6 @@ public abstract class Operator<T extends protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null; protected transient int[] childOperatorsTag; - // counters for debugging - private transient long cntr = 0; - private transient long nextCntr = 1; - /** * Replace one child with another at the same position. The parent of the * child is not changed @@ -821,21 +773,6 @@ public abstract class Operator<T extends protected void forward(Object row, ObjectInspector rowInspector) throws HiveException { - if (counterNameToEnum != null) { - if ((++outputRows % 1000) == 0) { - incrCounter(numOutputRowsCntr, outputRows); - outputRows = 0; - } - } - - increaseForward(1); - - // For debugging purposes: - // System.out.println("" + this.getClass() + ": " + - // SerDeUtils.getJSONString(row, rowInspector)); - // System.out.println("" + this.getClass() + ">> " + - // ObjectInspectorUtils.getObjectInspectorName(rowInspector)); - if ((childOperatorsArray == null) || (getDone())) { return; } @@ -846,7 +783,7 @@ public abstract class Operator<T extends if (o.getDone()) { childrenDone++; } else { - o.process(row, childOperatorsTag[i]); + o.processOp(row, childOperatorsTag[i]); } } @@ -856,18 +793,6 @@ public abstract class Operator<T extends } } - void increaseForward(long counter) { - if (isLogInfoEnabled) { - cntr += counter; - if (cntr >= nextCntr) { - LOG.info(id + " forwarding " + cntr + " rows"); - do { - nextCntr = getNextCntr(nextCntr); - } while(cntr >= nextCntr); - } - } - } - public void resetStats() { for (Enum<?> e : statsMap.keySet()) { statsMap.get(e).set(0L); @@ -1028,174 +953,12 @@ public abstract class Operator<T extends outputColName, Arrays.asList(fieldObjectInspectors)); } - /** - * All counter stuff below this - */ - - /** - * TODO This is a hack for hadoop 0.17 which only supports enum counters. - */ - public static enum ProgressCounter { - CREATED_FILES, - C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, - C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, - C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, - C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, - C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, - C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, - C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, - C71, C72, C73, C74, C75, C76, C77, C78, C79, C80, - C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, - C91, C92, C93, C94, C95, C96, C97, C98, C99, C100, - C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, - C111, C112, C113, C114, C115, C116, C117, C118, C119, C120, - C121, C122, C123, C124, C125, C126, C127, C128, C129, C130, - C131, C132, C133, C134, C135, C136, C137, C138, C139, C140, - C141, C142, C143, C144, C145, C146, C147, C148, C149, C150, - C151, C152, C153, C154, C155, C156, C157, C158, C159, C160, - C161, C162, C163, C164, C165, C166, C167, C168, C169, C170, - C171, C172, C173, C174, C175, C176, C177, C178, C179, C180, - C181, C182, C183, C184, C185, C186, C187, C188, C189, C190, - C191, C192, C193, C194, C195, C196, C197, C198, C199, C200, - C201, C202, C203, C204, C205, C206, C207, C208, C209, C210, - C211, C212, C213, C214, C215, C216, C217, C218, C219, C220, - C221, C222, C223, C224, C225, C226, C227, C228, C229, C230, - C231, C232, C233, C234, C235, C236, C237, C238, C239, C240, - C241, C242, C243, C244, C245, C246, C247, C248, C249, C250, - C251, C252, C253, C254, C255, C256, C257, C258, C259, C260, - C261, C262, C263, C264, C265, C266, C267, C268, C269, C270, - C271, C272, C273, C274, C275, C276, C277, C278, C279, C280, - C281, C282, C283, C284, C285, C286, C287, C288, C289, C290, - C291, C292, C293, C294, C295, C296, C297, C298, C299, C300, - C301, C302, C303, C304, C305, C306, C307, C308, C309, C310, - C311, C312, C313, C314, C315, C316, C317, C318, C319, C320, - C321, C322, C323, C324, C325, C326, C327, C328, C329, C330, - C331, C332, C333, C334, C335, C336, C337, C338, C339, C340, - C341, C342, C343, C344, C345, C346, C347, C348, C349, C350, - C351, C352, C353, C354, C355, C356, C357, C358, C359, C360, - C361, C362, C363, C364, C365, C366, C367, C368, C369, C370, - C371, C372, C373, C374, C375, C376, C377, C378, C379, C380, - C381, C382, C383, C384, C385, C386, C387, C388, C389, C390, - C391, C392, C393, C394, C395, C396, C397, C398, C399, C400, - C401, C402, C403, C404, C405, C406, C407, C408, C409, C410, - C411, C412, C413, C414, C415, C416, C417, C418, C419, C420, - C421, C422, C423, C424, C425, C426, C427, C428, C429, C430, - C431, C432, C433, C434, C435, C436, C437, C438, C439, C440, - C441, C442, C443, C444, C445, C446, C447, C448, C449, C450, - C451, C452, C453, C454, C455, C456, C457, C458, C459, C460, - C461, C462, C463, C464, C465, C466, C467, C468, C469, C470, - C471, C472, C473, C474, C475, C476, C477, C478, C479, C480, - C481, C482, C483, C484, C485, C486, C487, C488, C489, C490, - C491, C492, C493, C494, C495, C496, C497, C498, C499, C500, - C501, C502, C503, C504, C505, C506, C507, C508, C509, C510, - C511, C512, C513, C514, C515, C516, C517, C518, C519, C520, - C521, C522, C523, C524, C525, C526, C527, C528, C529, C530, - C531, C532, C533, C534, C535, C536, C537, C538, C539, C540, - C541, C542, C543, C544, C545, C546, C547, C548, C549, C550, - C551, C552, C553, C554, C555, C556, C557, C558, C559, C560, - C561, C562, C563, C564, C565, C566, C567, C568, C569, C570, - C571, C572, C573, C574, C575, C576, C577, C578, C579, C580, - C581, C582, C583, C584, C585, C586, C587, C588, C589, C590, - C591, C592, C593, C594, C595, C596, C597, C598, C599, C600, - C601, C602, C603, C604, C605, C606, C607, C608, C609, C610, - C611, C612, C613, C614, C615, C616, C617, C618, C619, C620, - C621, C622, C623, C624, C625, C626, C627, C628, C629, C630, - C631, C632, C633, C634, C635, C636, C637, C638, C639, C640, - C641, C642, C643, C644, C645, C646, C647, C648, C649, C650, - C651, C652, C653, C654, C655, C656, C657, C658, C659, C660, - C661, C662, C663, C664, C665, C666, C667, C668, C669, C670, - C671, C672, C673, C674, C675, C676, C677, C678, C679, C680, - C681, C682, C683, C684, C685, C686, C687, C688, C689, C690, - C691, C692, C693, C694, C695, C696, C697, C698, C699, C700, - C701, C702, C703, C704, C705, C706, C707, C708, C709, C710, - C711, C712, C713, C714, C715, C716, C717, C718, C719, C720, - C721, C722, C723, C724, C725, C726, C727, C728, C729, C730, - C731, C732, C733, C734, C735, C736, C737, C738, C739, C740, - C741, C742, C743, C744, C745, C746, C747, C748, C749, C750, - C751, C752, C753, C754, C755, C756, C757, C758, C759, C760, - C761, C762, C763, C764, C765, C766, C767, C768, C769, C770, - C771, C772, C773, C774, C775, C776, C777, C778, C779, C780, - C781, C782, C783, C784, C785, C786, C787, C788, C789, C790, - C791, C792, C793, C794, C795, C796, C797, C798, C799, C800, - C801, C802, C803, C804, C805, C806, C807, C808, C809, C810, - C811, C812, C813, C814, C815, C816, C817, C818, C819, C820, - C821, C822, C823, C824, C825, C826, C827, C828, C829, C830, - C831, C832, C833, C834, C835, C836, C837, C838, C839, C840, - C841, C842, C843, C844, C845, C846, C847, C848, C849, C850, - C851, C852, C853, C854, C855, C856, C857, C858, C859, C860, - C861, C862, C863, C864, C865, C866, C867, C868, C869, C870, - C871, C872, C873, C874, C875, C876, C877, C878, C879, C880, - C881, C882, C883, C884, C885, C886, C887, C888, C889, C890, - C891, C892, C893, C894, C895, C896, C897, C898, C899, C900, - C901, C902, C903, C904, C905, C906, C907, C908, C909, C910, - C911, C912, C913, C914, C915, C916, C917, C918, C919, C920, - C921, C922, C923, C924, C925, C926, C927, C928, C929, C930, - C931, C932, C933, C934, C935, C936, C937, C938, C939, C940, - C941, C942, C943, C944, C945, C946, C947, C948, C949, C950, - C951, C952, C953, C954, C955, C956, C957, C958, C959, C960, - C961, C962, C963, C964, C965, C966, C967, C968, C969, C970, - C971, C972, C973, C974, C975, C976, C977, C978, C979, C980, - C981, C982, C983, C984, C985, C986, C987, C988, C989, C990, - C991, C992, C993, C994, C995, C996, C997, C998, C999, C1000 - }; - - private static int totalNumCntrs = 1000; - - /** - * populated at runtime from hadoop counters at run time in the client. - */ - protected transient HashMap<String, Long> counters; - - /** - * keeps track of unique ProgressCounter enums used this value is used at - * compile time while assigning ProgressCounter enums to counter names. - */ - private static int lastEnumUsed; - - protected transient long inputRows = 0; - protected transient long outputRows = 0; - protected transient long beginTime = 0; - protected transient long totalTime = 0; - protected transient Object groupKeyObject; - /** - * this is called in operators in map or reduce tasks. - * - * @param name - * @param amount - */ - protected void incrCounter(String name, long amount) { - String counterName = getWrappedCounterName(name); - ProgressCounter pc = counterNameToEnum.get(counterName); - - // Currently, we maintain fixed number of counters per plan - in case of a - // bigger tree, we may run out of them - if (pc == null) { - LOG - .warn("Using too many counters. Increase the total number of counters for " - + counterName); - } else if (reporter != null) { - reporter.incrCounter(pc, amount); - } - } - - public ArrayList<String> getCounterNames() { - return counterNames; - } - - public void setCounterNames(ArrayList<String> counterNames) { - this.counterNames = counterNames; - } - public String getOperatorId() { return operatorId; } - public final String getWrappedCounterName(String ctrName) { - return String.format(counterNameFormat, getOperatorId(), ctrName); - } - public void initOperatorId() { setOperatorId(getName() + "_" + this.id); } @@ -1204,145 +967,6 @@ public abstract class Operator<T extends this.operatorId = operatorId; } - public HashMap<String, Long> getCounters() { - return counters; - } - - /** - * called in ExecDriver.progress periodically. - * - * @param ctrs - * counters from the running job - */ - @SuppressWarnings("unchecked") - public void updateCounters(Counters ctrs) { - if (counters == null) { - counters = new HashMap<String, Long>(); - } - - // For some old unit tests, the counters will not be populated. Eventually, - // the old tests should be removed - if (counterNameToEnum == null) { - return; - } - - for (Map.Entry<String, ProgressCounter> counter : counterNameToEnum - .entrySet()) { - counters.put(counter.getKey(), ctrs.getCounter(counter.getValue())); - } - // update counters of child operators - // this wont be an infinite loop since the operator graph is acyclic - // but, some operators may be updated more than once and that's ok - if (getChildren() != null) { - for (Node op : getChildren()) { - ((Operator<? extends OperatorDesc>) op).updateCounters(ctrs); - } - } - } - - /** - * Recursively check this operator and its descendants to see if the fatal - * error counter is set to non-zero. - * - * @param ctrs - */ - public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { - if (counterNameToEnum == null) { - return false; - } - - String counterName = getWrappedCounterName(fatalErrorCntr); - ProgressCounter pc = counterNameToEnum.get(counterName); - - // Currently, we maintain fixed number of counters per plan - in case of a - // bigger tree, we may run out of them - if (pc == null) { - LOG - .warn("Using too many counters. Increase the total number of counters for " - + counterName); - } else { - long value = ctrs.getCounter(pc); - fatalErrorMessage(errMsg, value); - if (value != 0) { - return true; - } - } - - if (getChildren() != null) { - for (Node op : getChildren()) { - if (((Operator<? extends OperatorDesc>) op).checkFatalErrors(ctrs, - errMsg)) { - return true; - } - } - } - return false; - } - - /** - * Get the fatal error message based on counter's code. - * - * @param errMsg - * error message should be appended to this output parameter. - * @param counterValue - * input counter code. - */ - protected void fatalErrorMessage(StringBuilder errMsg, long counterValue) { - } - - // A given query can have multiple map-reduce jobs - public static void resetLastEnumUsed() { - lastEnumUsed = 0; - } - - /** - * Called only in SemanticAnalyzer after all operators have added their own - * set of counter names. - */ - public void assignCounterNameToEnum() { - if (counterNameToEnum != null) { - return; - } - counterNameToEnum = new HashMap<String, ProgressCounter>(); - for (String counterName : getCounterNames()) { - ++lastEnumUsed; - - // TODO Hack for hadoop-0.17 - // Currently, only maximum number of 'totalNumCntrs' can be used. If you - // want - // to add more counters, increase the number of counters in - // ProgressCounter - if (lastEnumUsed > totalNumCntrs) { - LOG - .warn("Using too many counters. Increase the total number of counters"); - return; - } - String enumName = "C" + lastEnumUsed; - ProgressCounter ctr = ProgressCounter.valueOf(enumName); - counterNameToEnum.put(counterName, ctr); - } - } - - protected static String numInputRowsCntr = "NUM_INPUT_ROWS"; - protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS"; - protected static String timeTakenCntr = "TIME_TAKEN"; - protected static String fatalErrorCntr = "FATAL_ERROR"; - private static String counterNameFormat = "CNTR_NAME_%s_%s"; - - public void initializeCounters() { - initOperatorId(); - counterNames = new ArrayList<String>(); - counterNames.add(getWrappedCounterName(numInputRowsCntr)); - counterNames.add(getWrappedCounterName(numOutputRowsCntr)); - counterNames.add(getWrappedCounterName(timeTakenCntr)); - counterNames.add(getWrappedCounterName(fatalErrorCntr)); - /* getAdditionalCounter should return Wrapped counters */ - List<String> newCntrs = getAdditionalCounters(); - if (newCntrs != null) { - counterNames.addAll(newCntrs); - } - } - /* * By default, the list is empty - if an operator wants to add more counters, * it should override this method and provide the new list. Counter names returned @@ -1353,15 +977,6 @@ public abstract class Operator<T extends return null; } - public HashMap<String, ProgressCounter> getCounterNameToEnum() { - return counterNameToEnum; - } - - public void setCounterNameToEnum( - HashMap<String, ProgressCounter> counterNameToEnum) { - this.counterNameToEnum = counterNameToEnum; - } - /** * Return the type of the specific operator among the * types in OperatorType. @@ -1435,10 +1050,10 @@ public abstract class Operator<T extends } } + @SuppressWarnings("unchecked") T descClone = (T)conf.clone(); Operator<? extends OperatorDesc> ret = - (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild( - descClone, getSchema(), parentClones); + OperatorFactory.getAndMakeChild(descClone, getSchema(), parentClones); return ret; } @@ -1564,25 +1179,6 @@ public abstract class Operator<T extends } /** - * Computes and retrieves the stats for this operator. Default implementation assumes same - * input/output size for operator. - * - * @return Statistics for this operator - */ - public Statistics getStatistics(HiveConf conf) throws HiveException { - Statistics stats = this.getConf().getStatistics(); - - if (stats == null) { - stats = new Statistics(); - for (Operator<? extends OperatorDesc> parent: this.getParentOperators()) { - stats.addNumberOfBytes(parent.getStatistics(conf).getNumberOfBytes()); - } - this.getConf().setStatistics(stats); - } - return stats; - } - - /** * used for LimitPushdownOptimizer * * if all of the operators between limit and reduce-sink does not remove any input rows @@ -1636,4 +1232,22 @@ public abstract class Operator<T extends } return false; } + + public Statistics getStatistics() { + if (conf != null) { + return conf.getStatistics(); + } + return null; + } + + public void setStatistics(Statistics stats) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting stats ("+stats+") on "+this); + } + if (conf != null) { + conf.setStatistics(stats); + } else { + LOG.warn("Cannot set stats when there's no descriptor: "+this); + } + } }
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Nov 26 08:19:25 2013 @@ -137,7 +137,6 @@ public final class OperatorFactory { Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor( VectorizationContext.class, OperatorDesc.class).newInstance( vContext, conf); - op.initializeCounters(); return op; } catch (Exception e) { e.printStackTrace(); @@ -155,7 +154,6 @@ public final class OperatorFactory { if (o.descClass == opClass) { try { Operator<T> op = (Operator<T>) o.opClass.newInstance(); - op.initializeCounters(); return op; } catch (Exception e) { e.printStackTrace(); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 26 08:19:25 2013 @@ -363,13 +363,6 @@ public class ReduceSinkOperator extends if (null != out) { out.collect(keyWritable, valueWritable); } - if (++outputRows % 1000 == 0) { - if (counterNameToEnum != null) { - incrCounter(numOutputRowsCntr, outputRows); - } - increaseForward(outputRows); - outputRows = 0; - } } private BytesWritable makeValueWritable(Object row) throws Exception { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Nov 26 08:19:25 2013 @@ -550,7 +550,7 @@ public class SMBMapJoinOperator extends fetchDone[tag] = true; return; } - forwardOp.process(row.o, tag); + forwardOp.processOp(row.o, tag); // check if any operator had a fatal error or early exit during // execution if (forwardOp.getDone()) { @@ -795,7 +795,7 @@ public class SMBMapJoinOperator extends // Pass the row though the operator tree. It is guaranteed that not more than 1 row can // be produced from a input row. - forwardOp.process(nextRow.o, 0); + forwardOp.processOp(nextRow.o, 0); nextRow = sinkOp.getResult(); // It is possible that the row got absorbed in the operator tree. Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 26 08:19:25 2013 @@ -28,9 +28,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; @@ -178,11 +176,11 @@ public class StatsTask extends Task<Stat if (!this.getWork().getNoStatsAggregator()) { String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); - StatsFactory.setImplementation(statsImplementationClass, conf); - if (work.isNoScanAnalyzeCommand()){ + StatsFactory factory = StatsFactory.newFactory(statsImplementationClass, conf); + if (factory != null && work.isNoScanAnalyzeCommand()){ // initialize stats publishing table for noscan which has only stats task // the rest of MR task following stats task initializes it in ExecDriver.java - StatsPublisher statsPublisher = StatsFactory.getStatsPublisher(); + StatsPublisher statsPublisher = factory.getStatsPublisher(); if (!statsPublisher.init(conf)) { // creating stats table if not exists if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw @@ -190,10 +188,12 @@ public class StatsTask extends Task<Stat } } } - statsAggregator = StatsFactory.getStatsAggregator(); - // manufacture a StatsAggregator - if (!statsAggregator.connect(conf)) { - throw new HiveException("StatsAggregator connect failed " + statsImplementationClass); + if (factory != null) { + statsAggregator = factory.getStatsAggregator(); + // manufacture a StatsAggregator + if (!statsAggregator.connect(conf, getWork().getSourceTask())) { + throw new HiveException("StatsAggregator connect failed " + statsImplementationClass); + } } } @@ -377,7 +377,7 @@ public class StatsTask extends Task<Stat if (work.getLoadTableDesc() != null && !work.getLoadTableDesc().getReplace()) { String originalValue = parameters.get(statType); - if (originalValue != null) { + if (originalValue != null && !originalValue.equals("-1")) { longValue += Long.parseLong(originalValue); } } @@ -445,19 +445,4 @@ public class StatsTask extends Task<Stat } return list; } - - /** - * This method is static as it is called from the shutdown hook at the ExecDriver. - */ - public static void cleanUp(String jobID, Configuration config) { - StatsAggregator statsAggregator; - String statsImplementationClass = HiveConf.getVar(config, HiveConf.ConfVars.HIVESTATSDBCLASS); - StatsFactory.setImplementation(statsImplementationClass, config); - statsAggregator = StatsFactory.getStatsAggregator(); - if (statsAggregator.connect(config)) { - statsAggregator.cleanUp(jobID + Path.SEPARATOR); // Adding the path separator to avoid an Id - // being a prefix of another ID - statsAggregator.closeConnection(); - } - } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Nov 26 08:19:25 2013 @@ -322,18 +322,6 @@ public class TableScanOperator extends O } @Override - public Statistics getStatistics(HiveConf conf) throws HiveException { - Statistics stats = this.getConf().getStatistics(); - if (stats == null) { - stats = new Statistics(); - stats.addNumberOfBytes(Utilities.getSize(alias, getConf().getTable(), conf, - this, getConf().getPruningPredicate())); - this.getConf().setStatistics(stats); - } - return stats; - } - - @Override public boolean supportSkewJoinOptimization() { return true; } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 26 08:19:25 2013 @@ -837,7 +837,6 @@ public final class Utilities { // workaround for java 1.5 e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); - e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate()); e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); @@ -2155,12 +2154,6 @@ public final class Utilities { HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, partDesc.getOverlayedProperties().getProperty( hive_metastoreConstants.META_TABLE_STORAGE)); - if (handler == null) { - // native table - FileSystem fs = p.getFileSystem(myConf); - resultMap.put(pathStr, fs.getContentSummary(p)); - return; - } if (handler instanceof InputEstimator) { long total = 0; TableDesc tableDesc = partDesc.getTableDesc(); @@ -2176,6 +2169,8 @@ public final class Utilities { } resultMap.put(pathStr, new ContentSummary(total, -1, -1)); } + FileSystem fs = p.getFileSystem(myConf); + resultMap.put(pathStr, fs.getContentSummary(p)); } catch (Exception e) { // We safely ignore this exception for summary data. // We don't update the cache to protect it from polluting other @@ -2344,12 +2339,8 @@ public final class Utilities { } public static StatsPublisher getStatsPublisher(JobConf jc) { - String statsImplementationClass = HiveConf.getVar(jc, HiveConf.ConfVars.HIVESTATSDBCLASS); - if (StatsFactory.setImplementation(statsImplementationClass, jc)) { - return StatsFactory.getStatsPublisher(); - } else { - return null; - } + StatsFactory factory = StatsFactory.newFactory(jc); + return factory == null ? null : factory.getStatsPublisher(); } /** Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 26 08:19:25 2013 @@ -82,7 +82,6 @@ import org.apache.hadoop.mapred.Counters import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.RunningJob; import org.apache.log4j.Appender; @@ -185,17 +184,10 @@ public class ExecDriver extends Task<Map * @return true if fatal errors happened during job execution, false otherwise. */ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { - for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) { - if (op.checkFatalErrors(ctrs, errMsg)) { - return true; - } - } - if (work.getReduceWork() != null) { - if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) { - return true; - } - } - return false; + Counters.Counter cntr = ctrs.findCounter( + HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP), + Operator.HIVECOUNTERFATAL); + return cntr != null && cntr.getValue() > 0; } /** @@ -414,9 +406,9 @@ public class ExecDriver extends Task<Map if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) { // initialize stats publishing table StatsPublisher statsPublisher; - String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS); - if (StatsFactory.setImplementation(statsImplementationClass, job)) { - statsPublisher = StatsFactory.getStatsPublisher(); + StatsFactory factory = StatsFactory.newFactory(job); + if (factory != null) { + statsPublisher = factory.getStatsPublisher(); if (!statsPublisher.init(job)) { // creating stats table if not exists if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw @@ -816,16 +808,6 @@ public class ExecDriver extends Task<Map } @Override - public void updateCounters(Counters ctrs, RunningJob rj) throws IOException { - for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) { - op.updateCounters(ctrs); - } - if (work.getReduceWork() != null) { - work.getReduceWork().getReducer().updateCounters(ctrs); - } - } - - @Override public void logPlanProgress(SessionState ss) throws IOException { ss.getHiveHistory().logPlanProgress(queryPlan); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Nov 26 08:19:25 2013 @@ -258,7 +258,7 @@ public class ExecReducer extends MapRedu } } try { - reducer.process(row, tag); + reducer.processOp(row, tag); } catch (Exception e) { String rowString = null; try { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 26 08:19:25 2013 @@ -34,8 +34,9 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; -import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskHandle; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -80,14 +81,6 @@ public class HadoopJobExecHelper { reduceProgress = reduceProgress == 100 ? (int)Math.floor(rj.reduceProgress() * 100) : reduceProgress; task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress)); task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress)); - if (ctrs == null) { - // hadoop might return null if it cannot locate the job. - // we may still be able to retrieve the job status - so ignore - return; - } - if(callBackObj != null) { - callBackObj.updateCounters(ctrs, rj); - } } /** @@ -200,6 +193,7 @@ public class HadoopJobExecHelper { } } + @SuppressWarnings("deprecation") public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { if (ctrs == null) { // hadoop might return null if it cannot locate the job. @@ -207,7 +201,9 @@ public class HadoopJobExecHelper { return false; } // check for number of created files - long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES); + Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP), + Operator.HIVECOUNTERCREATEDFILES); + long numFiles = cntr != null ? cntr.getValue() : 0; long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES); if (numFiles > upperLimit) { errMsg.append("total number of created files now is " + numFiles + ", which exceeds ").append(upperLimit); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java Tue Nov 26 08:19:25 2013 @@ -22,12 +22,10 @@ import java.io.IOException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.RunningJob; @SuppressWarnings("deprecation") public interface HadoopJobExecHook { - public void updateCounters(Counters ctrs, RunningJob rj) throws IOException; public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg); public void logPlanProgress(SessionState ss) throws IOException; Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Nov 26 08:19:25 2013 @@ -230,8 +230,7 @@ public class MapredLocalTask extends Tas if(ShimLoader.getHadoopShims().isSecurityEnabled() && - conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) == true - ){ + ShimLoader.getHadoopShims().isLoginKeytabBased()) { //If kerberos security is enabled, and HS2 doAs is enabled, // then additional params need to be set so that the command is run as // intended user @@ -359,7 +358,7 @@ public class MapredLocalTask extends Tas forwardOp.close(false); break; } - forwardOp.process(row.o, 0); + forwardOp.processOp(row.o, 0); // check if any operator had a fatal error or early exit during // execution if (forwardOp.getDone()) { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Nov 26 08:19:25 2013 @@ -619,9 +619,9 @@ public class DagUtils { // initialize stats publisher if necessary if (work.isGatheringStats()) { StatsPublisher statsPublisher; - String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); - if (StatsFactory.setImplementation(statsImplementationClass, conf)) { - statsPublisher = StatsFactory.getStatsPublisher(); + StatsFactory factory = StatsFactory.newFactory(conf); + if (factory != null) { + statsPublisher = factory.getStatsPublisher(); if (!statsPublisher.init(conf)) { // creating stats table if not exists if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { throw Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Nov 26 08:19:25 2013 @@ -295,7 +295,7 @@ public class ReduceRecordProcessor exte row.add(valueObj); try { - reducer.process(row, tag); + reducer.processOp(row, tag); } catch (Exception e) { String rowString = null; try { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Nov 26 08:19:25 2013 @@ -98,16 +98,6 @@ public class VectorFileSinkOperator exte } } - // Since File Sink is a terminal operator, forward is not called - so, - // maintain the number of output rows explicitly - if (counterNameToEnum != null) { - ++outputRows; - if (outputRows % 1000 == 0) { - incrCounter(numOutputRowsCntr, outputRows); - outputRows = 0; - } - } - try { updateProgress(); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Nov 26 08:19:25 2013 @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; @@ -100,16 +101,22 @@ public class VectorGroupByOperator exten * Sum of batch size processed (ie. rows). */ private transient long sumBatchSize; + + /** + * Max number of entries in the vector group by aggregation hashtables. + * Exceeding this will trigger a flush irrelevant of memory pressure condition. + */ + private transient int maxHtEntries = 1000000; /** * The number of new entries that must be added to the hashtable before a memory size check. */ - private static final int FLUSH_CHECK_THRESHOLD = 10000; + private transient int checkInterval = 10000; /** * Percent of entries to flush when memory threshold exceeded. */ - private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f; + private transient float percentEntriesToFlush = 0.1f; /** * The global key-aggregation hash map. @@ -139,6 +146,16 @@ public class VectorGroupByOperator exten @Override protected void initializeOp(Configuration hconf) throws HiveException { + + // hconf is null in unit testing + if (null != hconf) { + this.percentEntriesToFlush = HiveConf.getFloatVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT); + this.checkInterval = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL); + this.maxHtEntries = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES); + } List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(); @@ -226,8 +243,21 @@ public class VectorGroupByOperator exten processAggregators(batch); //Flush if memory limits were reached - if (shouldFlush(batch)) { + // We keep flushing until the memory is under threshold + int preFlushEntriesCount = numEntriesHashTable; + while (shouldFlush(batch)) { flush(false); + + //Validate that some progress is being made + if (!(numEntriesHashTable < preFlushEntriesCount)) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after", + preFlushEntriesCount, + numEntriesHashTable)); + } + break; + } + preFlushEntriesCount = numEntriesHashTable; } if (sumBatchSize == 0 && 0 != batch.size) { @@ -247,7 +277,7 @@ public class VectorGroupByOperator exten private void flush(boolean all) throws HiveException { int entriesToFlush = all ? numEntriesHashTable : - (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH); + (int)(numEntriesHashTable * this.percentEntriesToFlush); int entriesFlushed = 0; if (LOG.isDebugEnabled()) { @@ -309,14 +339,18 @@ public class VectorGroupByOperator exten * Returns true if the memory threshold for the hash table was reached. */ private boolean shouldFlush(VectorizedRowBatch batch) { - if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD || - batch.size == 0) { + if (batch.size == 0) { return false; } - // Were going to update the average variable row size by sampling the current batch - updateAvgVariableSize(batch); - numEntriesSinceCheck = 0; - return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + //numEntriesSinceCheck is the number of entries added to the hash table + // since the last time we checked the average variable size + if (numEntriesSinceCheck >= this.checkInterval) { + // Were going to update the average variable row size by sampling the current batch + updateAvgVariableSize(batch); + numEntriesSinceCheck = 0; + } + return numEntriesHashTable > this.maxHtEntries || + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; } /** Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java?rev=1545564&r1=1545563&r2=1545564&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java Tue Nov 26 08:19:25 2013 @@ -33,4 +33,6 @@ public interface VectorExpressionWriter Object writeValue(long value) throws HiveException; Object writeValue(double value) throws HiveException; Object writeValue(byte[] value, int start, int length) throws HiveException; + Object setValue(Object row, ColumnVector column, int columnRow) throws HiveException; + Object initValue(Object ost) throws HiveException; }
