Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sun Oct 5 22:26:43 2014 @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -38,13 +40,13 @@ import org.apache.hadoop.hive.common.Sta import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordUpdater; -import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HivePartitioner; import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveFatalException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -72,14 +74,16 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.collect.Lists; - /** * File Sink operator implementation. 
**/ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements Serializable { + public static final Log LOG = LogFactory.getLog(FileSinkOperator.class); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + protected transient HashMap<String, FSPaths> valToPaths; protected transient int numDynParts; protected transient List<String> dpColNames; @@ -101,10 +105,6 @@ public class FileSinkOperator extends Te protected transient boolean isCollectRWStats; private transient FSPaths prevFsp; private transient FSPaths fpaths; - private transient ObjectInspector keyOI; - private transient List<Object> keyWritables; - private transient List<String> keys; - private transient int numKeyColToRead; private StructField recIdField; // field to find record identifier in private StructField bucketField; // field bucket is in in record id private StructObjectInspector recIdInspector; // OI for inspecting record id @@ -131,9 +131,6 @@ public class FileSinkOperator extends Te int acidLastBucket = -1; int acidFileOffset = -1; - public FSPaths() { - } - public FSPaths(Path specPath) { tmpPath = Utilities.toTempPath(specPath); taskOutputTempPath = Utilities.toTaskTempPath(specPath); @@ -141,7 +138,9 @@ public class FileSinkOperator extends Te finalPaths = new Path[numFiles]; outWriters = new RecordWriter[numFiles]; updaters = new RecordUpdater[numFiles]; - LOG.debug("Created slots for " + numFiles); + if (isDebugEnabled) { + LOG.debug("Created slots for " + numFiles); + } stat = new Stat(); } @@ -326,7 +325,6 @@ public class FileSinkOperator extends Te parent = Utilities.toTempPath(conf.getDirName()); statsCollectRawDataSize = conf.isStatsCollectRawDataSize(); statsFromRecordWriter = new boolean[numFiles]; - serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(null, conf.getTableInfo().getProperties()); outputClass = serializer.getSerializedClass(); @@ -363,20 +361,6 @@ public class FileSinkOperator extends Te lbSetup(); } - int numPart = 0; - int numBuck = 0; - if (conf.getPartitionCols() != null && !conf.getPartitionCols().isEmpty()) { - numPart = conf.getPartitionCols().size(); - } - - // bucket number will exists only in PARTITION_BUCKET_SORTED mode - if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { - numBuck = 1; - } - numKeyColToRead = numPart + numBuck; - keys = Lists.newArrayListWithCapacity(numKeyColToRead); - keyWritables = Lists.newArrayListWithCapacity(numKeyColToRead); - if (!bDynParts) { fsp = new FSPaths(specPath); @@ -423,7 +407,8 @@ public class FileSinkOperator extends Te this.dpColNames = dpCtx.getDPColNames(); this.maxPartitions = dpCtx.getMaxPartitionsPerNode(); - assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping"; + assert numDynParts == dpColNames.size() + : "number of dynamic paritions should be the same as the size of DP mapping"; if (dpColNames != null && dpColNames.size() > 0) { this.bDynParts = true; @@ -441,6 +426,9 @@ public class FileSinkOperator extends Te newFieldsOI.add(sf.getFieldObjectInspector()); newFieldsName.add(sf.getFieldName()); this.dpStartCol++; + } else { + // once we found the start column for partition column we are done + break; } } assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty"; @@ -457,11 +445,15 @@ public class FileSinkOperator extends Te Set<Integer> seenBuckets = new HashSet<Integer>(); for 
(int idx = 0; idx < totalFiles; idx++) { if (this.getExecContext() != null && this.getExecContext().getFileId() != null) { - LOG.info("replace taskId from execContext "); + if (isInfoEnabled) { + LOG.info("replace taskId from execContext "); + } taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId()); - LOG.info("new taskId: FS " + taskId); + if (isInfoEnabled) { + LOG.info("new taskId: FS " + taskId); + } assert !multiFileSpray; assert totalFiles == 1; @@ -515,9 +507,13 @@ public class FileSinkOperator extends Te try { if (isNativeTable) { fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null); - LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); + if (isInfoEnabled) { + LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); + } fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId); - LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]); + if (isInfoEnabled) { + LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]); + } } else { fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath; } @@ -532,7 +528,9 @@ public class FileSinkOperator extends Te fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension); } - LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); + if (isInfoEnabled) { + LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); + } if (isNativeTable) { // in recent hadoop versions, use deleteOnExit to clean tmp files. @@ -604,14 +602,22 @@ public class FileSinkOperator extends Te updateProgress(); // if DP is enabled, get the final output writers and prepare the real output row - assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct"; + assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT + : "input object inspector is not struct"; if (bDynParts) { + + // we need to read bucket number which is the last column in value (after partition columns) + if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { + numDynParts += 1; + } + // copy the DP column values from the input row to dpVals dpVals.clear(); dpWritables.clear(); - ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts, - (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol,numDynParts, + (StructObjectInspector) inputObjInspectors[0],ObjectInspectorCopyOption.WRITABLE); + // get a set of RecordWriter based on the DP column values // pass the null value along to the escaping process to determine what the dir should be for (Object o : dpWritables) { @@ -621,16 +627,11 @@ public class FileSinkOperator extends Te dpVals.add(o.toString()); } } - // use SubStructObjectInspector to serialize the non-partitioning columns in the input row - recordValue = serializer.serialize(row, subSetOI); - // when dynamic partition sorting is not used, the DPSortState will be NONE - // in which we will fall back to old method of file system path creation - // i.e, having as many record writers as distinct values in partition column - if (conf.getDpSortState().equals(DPSortState.NONE)) { - fpaths = getDynOutPaths(dpVals, lbDirName); - } + fpaths = getDynOutPaths(dpVals, lbDirName); + // use SubStructObjectInspector to serialize the non-partitioning columns in the input row + recordValue = serializer.serialize(row, subSetOI); } else { if (lbDirName != null) { fpaths = 
lookupListBucketingPaths(lbDirName); @@ -686,8 +687,10 @@ public class FileSinkOperator extends Te fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater( jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset], rowInspector, reporter, 0); - LOG.debug("Created updater for bucket number " + bucketNum + " using file " + - fpaths.outPaths[fpaths.acidFileOffset]); + if (isDebugEnabled) { + LOG.debug("Created updater for bucket number " + bucketNum + " using file " + + fpaths.outPaths[fpaths.acidFileOffset]); + } } if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { @@ -834,10 +837,8 @@ public class FileSinkOperator extends Te if (dpDir != null) { dpDir = appendToSource(lbDirName, dpDir); pathKey = dpDir; - int numericBucketNum = 0; if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { String buckNum = row.get(row.size() - 1); - numericBucketNum = Integer.valueOf(buckNum); taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum); pathKey = appendToSource(taskId, dpDir); } @@ -918,26 +919,6 @@ public class FileSinkOperator extends Te } @Override - public void startGroup() throws HiveException { - if (!conf.getDpSortState().equals(DPSortState.NONE)) { - keyOI = getGroupKeyObjectInspector(); - keys.clear(); - keyWritables.clear(); - ObjectInspectorUtils.partialCopyToStandardObject(keyWritables, getGroupKeyObject(), 0, - numKeyColToRead, (StructObjectInspector) keyOI, ObjectInspectorCopyOption.WRITABLE); - - for (Object o : keyWritables) { - if (o == null || o.toString().length() == 0) { - keys.add(dpCtx.getDefaultPartitionName()); - } else { - keys.add(o.toString()); - } - } - fpaths = getDynOutPaths(keys, null); - } - } - - @Override public void closeOp(boolean abort) throws HiveException { if (!bDynParts && !filesCreated) {
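
Under DPSortState.PARTITION_BUCKET_SORTED, the FileSinkOperator changes above read the bucket number as one extra trailing column after the dynamic-partition values (numDynParts += 1) and always resolve the target writer through getDynOutPaths, replacing the removed startGroup-based path. Below is a minimal, self-contained sketch of the writer-key derivation this implies; the class, method, and column names are illustrative, not Hive's API.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: derives a writer key from the dynamic-partition
// values plus a trailing bucket number, mirroring the PARTITION_BUCKET_SORTED
// handling above. All names here are hypothetical, not Hive code.
public class DynPartitionKeySketch {

  // dpVals holds the dynamic-partition column values; when sortedByBucket is
  // true the last element is assumed to be the bucket number appended as the
  // final value column on the ReduceSink side.
  static String writerKey(List<String> dpVals, boolean sortedByBucket,
                          String defaultPartitionName) {
    StringBuilder key = new StringBuilder();
    int partCols = sortedByBucket ? dpVals.size() - 1 : dpVals.size();
    for (int i = 0; i < partCols; i++) {
      String v = dpVals.get(i);
      key.append(i == 0 ? "" : "/")
         .append("col").append(i).append('=')
         .append(v == null || v.isEmpty() ? defaultPartitionName : v);
    }
    if (sortedByBucket) {
      // one writer per (partition, bucket): fold the bucket number into the key
      key.append("/bucket_").append(dpVals.get(dpVals.size() - 1));
    }
    return key.toString();
  }

  public static void main(String[] args) {
    System.out.println(writerKey(Arrays.asList("2014-10-05", "", "3"), true,
        "__HIVE_DEFAULT_PARTITION__"));
    // col0=2014-10-05/col1=__HIVE_DEFAULT_PARTITION__/bucket_3
  }
}

Because the incoming rows are sorted by partition and bucket, only one such key is active at a time, which is presumably what lets the operator keep a single open writer rather than one per distinct partition value (the fallback the removed comment described for DPSortState.NONE).
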
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Sun Oct 5 22:26:43 2014 @@ -76,7 +76,7 @@ public class FilterOperator extends Oper statsMap.put(Counter.FILTERED, filtered_count); statsMap.put(Counter.PASSED, passed_count); conditionInspector = null; - ioContext = IOContext.get(); + ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME)); } catch (Throwable e) { throw new HiveException(e); } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Sun Oct 5 22:26:43 2014 @@ -639,6 +639,14 @@ public final class FunctionRegistry { } } + public static String getNormalizedFunctionName(String fn) { + // Does the same thing as getFunctionInfo, except for getting the function info. + fn = fn.toLowerCase(); + return (FunctionUtils.isQualifiedFunctionName(fn) || mFunctions.get(fn) != null) ? fn + : FunctionUtils.qualifyFunctionName( + fn, SessionState.get().getCurrentDatabase().toLowerCase()); + } + private static <T extends CommonFunctionInfo> T getFunctionInfo( Map<String, T> mFunctions, String functionName) { functionName = functionName.toLowerCase(); @@ -861,15 +869,7 @@ public final class FunctionRegistry { TypeInfoUtils.getCharacterLengthForType(b)); return TypeInfoFactory.getVarcharTypeInfo(maxLength); case DECIMAL: - int prec1 = HiveDecimalUtils.getPrecisionForType(a); - int prec2 = HiveDecimalUtils.getPrecisionForType(b); - int scale1 = HiveDecimalUtils.getScaleForType(a); - int scale2 = HiveDecimalUtils.getScaleForType(b); - int intPart = Math.max(prec1 - scale1, prec2 - scale2); - int decPart = Math.max(scale1, scale2); - int prec = Math.min(intPart + decPart, HiveDecimal.MAX_PRECISION); - int scale = Math.min(decPart, HiveDecimal.MAX_PRECISION - intPart); - return TypeInfoFactory.getDecimalTypeInfo(prec, scale); + return HiveDecimalUtils.getDecimalTypeForPrimitiveCategories(a, b); default: // Type doesn't require any qualifiers. 
return TypeInfoFactory.getPrimitiveTypeInfo( Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Sun Oct 5 22:26:43 2014 @@ -77,6 +77,7 @@ public class GroupByOperator extends Ope private static final Log LOG = LogFactory.getLog(GroupByOperator.class .getName()); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final long serialVersionUID = 1L; private static final int NUMROWSESTIMATESIZE = 1000; @@ -101,6 +102,7 @@ public class GroupByOperator extends Ope transient ExprNodeEvaluator unionExprEval = null; transient GenericUDAFEvaluator[] aggregationEvaluators; + transient boolean[] estimableAggregationEvaluators; protected transient ArrayList<ObjectInspector> objectInspectors; transient ArrayList<String> fieldNames; @@ -442,10 +444,10 @@ public class GroupByOperator extends Ope estimateRowSize(); } - private static final int javaObjectOverHead = 64; - private static final int javaHashEntryOverHead = 64; - private static final int javaSizePrimitiveType = 16; - private static final int javaSizeUnknownType = 256; + public static final int javaObjectOverHead = 64; + public static final int javaHashEntryOverHead = 64; + public static final int javaSizePrimitiveType = 16; + public static final int javaSizeUnknownType = 256; /** * The size of the element at position 'pos' is returned, if possible. If the @@ -557,11 +559,13 @@ public class GroupByOperator extends Ope // Go over all the aggregation classes and and get the size of the fields of // fixed length. Keep track of the variable length // fields in these aggregation classes. 
+ estimableAggregationEvaluators = new boolean[aggregationEvaluators.length]; for (int i = 0; i < aggregationEvaluators.length; i++) { fixedRowSize += javaObjectOverHead; AggregationBuffer agg = aggregationEvaluators[i].getNewAggregationBuffer(); if (GenericUDAFEvaluator.isEstimable(agg)) { + estimableAggregationEvaluators[i] = true; continue; } Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg.getClass()); @@ -765,10 +769,12 @@ public class GroupByOperator extends Ope flushHashTable(true); hashAggr = false; } else { - LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl - + " #total = " + numRowsInput + " reduction = " + 1.0 - * (numRowsHashTbl / numRowsInput) + " minReduction = " - + minReductionHashAggr); + if (isTraceEnabled) { + LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + + " #total = " + numRowsInput + " reduction = " + 1.0 + * (numRowsHashTbl / numRowsInput) + " minReduction = " + + minReductionHashAggr); + } } } } @@ -952,7 +958,7 @@ public class GroupByOperator extends Ope AggregationBuffer[] aggs = hashAggregations.get(newKeys); for (int i = 0; i < aggs.length; i++) { AggregationBuffer agg = aggs[i]; - if (GenericUDAFEvaluator.isEstimable(agg)) { + if (estimableAggregationEvaluators[i]) { totalVariableSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer)agg).estimate(); continue; } @@ -966,8 +972,10 @@ public class GroupByOperator extends Ope // Update the number of entries that can fit in the hash table numEntriesHashTable = (int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize))); - LOG.trace("Hash Aggr: #hash table = " + numEntries - + " #max in hash table = " + numEntriesHashTable); + if (isTraceEnabled) { + LOG.trace("Hash Aggr: #hash table = " + numEntries + + " #max in hash table = " + numEntriesHashTable); + } } // flush if necessary Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sun Oct 5 22:26:43 2014 @@ -171,8 +171,9 @@ public class MapJoinOperator extends Abs private void loadHashTable() throws HiveException { - if (this.getExecContext().getLocalWork() == null - || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { + if ((this.getExecContext() != null) + && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext() + .getLocalWork().getInputFileChangeSensitive()))) { if (hashTblInitedOnce) { return; } else { @@ -313,8 +314,8 @@ public class MapJoinOperator extends Abs tableContainer.dumpMetrics(); } } - if ((this.getExecContext().getLocalWork() != null - && this.getExecContext().getLocalWork().getInputFileChangeSensitive()) + if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) + && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) && mapJoinTables != null) { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java URL: 
http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Sun Oct 5 22:26:43 2014 @@ -33,9 +33,10 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -181,7 +182,7 @@ public class MapOperator extends Operato PartitionDesc pd = ctx.partDesc; TableDesc td = pd.getTableDesc(); - + MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition @@ -205,42 +206,42 @@ public class MapOperator extends Operato opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); - + // Next check if this table has partitions and if so // get the list of partition names as well as allocate // the serdes for the partition columns String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - + if (pcols != null && pcols.length() > 0) { String[] partKeys = pcols.trim().split("/"); String pcolTypes = overlayedProps .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); String[] partKeyTypes = pcolTypes.trim().split(":"); - + if (partKeys.length > partKeyTypes.length) { throw new HiveException("Internal error : partKeys length, " +partKeys.length + " greater than partKeyTypes length, " + partKeyTypes.length); } - + List<String> partNames = new ArrayList<String>(partKeys.length); Object[] partValues = new Object[partKeys.length]; List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length); - + for (int i = 0; i < partKeys.length; i++) { String key = partKeys[i]; partNames.add(key); ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - + // Partitions do not exist for this table if (partSpec == null) { // for partitionless table, initialize partValue to null partValues[i] = null; } else { - partValues[i] = + partValues[i] = ObjectInspectorConverters. getConverter(PrimitiveObjectInspectorFactory. 
- javaStringObjectInspector, oi).convert(partSpec.get(key)); + javaStringObjectInspector, oi).convert(partSpec.get(key)); } partObjectInspectors.add(oi); } @@ -337,13 +338,8 @@ public class MapOperator extends Operato return tableDescOI; } - private boolean isPartitioned(PartitionDesc pd) { - return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty(); - } - public void setChildren(Configuration hconf) throws HiveException { - - Path fpath = IOContext.get().getInputPath(); + Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath(); boolean schemeless = fpath.toUri().getScheme() == null; @@ -639,4 +635,8 @@ public class MapOperator extends Operato return null; } + @Override + public Map<Integer, DummyStoreOperator> getTagToOperatorTree() { + return MapRecordProcessor.getConnectOps(); + } } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sun Oct 5 22:26:43 2014 @@ -353,6 +353,7 @@ public class MoveTask extends Task<MoveW pushFeed(FeedType.DYNAMIC_PARTITIONS, dps); } + long startTime = System.currentTimeMillis(); // load the list of DP partitions and return the list of partition specs // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions // to use Utilities.getFullDPSpecs() to get the list of full partSpecs. @@ -360,7 +361,7 @@ public class MoveTask extends Task<MoveW // iterate over it and call loadPartition() here. // The reason we don't do inside HIVE-1361 is the latter is large and we // want to isolate any potential issue it may introduce. - ArrayList<LinkedHashMap<String, String>> dp = + Map<Map<String, String>, Partition> dp = db.loadDynamicPartitions( tbd.getSourcePath(), tbd.getTable().getTableName(), @@ -370,16 +371,19 @@ public class MoveTask extends Task<MoveW tbd.getHoldDDLTime(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID); + console.printInfo("\t Time taken for load dynamic partitions : " + + (System.currentTimeMillis() - startTime)); if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { throw new HiveException("This query creates no partitions." + " To turn off this error, set hive.error.on.empty.partition=false."); } + startTime = System.currentTimeMillis(); // for each partition spec, get the partition // and put it to WriteEntity for post-exec hook - for (LinkedHashMap<String, String> partSpec: dp) { - Partition partn = db.getPartition(table, partSpec, false); + for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) { + Partition partn = entry.getValue(); if (bucketCols != null || sortCols != null) { updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols); @@ -412,8 +416,10 @@ public class MoveTask extends Task<MoveW table.getCols()); } - console.printInfo("\tLoading partition " + partSpec); + console.printInfo("\tLoading partition " + entry.getKey()); } + console.printInfo("\t Time taken for adding to write entity : " + + (System.currentTimeMillis() - startTime)); dc = null; // reset data container to prevent it being added again. 
} else { // static partitions List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Oct 5 22:26:43 2014 @@ -146,6 +146,7 @@ public abstract class Operator<T extends /** * Implements the getChildren function for the Node Interface. */ + @Override public ArrayList<Node> getChildren() { if (getChildOperators() == null) { @@ -497,8 +498,6 @@ public abstract class Operator<T extends LOG.debug("Starting group for children:"); for (Operator<? extends OperatorDesc> op : childOperators) { - op.setGroupKeyObjectInspector(groupKeyOI); - op.setGroupKeyObject(groupKeyObject); op.startGroup(); } @@ -851,6 +850,7 @@ public abstract class Operator<T extends * * @return the name of the operator */ + @Override public String getName() { return getOperatorName(); } @@ -968,7 +968,6 @@ public abstract class Operator<T extends } protected transient Object groupKeyObject; - protected transient ObjectInspector groupKeyOI; public String getOperatorId() { return operatorId; @@ -1061,7 +1060,7 @@ public abstract class Operator<T extends if (parents != null) { for (Operator<? extends OperatorDesc> parent : parents) { - parentClones.add((Operator<? extends OperatorDesc>)(parent.clone())); + parentClones.add((parent.clone())); } } @@ -1082,8 +1081,8 @@ public abstract class Operator<T extends public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator<? extends OperatorDesc> ret = - (Operator<? 
extends OperatorDesc>) OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild( + descClone, getSchema()); return ret; } @@ -1254,15 +1253,15 @@ public abstract class Operator<T extends } return null; } - + public OpTraits getOpTraits() { if (conf != null) { return conf.getOpTraits(); } - + return null; } - + public void setOpTraits(OpTraits metaInfo) { if (LOG.isDebugEnabled()) { LOG.debug("Setting traits ("+metaInfo+") on "+this); @@ -1285,21 +1284,23 @@ public abstract class Operator<T extends } } - public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) { - this.groupKeyOI = keyObjectInspector; - } - - public ObjectInspector getGroupKeyObjectInspector() { - return groupKeyOI; - } - public static Operator createDummy() { return new DummyOperator(); } private static class DummyOperator extends Operator { public DummyOperator() { super("dummy"); } + @Override public void processOp(Object row, int tag) { } + @Override public OperatorType getType() { return null; } } + + public Map<Integer, DummyStoreOperator> getTagToOperatorTree() { + if ((parentOperators == null) || (parentOperators.size() == 0)) { + return null; + } + Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree(); + return dummyOps; + } } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Sun Oct 5 22:26:43 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; +import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; @@ -31,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; import org.apache.hadoop.hive.ql.plan.CollectDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.DemuxDesc; import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; @@ -114,10 +116,16 @@ public final class OperatorFactory { RCFileMergeOperator.class)); opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class, OrcFileMergeOperator.class)); + opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class, + CommonMergeJoinOperator.class)); } static { vectorOpvec = new ArrayList<OpTuple>(); + vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class, + VectorAppMasterEventOperator.class)); + vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class, + VectorAppMasterEventOperator.class)); vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class)); vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class)); vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class)); Modified: 
hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Sun Oct 5 22:26:43 2014 @@ -46,6 +46,9 @@ public class OperatorUtils { public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) { Set<T> found = new HashSet<T>(); for (Operator<?> start : starts) { + if (start == null) { + continue; + } findOperators(start, clazz, found); } return found; Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sun Oct 5 22:26:43 2014 @@ -50,7 +50,6 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; @@ -67,6 +66,9 @@ public class ReduceSinkOperator extends } private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName()); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final long serialVersionUID = 1L; private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); @@ -117,6 +119,8 @@ public class ReduceSinkOperator extends protected transient Object[] cachedValues; protected transient List<List<Integer>> distinctColIndices; protected transient Random random; + protected transient int bucketNumber; + /** * This two dimensional array holds key data and a corresponding Union object * which contains the tag identifying the aggregate expression for distinct columns. 
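
FileSinkOperator, GroupByOperator, ReduceSinkOperator, and ExecReducer in this change set all cache LOG.isInfoEnabled()/isDebugEnabled()/isTraceEnabled() in static finals and guard each log statement, so the message string is only built when that level is actually enabled. A minimal sketch of the pattern with commons-logging; the class name is illustrative.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Sketch of the log-guard pattern applied throughout this change set.
// Caching the level check avoids re-evaluating it per row and, more
// importantly, skips the string concatenation when the level is disabled.
public class LogGuardSketch {
  private static final Log LOG = LogFactory.getLog(LogGuardSketch.class);
  private static final boolean isDebugEnabled = LOG.isDebugEnabled();

  void processRow(Object row, int rowNumber) {
    if (isDebugEnabled) {
      // the concatenation below only runs when debug logging is on
      LOG.debug("processing row " + rowNumber + ": " + row);
    }
  }
}

The trade-off, visible in the diff as well, is that the flag is captured once at class-initialization time, so a log-level change made at runtime is not picked up by the guarded calls.
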
@@ -144,8 +148,14 @@ public class ReduceSinkOperator extends protected void initializeOp(Configuration hconf) throws HiveException { try { List<ExprNodeDesc> keys = conf.getKeyCols(); - LOG.debug("keys size is " + keys.size()); - for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString()); + + if (isDebugEnabled) { + LOG.debug("keys size is " + keys.size()); + for (ExprNodeDesc k : keys) { + LOG.debug("Key exprNodeDesc " + k.getExprString()); + } + } + keyEval = new ExprNodeEvaluator[keys.size()]; int i = 0; for (ExprNodeDesc e : keys) { @@ -184,7 +194,9 @@ public class ReduceSinkOperator extends tag = conf.getTag(); tagByte[0] = (byte) tag; skipTag = conf.getSkipTag(); - LOG.info("Using tag = " + tag); + if (isInfoEnabled) { + LOG.info("Using tag = " + tag); + } TableDesc keyTableDesc = conf.getKeySerializeInfo(); keySerializer = (Serializer) keyTableDesc.getDeserializerClass() @@ -284,7 +296,10 @@ public class ReduceSinkOperator extends bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); } - LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); + if (isInfoEnabled) { + LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + + conf.getNumDistributionKeys()); + } keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, distinctColIndices, conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector); @@ -304,15 +319,14 @@ public class ReduceSinkOperator extends populateCachedDistributionKeys(row, 0); // replace bucketing columns with hashcode % numBuckets - int buckNum = -1; if (bucketEval != null) { - buckNum = computeBucketNumber(row, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum); + bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); + cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE || conf.getWriteType() == AcidUtils.Operation.DELETE) { // In the non-partitioned case we still want to compute the bucket number for updates and // deletes. - buckNum = computeBucketNumber(row, conf.getNumBuckets()); + bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); } HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); @@ -328,7 +342,7 @@ public class ReduceSinkOperator extends if (autoParallel && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { - hashCode = computeHashCode(row, buckNum); + hashCode = computeHashCode(row); } firstKey.setHashCode(hashCode); @@ -377,7 +391,9 @@ public class ReduceSinkOperator extends // column directly. 
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField); buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField)); - LOG.debug("Acid choosing bucket number " + buckNum); + if (isTraceEnabled) { + LOG.trace("Acid choosing bucket number " + buckNum); + } } else { for (int i = 0; i < bucketEval.length; i++) { Object o = bucketEval[i].evaluate(row); @@ -422,7 +438,7 @@ public class ReduceSinkOperator extends return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } - private int computeHashCode(Object row, int buckNum) throws HiveException { + private int computeHashCode(Object row) throws HiveException { // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -446,8 +462,10 @@ public class ReduceSinkOperator extends + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); } } - LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum)); - return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; + if (isTraceEnabled) { + LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber)); + } + return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber; } private boolean partitionKeysAreNull(Object row) throws HiveException { @@ -493,10 +511,19 @@ public class ReduceSinkOperator extends } private BytesWritable makeValueWritable(Object row) throws Exception { + int length = valueEval.length; + + // in case of bucketed table, insert the bucket number as the last column in value + if (bucketEval != null) { + length -= 1; + cachedValues[length] = new Text(String.valueOf(bucketNumber)); + } + // Evaluate the value - for (int i = 0; i < valueEval.length; i++) { + for (int i = 0; i < length; i++) { cachedValues[i] = valueEval[i].evaluate(row); } + // Serialize the value return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector); } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sun Oct 5 22:26:43 2014 @@ -89,6 +89,7 @@ import org.apache.hadoop.hive.ql.plan.Fi import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -201,6 +202,8 @@ public final class Utilities { public static String HADOOP_LOCAL_FS = "file:///"; public static String MAP_PLAN_NAME = "map.xml"; public static String REDUCE_PLAN_NAME = "reduce.xml"; + public static String MERGE_PLAN_NAME = "merge.xml"; + public static final String INPUT_NAME = "iocontext.input.name"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; @@ -291,6 +294,39 @@ public final class Utilities { return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); } + public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, + boolean 
useCache) { + for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) { + setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache); + String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes == null) { + prefixes = baseWork.getName(); + } else { + prefixes = prefixes + "," + baseWork.getName(); + } + conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes); + } + + // nothing to return + return null; + } + + public static BaseWork getMergeWork(JobConf jconf) { + if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null) + || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) { + return null; + } + return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX)); + } + + public static BaseWork getMergeWork(JobConf jconf, String prefix) { + if (prefix == null || prefix.isEmpty()) { + return null; + } + + return getBaseWork(jconf, prefix + MERGE_PLAN_NAME); + } + public static void cacheBaseWork(Configuration conf, String name, BaseWork work, Path hiveScratchDir) { try { @@ -375,6 +411,8 @@ public final class Utilities { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } + } else if (name.contains(MERGE_PLAN_NAME)) { + gWork = deserializePlan(in, MapWork.class, conf); } gWorkMap.put(path, gWork); } else { @@ -608,8 +646,14 @@ public final class Utilities { } public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { + String useName = conf.get(INPUT_NAME); + if (useName == null) { + useName = "mapreduce"; + } + conf.set(INPUT_NAME, useName); setMapWork(conf, w.getMapWork(), hiveScratchDir, true); if (w.getReduceWork() != null) { + conf.set(INPUT_NAME, useName); setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true); } } @@ -1846,7 +1890,7 @@ public final class Utilities { for (int i = 0; i < parts.length; ++i) { assert parts[i].isDir() : "dynamic partition " + parts[i].getPath() - + " is not a direcgtory"; + + " is not a directory"; FileStatus[] items = fs.listStatus(parts[i].getPath()); // remove empty directory since DP insert should not generate empty partitions. 
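
FilterOperator, MapOperator, and ExecMapperContext in this change set switch from the no-argument IOContext.get() to IOContext.get(hconf.get(Utilities.INPUT_NAME)), and setMapRedWork above seeds INPUT_NAME ("iocontext.input.name") with a default of "mapreduce". IOContext itself is not part of this diff; the sketch below only illustrates the kind of per-input-name registry the new get(String) overload implies, under that assumption and with hypothetical names.

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-input context registry. The real IOContext is
// not shown in this diff; this only illustrates keying contexts by the input
// name stored under Utilities.INPUT_NAME instead of using one shared context.
public class IOContextRegistrySketch {
  private static final ConcurrentHashMap<String, IOContextRegistrySketch> contexts =
      new ConcurrentHashMap<String, IOContextRegistrySketch>();

  public static IOContextRegistrySketch get(String inputName) {
    if (inputName == null) {
      inputName = "";   // fall back to a shared default context
    }
    IOContextRegistrySketch ctx = contexts.get(inputName);
    if (ctx == null) {
      ctx = new IOContextRegistrySketch();
      IOContextRegistrySketch existing = contexts.putIfAbsent(inputName, ctx);
      if (existing != null) {
        ctx = existing;
      }
    }
    return ctx;
  }
}

A caller would pass conf.get(Utilities.INPUT_NAME), so differently named inputs running in the same JVM need not share a single context.
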
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Sun Oct 5 22:26:43 2014 @@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc private MapredLocalWork localWork = null; private boolean isLogInfoEnabled = false; - private final ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; @Override public void configure(JobConf job) { + execContext = new ExecMapperContext(job); // Allocate the bean at the beginning - memoryMXBean = ManagementFactory.getMemoryMXBean(); l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); @@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc this.rp = rp; } + @Override public void func(Operator op) { Map<Enum<?>, Long> opStats = op.getStats(); for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Sun Oct 5 22:26:43 2014 @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FetchOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.mapred.JobConf; @@ -60,8 +61,9 @@ public class ExecMapperContext { this.currentBigBucketFile = currentBigBucketFile; } - public ExecMapperContext() { - ioCxt = IOContext.get(); + public ExecMapperContext(JobConf jc) { + this.jc = jc; + ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME)); } public void clear() { Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sun Oct 5 22:26:43 2014 @@ -66,6 +66,8 @@ import org.apache.hadoop.util.StringUtil public class ExecReducer extends MapReduceBase implements Reducer { private static final Log LOG = LogFactory.getLog("ExecReducer"); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final String PLAN_KEY = "__REDUCE_PLAN__"; // used to log memory usage periodically @@ -75,7 +77,6 @@ public 
class ExecReducer extends MapRedu private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE]; private final Object[] valueObject = new Object[Byte.MAX_VALUE]; private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); - private final boolean isLogInfoEnabled = LOG.isInfoEnabled(); // TODO: move to DynamicSerDe when it's ready private Deserializer inputKeyDeserializer; @@ -101,16 +102,18 @@ public class ExecReducer extends MapRedu ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); + if (isInfoEnabled) { + LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - try { - LOG.info("conf classpath = " - + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - LOG.info("thread classpath = " - + Arrays.asList(((URLClassLoader) Thread.currentThread() - .getContextClassLoader()).getURLs())); - } catch (Exception e) { - LOG.info("cannot get classpath: " + e.getMessage()); + try { + LOG.info("conf classpath = " + + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); + LOG.info("thread classpath = " + + Arrays.asList(((URLClassLoader) Thread.currentThread() + .getContextClassLoader()).getURLs())); + } catch (Exception e) { + LOG.info("cannot get classpath: " + e.getMessage()); + } } jc = job; @@ -147,7 +150,6 @@ public class ExecReducer extends MapRedu ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); ois.add(keyObjectInspector); ois.add(valueObjectInspector[tag]); - reducer.setGroupKeyObjectInspector(keyObjectInspector); rowObjectInspector[tag] = ObjectInspectorFactory .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); } @@ -202,7 +204,9 @@ public class ExecReducer extends MapRedu groupKey = new BytesWritable(); } else { // If a operator wants to do some work at the end of a group - LOG.trace("End Group"); + if (isTraceEnabled) { + LOG.trace("End Group"); + } reducer.endGroup(); } @@ -217,9 +221,11 @@ public class ExecReducer extends MapRedu } groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); - LOG.trace("Start Group"); - reducer.setGroupKeyObject(keyObject); + if (isTraceEnabled) { + LOG.trace("Start Group"); + } reducer.startGroup(); + reducer.setGroupKeyObject(keyObject); } // System.err.print(keyObject.toString()); while (values.hasNext()) { @@ -239,12 +245,14 @@ public class ExecReducer extends MapRedu row.clear(); row.add(keyObject); row.add(valueObject[tag]); - if (isLogInfoEnabled) { + if (isInfoEnabled) { cntr++; if (cntr == nextCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("ExecReducer: processing " + cntr - + " rows: used memory = " + used_memory); + if (isInfoEnabled) { + LOG.info("ExecReducer: processing " + cntr + + " rows: used memory = " + used_memory); + } nextCntr = getNextCntr(cntr); } } @@ -290,17 +298,19 @@ public class ExecReducer extends MapRedu public void close() { // No row was processed - if (oc == null) { + if (oc == null && isTraceEnabled) { LOG.trace("Close called without any rows processed"); } try { if (groupKey != null) { // If a operator wants to do some work at the end of a group - LOG.trace("End Group"); + if (isTraceEnabled) { + LOG.trace("End Group"); + } reducer.endGroup(); } - if (isLogInfoEnabled) { + if (isInfoEnabled) { LOG.info("ExecReducer: processed " + cntr + " rows: used memory = " + 
memoryMXBean.getHeapMemoryUsage().getUsed()); } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Sun Oct 5 22:26:43 2014 @@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas // not sure we need this exec context; but all the operators in the work // will pass this context throught - private ExecMapperContext execContext = new ExecMapperContext(); + private ExecMapperContext execContext = null; private Process executor; @@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { super.initialize(conf, queryPlan, driverContext); job = new JobConf(conf, ExecDriver.class); + execContext = new ExecMapperContext(job); //we don't use the HadoopJobExecHooks for local tasks this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null); } @@ -301,6 +302,11 @@ public class MapredLocalTask extends Tas if (work == null) { return -1; } + + if (execContext == null) { + execContext = new ExecMapperContext(job); + } + memoryMXBean = ManagementFactory.getMemoryMXBean(); long startTime = System.currentTimeMillis(); console.printInfo(Utilities.now() Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Sun Oct 5 22:26:43 2014 @@ -31,6 +31,7 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.serializer.SerializationFactory; @@ -79,9 +80,14 @@ public class CustomPartitionVertex exten private List<InputDataInformationEvent> dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; - private boolean rootVertexInitialized = false; private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; + private VertexType vertexType; + private String mainWorkName; + private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create(); + + private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap = + new HashMap<String, Multimap<Integer, InputSplit>>(); public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -90,8 +96,18 @@ public class CustomPartitionVertex exten @Override public void initialize() { this.context = getContext(); - ByteBuffer byteBuf = context.getUserPayload().getPayload(); - 
this.numBuckets = byteBuf.getInt(); + ByteBuffer payload = context.getUserPayload().getPayload(); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(payload); + try { + vertexConf.readFields(dibb); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.numBuckets = vertexConf.getNumBuckets(); + this.mainWorkName = vertexConf.getInputName(); + this.vertexType = vertexConf.getVertexType(); } @Override @@ -113,17 +129,12 @@ public class CustomPartitionVertex exten public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { } - // One call per root Input - and for now only one is handled. + // One call per root Input @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) { + LOG.info("On root vertex initialized " + inputName); - // Ideally, since there's only 1 Input expected at the moment - - // ensure this method is called only once. Tez will call it once per Root - // Input. - Preconditions.checkState(rootVertexInitialized == false); - LOG.info("Root vertex not initialized"); - rootVertexInitialized = true; try { // This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -164,9 +175,6 @@ public class CustomPartitionVertex exten // No tasks should have been started yet. Checked by initial state // check. Preconditions.checkState(dataInformationEventSeen == false); - Preconditions - .checkState(context.getVertexNumTasks(context.getVertexName()) == -1, - "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism"); InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to @@ -220,21 +228,55 @@ public class CustomPartitionVertex exten (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap<Integer, InputSplit> groupedSplit = HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots); + availableSlots, inputName); bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } - LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks"); - processAllEvents(inputName, bucketToGroupedSplitMap); + LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap); + if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) { + /* + * this is the small table side. In case of SMB join, we may need to send each split to the + * corresponding bucket-based task on the other side. In case a split needs to go to + * multiple downstream tasks, we need to clone the event and send it to the right + * destination. + */ + processAllSideEvents(inputName, bucketToGroupedSplitMap); + } else { + processAllEvents(inputName, bucketToGroupedSplitMap); + } } catch (Exception e) { throw new RuntimeException(e); } } + private void processAllSideEvents(String inputName, + Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { + // the bucket to task map should have been setup by the big table. 
+ if (bucketToTaskMap.isEmpty()) { + inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap); + return; + } + List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>(); + for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { + Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey()); + for (Integer task : destTasks) { + for (InputSplit split : entry.getValue()) { + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split); + InputDataInformationEvent diEvent = + InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit + .toByteString().asReadOnlyByteBuffer()); + diEvent.setTargetIndex(task); + taskEvents.add(diEvent); + } + } + } + + context.addRootInputEvents(inputName, taskEvents); + } + private void processAllEvents(String inputName, Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { - Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create(); List<InputSplit> finalSplits = Lists.newLinkedList(); for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { int bucketNum = entry.getKey(); @@ -248,11 +290,13 @@ public class CustomPartitionVertex exten // Construct the EdgeManager descriptor to be used by all edges which need // the routing table. - EdgeManagerPluginDescriptor hiveEdgeManagerDesc = - EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); - UserPayload payload = getBytePayload(bucketToTaskMap); - hiveEdgeManagerDesc.setUserPayload(payload); - + EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null; + if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES) + || (vertexType == VertexType.INITIALIZED_EDGES)) { + hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); + UserPayload payload = getBytePayload(bucketToTaskMap); + hiveEdgeManagerDesc.setUserPayload(payload); + } Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. @@ -285,13 +329,21 @@ public class CustomPartitionVertex exten rootInputSpecUpdate.put( inputName, InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - context.setVertexParallelism( - taskCount, - VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) { + context.setVertexParallelism( + taskCount, + VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits + .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); + } // Set the actual events for the tasks. context.addRootInputEvents(inputName, taskEvents); + if (inputToGroupedSplitMap.isEmpty() == false) { + for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) { + processAllSideEvents(entry.getKey(), entry.getValue()); + } + inputToGroupedSplitMap.clear(); + } } UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException { @@ -315,7 +367,8 @@ public class CustomPartitionVertex exten if (!(inputSplit instanceof FileSplit)) { throw new UnsupportedOperationException( - "Cannot handle splits other than FileSplit for the moment"); + "Cannot handle splits other than FileSplit for the moment. 
Current input split type: " + + inputSplit.getClass().getSimpleName()); } return (FileSplit) inputSplit; } @@ -327,7 +380,6 @@ public class CustomPartitionVertex exten Map<String, List<FileSplit>> pathFileSplitsMap) { int bucketNum = 0; - int fsCount = 0; Multimap<Integer, InputSplit> bucketToInitialSplitMap = ArrayListMultimap.<Integer, InputSplit> create(); @@ -335,14 +387,20 @@ public class CustomPartitionVertex exten for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) { int bucketId = bucketNum % numBuckets; for (FileSplit fsplit : entry.getValue()) { - fsCount++; bucketToInitialSplitMap.put(bucketId, fsplit); } bucketNum++; } - LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: " - + pathFileSplitsMap.size()); + if (bucketNum < numBuckets) { + int loopedBucketId = 0; + for (; bucketNum < numBuckets; bucketNum++) { + for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) { + bucketToInitialSplitMap.put(bucketNum, fsplit); + } + loopedBucketId++; + } + } return bucketToInitialSplitMap; } Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sun Oct 5 22:26:43 2014 @@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + +import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; @@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -90,12 +110,16 
@@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; @@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; -import javax.security.auth.login.LoginException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * DagUtils. DagUtils is a collection of helper methods to convert * map and reduce work to tez vertices and edges. It handles configuration @@ -130,6 +139,11 @@ public class DagUtils { private static final Log LOG = LogFactory.getLog(DagUtils.class.getName()); private static final String TEZ_DIR = "_tez_scratch_dir"; private static DagUtils instance; + // The merge file being currently processed. + public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX = + "hive.tez.current.merge.file.prefix"; + // "A comma separated list of work names used as prefix. + public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes"; private void addCredentials(MapWork mapWork, DAG dag) { Set<String> paths = mapWork.getPathToAliases().keySet(); @@ -238,8 +252,8 @@ public class DagUtils { * endpoints. 
*/ @SuppressWarnings("rawtypes") - public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, - Vertex w, TezEdgeProperty edgeProp) + public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, + TezEdgeProperty edgeProp, VertexType vertexType) throws IOException { Class mergeInputClass; @@ -254,10 +268,14 @@ public class DagUtils { case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -289,17 +307,21 @@ public class DagUtils { * @param w The second vertex (sink) * @return */ - public Edge createEdge(JobConf vConf, Vertex v, Vertex w, - TezEdgeProperty edgeProp) + public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, + VertexType vertexType) throws IOException { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets); - userPayload.flip(); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, vertexType, ""); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( CustomPartitionVertex.class.getName()); + byte[] userPayloadBytes = dob.getData(); + ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes); desc.setUserPayload(UserPayload.create(userPayload)); w.setVertexManagerPlugin(desc); break; @@ -405,7 +427,7 @@ public class DagUtils { * from yarn. Falls back to Map-reduce's map size if tez * container size isn't set. */ - private Resource getContainerResource(Configuration conf) { + public static Resource getContainerResource(Configuration conf) { int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); @@ -443,12 +465,61 @@ public class DagUtils { return MRHelpers.getJavaOptsForMRMapper(conf); } + private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, + List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx, + VertexType vertexType) + throws Exception { + Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false); + if (mergeJoinWork.getMainWork() instanceof MapWork) { + List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList(); + MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); + CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); + Vertex mergeVx = + createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType); + + // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat + // here would cause pre-mature grouping which would be incorrect. 
+ Class inputFormatClass = HiveInputFormat.class; + conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); + // mapreduce.tez.input.initializer.serialize.event.payload should be set + // to false when using this plug-in to avoid getting a serialized event at run-time. + conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false); + for (int i = 0; i < mapWorkList.size(); i++) { + + mapWork = (MapWork) (mapWorkList.get(i)); + conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName()); + conf.set(Utilities.INPUT_NAME, mapWork.getName()); + LOG.info("Going through each work and adding MultiMRInput"); + mergeVx.addDataSource(mapWork.getName(), + MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build()); + } + + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() + .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias()); + DataOutputBuffer dob = new DataOutputBuffer(); + vertexConf.write(dob); + byte[] userPayload = dob.getData(); + desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); + mergeVx.setVertexManagerPlugin(desc); + return mergeVx; + } else { + Vertex mergeVx = + createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs, + mrScratchDir, ctx); + return mergeVx; + } + } + /* * Helper function to create Vertex from MapWork. */ private Vertex createVertex(JobConf conf, MapWork mapWork, LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs, - Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception { + Path mrScratchDir, Context ctx, VertexType vertexType) + throws Exception { Path tezDir = getTezDir(mrScratchDir); @@ -470,15 +541,8 @@ public class DagUtils { Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); - boolean vertexHasCustomInput = false; - if (tezWork != null) { - for (BaseWork baseWork : tezWork.getParents(mapWork)) { - if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) { - vertexHasCustomInput = true; - } - } - } - + boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType); + LOG.info("Vertex has custom input? " + vertexHasCustomInput); if (vertexHasCustomInput) { groupSplitsInInputInitializer = false; // grouping happens in execution phase. 
The input payload should not enable grouping here, @@ -513,6 +577,8 @@ public class DagUtils { } } + // remember mapping of plan to input + conf.set(Utilities.INPUT_NAME, mapWork.getName()); if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION) && !mapWork.isUseOneNullRowInputFormat()) { @@ -593,6 +659,7 @@ public class DagUtils { Path mrScratchDir, Context ctx) throws Exception { // set up operator plan + conf.set(Utilities.INPUT_NAME, reduceWork.getName()); Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false); // create the directories FileSinkOperators need @@ -937,12 +1004,22 @@ public class DagUtils { return initializeVertexConf(conf, context, (MapWork)work); } else if (work instanceof ReduceWork) { return initializeVertexConf(conf, context, (ReduceWork)work); + } else if (work instanceof MergeJoinWork) { + return initializeVertexConf(conf, context, (MergeJoinWork) work); } else { assert false; return null; } } + private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) { + if (work.getMainWork() instanceof MapWork) { + return initializeVertexConf(conf, context, (MapWork) (work.getMainWork())); + } else { + return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork())); + } + } + /** * Create a vertex from a given work object. * @@ -958,18 +1035,21 @@ public class DagUtils { */ public Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, LocalResource appJarLr, - List<LocalResource> additionalLr, - FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception { + List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren, + TezWork tezWork, VertexType vertexType) throws Exception { Vertex v = null; // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. 
if (work instanceof MapWork) { - v = createVertex(conf, (MapWork) work, appJarLr, - additionalLr, fileSystem, scratchDir, ctx, tezWork); + v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, + vertexType); } else if (work instanceof ReduceWork) { v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx); + } else if (work instanceof MergeJoinWork) { + v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, + ctx, vertexType); } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629544&r1=1629543&r2=1629544&view=diff ============================================================================== --- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original) +++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Sun Oct 5 22:26:43 2014 @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.events.InputInitializerEvent; @@ -77,12 +79,13 @@ public class DynamicPartitionPruner { private final BytesWritable writable = new BytesWritable(); - private final BlockingQueue<InputInitializerEvent> queue = - new LinkedBlockingQueue<InputInitializerEvent>(); + private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(); + + private final Set<String> sourcesWaitingForEvents = new HashSet<String>(); private int sourceInfoCount = 0; - private InputInitializerContext context; + private final Object endOfEvents = new Object(); public DynamicPartitionPruner() { } @@ -91,8 +94,21 @@ public class DynamicPartitionPruner { throws SerDeException, IOException, InterruptedException, HiveException { - this.context = context; - this.initialize(work, jobConf); + synchronized(sourcesWaitingForEvents) { + initialize(work, jobConf); + + if (sourcesWaitingForEvents.isEmpty()) { + return; + } + + Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED); + for (String source : sourcesWaitingForEvents) { + // we need to get state transition updates for the vertices that will send + // events to us. once we have received all events and a vertex has succeeded, + // we can move to do the pruning. + context.registerForVertexStateUpdates(source, states); + } + } LOG.info("Waiting for events (" + sourceInfoCount + " items) ..."); // synchronous event processing loop. 
Won't return until all events have @@ -102,7 +118,7 @@ public class DynamicPartitionPruner { LOG.info("Ok to proceed."); } - public BlockingQueue<InputInitializerEvent> getQueue() { + public BlockingQueue<Object> getQueue() { return queue; } @@ -111,11 +127,14 @@ public class DynamicPartitionPruner { sourceInfoCount = 0; } - private void initialize(MapWork work, JobConf jobConf) throws SerDeException { + public void initialize(MapWork work, JobConf jobConf) throws SerDeException { this.clear(); Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>(); + Set<String> sources = work.getEventSourceTableDescMap().keySet(); + + sourcesWaitingForEvents.addAll(sources); - for (String s : work.getEventSourceTableDescMap().keySet()) { + for (String s : sources) { List<TableDesc> tables = work.getEventSourceTableDescMap().get(s); List<String> columnNames = work.getEventSourceColumnNameMap().get(s); List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s); @@ -277,46 +296,30 @@ public class DynamicPartitionPruner { private void processEvents() throws SerDeException, IOException, InterruptedException { int eventCount = 0; - int neededEvents = getExpectedNumberOfEvents(); - while (neededEvents > eventCount) { - InputInitializerEvent event = queue.take(); + while (true) { + Object element = queue.take(); + + if (element == endOfEvents) { + // we're done processing events + break; + } + + InputInitializerEvent event = (InputInitializerEvent) element; + LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName() + ", " + (event.getUserPayload().limit() - event.getUserPayload().position())); - processPayload(event.getUserPayload()); + processPayload(event.getUserPayload(), event.getSourceVertexName()); eventCount += 1; - neededEvents = getExpectedNumberOfEvents(); - LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount); } - } - - private int getExpectedNumberOfEvents() throws InterruptedException { - int neededEvents = 0; - - boolean notInitialized; - do { - neededEvents = 0; - notInitialized = false; - for (String s : sourceInfoMap.keySet()) { - int multiplier = sourceInfoMap.get(s).size(); - int taskNum = context.getVertexNumTasks(s); - LOG.info("Vertex " + s + " has " + taskNum + " events."); - if (taskNum < 0) { - notInitialized = true; - Thread.sleep(10); - continue; - } - neededEvents += (taskNum * multiplier); - } - } while (notInitialized); - - return neededEvents; + LOG.info("Received events: " + eventCount); } @SuppressWarnings("deprecation") - private String processPayload(ByteBuffer payload) throws SerDeException, IOException { + private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException, + IOException { + DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload)); - String sourceName = in.readUTF(); String columnName = in.readUTF(); boolean skip = in.readBoolean(); @@ -390,4 +393,26 @@ public class DynamicPartitionPruner { } } + public void addEvent(InputInitializerEvent event) { + synchronized(sourcesWaitingForEvents) { + if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) { + queue.offer(event); + } + } + } + + public void processVertex(String name) { + LOG.info("Vertex succeeded: " + name); + + synchronized(sourcesWaitingForEvents) { + sourcesWaitingForEvents.remove(name); + + if (sourcesWaitingForEvents.isEmpty()) { + // we've got what we need; mark the queue + queue.offer(endOfEvents); + } else { + LOG.info("Waiting for " + 
sourcesWaitingForEvents.size() + " events."); + } + } + } }
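
For readers following the DynamicPartitionPruner hunks above: the patch replaces the old "poll the context for the expected number of events" loop with a sentinel-terminated queue. addEvent() enqueues an event only if its source vertex is still being waited on, and processVertex() enqueues an end-of-events marker once the last registered source has succeeded, so processEvents() can simply block on the queue until it sees the sentinel. Below is a minimal standalone sketch of that pattern, not part of the commit; the method names mirror the diff, while the simplified Event type and registerSource() helper are hypothetical stand-ins for Tez's InputInitializerEvent and the vertex-state registration done in prune().

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SentinelQueuePruner {
  // Sentinel object: once offered, the consumer knows no more events will arrive.
  private final Object endOfEvents = new Object();
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();

  // Hypothetical stand-in for Tez's InputInitializerEvent.
  public static class Event {
    final String sourceVertexName;
    final byte[] payload;
    Event(String sourceVertexName, byte[] payload) {
      this.sourceVertexName = sourceVertexName;
      this.payload = payload;
    }
  }

  // Stand-in for the registration done in prune(): remember which sources we expect events from.
  public void registerSource(String source) {
    synchronized (sourcesWaitingForEvents) {
      sourcesWaitingForEvents.add(source);
    }
  }

  // Mirrors addEvent(): only queue events coming from sources we are still waiting on.
  public void addEvent(Event event) {
    synchronized (sourcesWaitingForEvents) {
      if (sourcesWaitingForEvents.contains(event.sourceVertexName)) {
        queue.offer(event);
      }
    }
  }

  // Mirrors processVertex(): when the last awaited source succeeds, enqueue the sentinel.
  public void sourceSucceeded(String name) {
    synchronized (sourcesWaitingForEvents) {
      sourcesWaitingForEvents.remove(name);
      if (sourcesWaitingForEvents.isEmpty()) {
        queue.offer(endOfEvents);
      }
    }
  }

  // Mirrors processEvents(): block on the queue until the sentinel is seen.
  public int processEvents() throws InterruptedException {
    int eventCount = 0;
    while (true) {
      Object element = queue.take();
      if (element == endOfEvents) {
        break;
      }
      Event event = (Event) element;
      // The real code deserializes event payloads here and feeds them to the pruning logic.
      eventCount++;
    }
    return eventCount;
  }
}

The design point the sketch illustrates is that the consumer no longer needs to know the total event count up front (the removed getExpectedNumberOfEvents() polling); termination is driven entirely by vertex SUCCEEDED notifications.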

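
The CUSTOM_EDGE changes in DagUtils and CustomPartitionVertex earlier in this message drop the raw 4-byte numBuckets payload in favour of a Writable (CustomVertexConfiguration) that also carries the vertex type and the main-input name: the producer serializes it into a DataOutputBuffer and wraps the bytes as the vertex manager UserPayload, and the consumer rehydrates it with a DataInputByteBuffer. The standalone sketch below shows that round trip under stated assumptions: ExampleVertexConfiguration is a hypothetical stand-in for CustomVertexConfiguration (whose exact field order and enum handling live outside this excerpt), and the vertex type is modelled as a plain String rather than TezWork.VertexType.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

// Hypothetical stand-in for CustomVertexConfiguration; the real class is added
// elsewhere in this commit and may serialize its fields differently.
class ExampleVertexConfiguration implements Writable {
  private int numBuckets;
  private String vertexType;  // the real class uses the TezWork.VertexType enum
  private String inputName;   // name of the "main" (big table) input, may be empty

  ExampleVertexConfiguration() { }  // no-arg constructor needed for readFields()

  ExampleVertexConfiguration(int numBuckets, String vertexType, String inputName) {
    this.numBuckets = numBuckets;
    this.vertexType = vertexType;
    this.inputName = inputName;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(numBuckets);
    out.writeUTF(vertexType);
    out.writeUTF(inputName);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    numBuckets = in.readInt();
    vertexType = in.readUTF();
    inputName = in.readUTF();
  }

  public int getNumBuckets() { return numBuckets; }
  public String getVertexType() { return vertexType; }
  public String getInputName() { return inputName; }
}

public class PayloadRoundTrip {
  public static void main(String[] args) throws IOException {
    // Producer side (as in DagUtils.createEdge in the diff): serialize the Writable
    // and wrap the bytes; the patch then passes the buffer to UserPayload.create().
    ExampleVertexConfiguration outConf =
        new ExampleVertexConfiguration(4, "INITIALIZED_EDGES", "Map 1");
    DataOutputBuffer dob = new DataOutputBuffer();
    outConf.write(dob);
    ByteBuffer payload = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Consumer side (as in CustomPartitionVertex.initialize in the diff):
    // rehydrate the configuration from the payload buffer.
    ExampleVertexConfiguration inConf = new ExampleVertexConfiguration();
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(payload);
    inConf.readFields(dibb);

    System.out.println(inConf.getNumBuckets() + " buckets, main input '"
        + inConf.getInputName() + "', vertex type " + inConf.getVertexType());
  }
}

Compared with the old ByteBuffer.allocate(4).putInt(numBuckets) payload, the Writable approach lets the vertex manager learn the vertex type and the big-table input name without changing the payload wiring again each time a field is added.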