Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Oct 6 04:00:39 2014 @@ -109,8 +109,8 @@ import org.apache.hadoop.hive.serde2.laz import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; import com.google.common.collect.Sets; @@ -378,6 +378,27 @@ public class Hive { List<String> partCols, Class<? extends InputFormat> fileInputFormat, Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols) throws HiveException { + createTable(tableName, columns, partCols, fileInputFormat, fileOutputFormat, bucketCount, + bucketCols, null); + } + + /** + * Create a table metadata and the directory for the table data + * @param tableName table name + * @param columns list of fields of the table + * @param partCols partition keys of the table + * @param fileInputFormat Class of the input format of the table data file + * @param fileOutputFormat Class of the output format of the table data file + * @param bucketCount number of buckets that each partition (or the table itself) should be + * divided into + * @param bucketCols Bucket columns + * @param parameters Parameters for the table + * @throws HiveException + */ + public void createTable(String tableName, List<String> columns, List<String> partCols, + Class<? extends InputFormat> fileInputFormat, + Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols, + Map<String, String> parameters) throws HiveException { if (columns == null) { throw new HiveException("columns not specified for table " + tableName); } @@ -402,6 +423,9 @@ public class Hive { tbl.setSerializationLib(LazySimpleSerDe.class.getName()); tbl.setNumBuckets(bucketCount); tbl.setBucketCols(bucketCols); + if (parameters != null) { + tbl.setParamters(parameters); + } createTable(tbl); } @@ -427,9 +451,9 @@ public class Hive { newTbl.checkValidity(); getMSC().alter_table(names[0], names[1], newTbl.getTTable()); } catch (MetaException e) { - throw new HiveException("Unable to alter table.", e); + throw new HiveException("Unable to alter table. " + e.getMessage(), e); } catch (TException e) { - throw new HiveException("Unable to alter table.", e); + throw new HiveException("Unable to alter table. " + e.getMessage(), e); } } @@ -455,9 +479,9 @@ public class Hive { try { getMSC().alter_index(dbName, baseTblName, idxName, newIdx); } catch (MetaException e) { - throw new HiveException("Unable to alter index.", e); + throw new HiveException("Unable to alter index. " + e.getMessage(), e); } catch (TException e) { - throw new HiveException("Unable to alter index.", e); + throw new HiveException("Unable to alter index. " + e.getMessage(), e); } } @@ -502,9 +526,9 @@ public class Hive { getMSC().alter_partition(dbName, tblName, newPart.getTPartition()); } catch (MetaException e) { - throw new HiveException("Unable to alter partition.", e); + throw new HiveException("Unable to alter partition. " + e.getMessage(), e); } catch (TException e) { - throw new HiveException("Unable to alter partition.", e); + throw new HiveException("Unable to alter partition. " + e.getMessage(), e); } } @@ -534,9 +558,9 @@ public class Hive { } getMSC().alter_partitions(names[0], names[1], newTParts); } catch (MetaException e) { - throw new HiveException("Unable to alter partition.", e); + throw new HiveException("Unable to alter partition. " + e.getMessage(), e); } catch (TException e) { - throw new HiveException("Unable to alter partition.", e); + throw new HiveException("Unable to alter partition. " + e.getMessage(), e); } } /** @@ -578,11 +602,11 @@ public class Hive { newPart.getTPartition()); } catch (InvalidOperationException e){ - throw new HiveException("Unable to rename partition.", e); + throw new HiveException("Unable to rename partition. " + e.getMessage(), e); } catch (MetaException e) { - throw new HiveException("Unable to rename partition.", e); + throw new HiveException("Unable to rename partition. " + e.getMessage(), e); } catch (TException e) { - throw new HiveException("Unable to rename partition.", e); + throw new HiveException("Unable to rename partition. " + e.getMessage(), e); } } @@ -591,11 +615,11 @@ public class Hive { try { getMSC().alterDatabase(dbName, db); } catch (MetaException e) { - throw new HiveException("Unable to alter database " + dbName, e); + throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e); } catch (NoSuchObjectException e) { throw new HiveException("Database " + dbName + " does not exists.", e); } catch (TException e) { - throw new HiveException("Unable to alter database " + dbName, e); + throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e); } } /** @@ -870,14 +894,31 @@ public class Hive { try { return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData); } catch (NoSuchObjectException e) { - throw new HiveException("Partition or table doesn't exist.", e); + throw new HiveException("Partition or table doesn't exist. " + e.getMessage(), e); } catch (Exception e) { - throw new HiveException("Unknown error. Please check logs.", e); + throw new HiveException(e.getMessage(), e); } } /** * Drops table along with the data in it. If the table doesn't exist then it + * is a no-op. If ifPurge option is specified it is passed to the + * hdfs command that removes table data from warehouse to make it skip trash. + * + * @param tableName + * table to drop + * @param ifPurge + * completely purge the table (skipping trash) while removing data from warehouse + * @throws HiveException + * thrown if the drop fails + */ + public void dropTable(String tableName, boolean ifPurge) throws HiveException { + String[] names = Utilities.getDbTableName(tableName); + dropTable(names[0], names[1], true, true, ifPurge); + } + + /** + * Drops table along with the data in it. If the table doesn't exist then it * is a no-op * * @param tableName @@ -886,8 +927,7 @@ public class Hive { * thrown if the drop fails */ public void dropTable(String tableName) throws HiveException { - String[] names = Utilities.getDbTableName(tableName); - dropTable(names[0], names[1], true, true); + dropTable(tableName, false); } /** @@ -902,7 +942,7 @@ public class Hive { * thrown if the drop fails */ public void dropTable(String dbName, String tableName) throws HiveException { - dropTable(dbName, tableName, true, true); + dropTable(dbName, tableName, true, true, false); } /** @@ -913,14 +953,31 @@ public class Hive { * @param deleteData * deletes the underlying data along with metadata * @param ignoreUnknownTab - * an exception if thrown if this is falser and table doesn't exist + * an exception is thrown if this is false and the table doesn't exist * @throws HiveException */ public void dropTable(String dbName, String tableName, boolean deleteData, boolean ignoreUnknownTab) throws HiveException { + dropTable(dbName, tableName, deleteData, ignoreUnknownTab, false); + } + /** + * Drops the table. + * + * @param dbName + * @param tableName + * @param deleteData + * deletes the underlying data along with metadata + * @param ignoreUnknownTab + * an exception is thrown if this is false and the table doesn't exist + * @param ifPurge + * completely purge the table skipping trash while removing data from warehouse + * @throws HiveException + */ + public void dropTable(String dbName, String tableName, boolean deleteData, + boolean ignoreUnknownTab, boolean ifPurge) throws HiveException { try { - getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab); + getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge); } catch (NoSuchObjectException e) { if (!ignoreUnknownTab) { throw new HiveException(e); @@ -1008,7 +1065,7 @@ public class Hive { } return null; } catch (Exception e) { - throw new HiveException("Unable to fetch table " + tableName, e); + throw new HiveException("Unable to fetch table " + tableName + ". " + e.getMessage(), e); } // For non-views, we need to do some extra fixes @@ -1204,6 +1261,15 @@ public class Hive { return getDatabase(currentDb); } + public void loadPartition(Path loadPath, String tableName, + Map<String, String> partSpec, boolean replace, boolean holdDDLTime, + boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, + boolean isSrcLocal, boolean isAcid) throws HiveException { + Table tbl = getTable(tableName); + loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs, + isSkewedStoreAsSubdir, isSrcLocal, isAcid); + } + /** * Load a directory into a Hive Table Partition - Alters existing content of * the partition with the contents of loadPath. - If the partition does not @@ -1212,7 +1278,7 @@ public class Hive { * * @param loadPath * Directory containing files to load into Table - * @param tableName + * @param tbl * name of table to be loaded. * @param partSpec * defines which partition needs to be loaded @@ -1225,12 +1291,12 @@ public class Hive { * @param isSrcLocal * If the source directory is LOCAL */ - public void loadPartition(Path loadPath, String tableName, + public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, boolean replace, boolean holdDDLTime, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid) throws HiveException { - Table tbl = getTable(tableName); Path tblDataLocationPath = tbl.getDataLocation(); + Partition newTPart = null; try { /** * Move files before creating the partition since down stream processes @@ -1279,10 +1345,10 @@ public class Hive { Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid); } + boolean forceCreate = (!holdDDLTime) ? true : false; + newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs); // recreate the partition if it existed before if (!holdDDLTime) { - Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), - inheritTableSpecs); if (isSkewedStoreAsSubdir) { org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition(); SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo(); @@ -1292,9 +1358,9 @@ public class Hive { /* Add list bucketing location mappings. */ skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps); newCreatedTpart.getSd().setSkewedInfo(skewedInfo); - alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart)); + alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart)); newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs); - newCreatedTpart = newTPart.getTPartition(); + return new Partition(tbl, newCreatedTpart); } } } catch (IOException e) { @@ -1307,7 +1373,7 @@ public class Hive { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); } - + return newTPart; } /** @@ -1403,18 +1469,18 @@ private void constructOneLBLocationMap(F * @param replace * @param numDP number of dynamic partitions * @param holdDDLTime - * @return a list of strings with the dynamic partition paths + * @return partition map details (PartitionSpec and Partition) * @throws HiveException */ - public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath, + public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid) throws HiveException { Set<Path> validPartitions = new HashSet<Path>(); try { - ArrayList<LinkedHashMap<String, String>> fullPartSpecs = - new ArrayList<LinkedHashMap<String, String>>(); + Map<Map<String, String>, Partition> partitionsMap = new + LinkedHashMap<Map<String, String>, Partition>(); FileSystem fs = loadPath.getFileSystem(conf); FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs); @@ -1448,6 +1514,7 @@ private void constructOneLBLocationMap(F + " to at least " + validPartitions.size() + '.'); } + Table tbl = getTable(tableName); // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that Iterator<Path> iter = validPartitions.iterator(); @@ -1460,14 +1527,12 @@ private void constructOneLBLocationMap(F // generate a full partition specification LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec); Warehouse.makeSpecFromName(fullPartSpec, partPath); - fullPartSpecs.add(fullPartSpec); - - // finally load the partition -- move the file to the final table address - loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true, - listBucketingEnabled, false, isAcid); + Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, + holdDDLTime, true, listBucketingEnabled, false, isAcid); + partitionsMap.put(fullPartSpec, newPartition); LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec); } - return fullPartSpecs; + return partitionsMap; } catch (IOException e) { throw new HiveException(e); } @@ -1500,6 +1565,7 @@ private void constructOneLBLocationMap(F tbl.replaceFiles(loadPath, isSrcLocal); } else { tbl.copyFiles(loadPath, isSrcLocal, isAcid); + tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true"); } try { @@ -1613,17 +1679,6 @@ private void constructOneLBLocationMap(F return getPartition(tbl, partSpec, forceCreate, null, true); } - private static void clearPartitionStats(org.apache.hadoop.hive.metastore.api.Partition tpart) { - Map<String,String> tpartParams = tpart.getParameters(); - if (tpartParams == null) { - return; - } - - for (String statType : StatsSetupConst.supportedStats) { - tpartParams.remove(statType); - } - } - /** * Returns partition metadata * @@ -1691,7 +1746,7 @@ private void constructOneLBLocationMap(F throw new HiveException("new partition path should not be null or empty."); } tpart.getSd().setLocation(partPath); - clearPartitionStats(tpart); + tpart.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK,"true"); String fullName = tbl.getTableName(); if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) { fullName = tbl.getDbName() + "." + tbl.getTableName(); @@ -1722,7 +1777,7 @@ private void constructOneLBLocationMap(F } catch (NoSuchObjectException e) { throw new HiveException("Partition or table doesn't exist.", e); } catch (Exception e) { - throw new HiveException("Unknown error. Please check logs.", e); + throw new HiveException(e.getMessage(), e); } } @@ -1736,6 +1791,7 @@ private void constructOneLBLocationMap(F public List<Partition> dropPartitions(String dbName, String tblName, List<DropTableDesc.PartSpec> partSpecs, boolean deleteData, boolean ignoreProtection, boolean ifExists) throws HiveException { + //TODO: add support for ifPurge try { Table tbl = getTable(dbName, tblName); List<ObjectPair<Integer, byte[]>> partExprs = @@ -1750,7 +1806,7 @@ private void constructOneLBLocationMap(F } catch (NoSuchObjectException e) { throw new HiveException("Partition or table doesn't exist.", e); } catch (Exception e) { - throw new HiveException("Unknown error. Please check logs.", e); + throw new HiveException(e.getMessage(), e); } } @@ -2243,7 +2299,7 @@ private void constructOneLBLocationMap(F result.add(srcToDest); } } catch (IOException e) { - throw new HiveException("checkPaths: filesystem error in check phase", e); + throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e); } return result; } @@ -2310,7 +2366,7 @@ private void constructOneLBLocationMap(F try { ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf); } catch (IOException e) { - LOG.warn("Error setting permission of file " + destf + ": "+ StringUtils.stringifyException(e)); + LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e); } } return success; @@ -2349,7 +2405,7 @@ private void constructOneLBLocationMap(F srcs = srcFs.globStatus(srcf); } catch (IOException e) { LOG.error(StringUtils.stringifyException(e)); - throw new HiveException("addFiles: filesystem error in check phase", e); + throw new HiveException("addFiles: filesystem error in check phase. " + e.getMessage(), e); } if (srcs == null) { LOG.info("No sources specified to move: " + srcf); @@ -2375,7 +2431,7 @@ private void constructOneLBLocationMap(F } } } catch (IOException e) { - throw new HiveException("copyFiles: error while moving files!!!", e); + throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e); } } } @@ -2447,7 +2503,7 @@ private void constructOneLBLocationMap(F fs.rename(bucketSrc, bucketDest); } } catch (IOException e) { - throw new HiveException("Error moving acid files", e); + throw new HiveException("Error moving acid files " + e.getMessage(), e); } } } @@ -2679,7 +2735,7 @@ private void constructOneLBLocationMap(F throw new HiveException(e); } } - + public boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) throws HiveException { try { return getMSC().setPartitionColumnStatistics(request);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Mon Oct 6 04:00:39 2014 @@ -451,7 +451,11 @@ public class SessionHiveMetaStoreClient // Delete table data if (deleteData && !MetaStoreUtils.isExternalTable(table)) { try { - getWh().deleteDir(tablePath, true); + boolean ifPurge = false; + if (envContext != null){ + ifPurge = Boolean.parseBoolean(envContext.getProperties().get("ifPurge")); + } + getWh().deleteDir(tablePath, true, ifPurge); } catch (Exception err) { LOG.error("Failed to delete temp table directory: " + tablePath, err); // Forgive error Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Mon Oct 6 04:00:39 2014 @@ -385,6 +385,10 @@ public class Table implements Serializab tTable.getParameters().put(name, value); } + public void setParamters(Map<String, String> params) { + tTable.setParameters(params); + } + public String getProperty(String name) { return tTable.getParameters().get(name); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Mon Oct 6 04:00:39 2014 @@ -670,10 +670,15 @@ public final class ConstantPropagateProc cppCtx.getOpToConstantExprs().put(op, constants); foldOperator(op, cppCtx); List<ExprNodeDesc> colList = op.getConf().getColList(); + List<String> columnNames = op.getConf().getOutputColumnNames(); + Map<String, ExprNodeDesc> columnExprMap = op.getColumnExprMap(); if (colList != null) { for (int i = 0; i < colList.size(); i++) { ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false); colList.set(i, newCol); + if (columnExprMap != null) { + columnExprMap.put(columnNames.get(i), newCol); + } } LOG.debug("New column list:(" + StringUtils.join(colList, " ") + ")"); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Oct 6 04:00:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,12 +30,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; +import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.util.ReflectionUtils; /** * ConvertJoinMapJoin is an optimization that replaces a common join @@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + @SuppressWarnings("unchecked") @Override - /* - * (non-Javadoc) - * we should ideally not modify the tree we traverse. - * However, since we need to walk the tree at any time when we modify the - * operator, we might as well do it here. - */ - public Object process(Node nd, Stack<Node> stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { + /* + * (non-Javadoc) we should ideally not modify the tree we traverse. However, + * since we need to walk the tree at any time when we modify the operator, we + * might as well do it here. + */ + public Object + process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; - if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { + JoinOperator joinOp = (JoinOperator) nd; + + if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) + && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } - JoinOperator joinOp = (JoinOperator) nd; - // if we have traits, and table info is present in the traits, we know the + // if we have traits, and table info is present in the traits, we know the // exact number of buckets. Else choose the largest number of estimated // reducers from the parent operators. int numBuckets = -1; int estimatedBuckets = -1; + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) { if (parentOp.getOpTraits().getNumBuckets() > 0) { - numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? - parentOp.getOpTraits().getNumBuckets() : numBuckets; + numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? + parentOp.getOpTraits().getNumBuckets() : numBuckets; } if (parentOp instanceof ReduceSinkOperator) { ReduceSinkOperator rs = (ReduceSinkOperator)parentOp; - estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? + estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? rs.getConf().getNumReducers() : estimatedBuckets; } } @@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets); if (mapJoinConversionPos < 0) { - // we cannot convert to bucket map join, we cannot convert to - // map join either based on the size + // we cannot convert to bucket map join, we cannot convert to + // map join either based on the size. Check if we can convert to SMB join. + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { + convertJoinSMBJoin(joinOp, context, 0, 0, false, false); + return null; + } + Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null; + try { + bigTableMatcherClass = + (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar( + context.parseContext.getConf(), + HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR))); + } catch (ClassNotFoundException e) { + throw new SemanticException(e.getMessage()); + } + + BigTableSelectorForAutoSMJ bigTableMatcher = + ReflectionUtils.newInstance(bigTableMatcherClass, null); + JoinDesc joinDesc = joinOp.getConf(); + JoinCondDesc[] joinCondns = joinDesc.getConds(); + Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns); + if (joinCandidates.isEmpty()) { + // This is a full outer join. This can never be a map-join + // of any type. So return false. + return false; + } + mapJoinConversionPos = + bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates); + if (mapJoinConversionPos < 0) { + // contains aliases from sub-query + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + return null; + } + + if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + convertJoinSMBJoin(joinOp, context, mapJoinConversionPos, + tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true); + } else { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); + } return null; } - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) { - return null; + if (numBuckets > 1) { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { + if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + return null; + } } } LOG.info("Convert to non-bucketed map join"); // check if we can convert to map join no bucket scaling. - mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1); + mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1); if (mapJoinConversionPos < 0) { + // we are just converting to a common merge join operator. The shuffle + // join in map-reduce case. + int pos = 0; // it doesn't matter which position we use in this case. + convertJoinSMBJoin(joinOp, context, pos, 0, false, false); return null; } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); // map join operator by default has no bucket cols - mapJoinOp.setOpTraits(new OpTraits(null, -1)); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); + mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { setAllChildrenTraitsToNull(childOp); @@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen return null; } + // replaces the join operator with a new CommonJoinOperator, removes the + // parent reduce sinks + private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren) + throws SemanticException { + ParseContext parseContext = context.parseContext; + MapJoinDesc mapJoinDesc = null; + if (adjustParentsChildren) { + mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), + joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true); + } else { + JoinDesc joinDesc = joinOp.getConf(); + // retain the original join desc in the map join. + mapJoinDesc = + new MapJoinDesc(null, null, joinDesc.getExprs(), null, null, + joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + } + + @SuppressWarnings("unchecked") + CommonMergeJoinOperator mergeJoinOp = + (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, + isSubQuery, mapJoinConversionPos, mapJoinDesc)); + OpTraits opTraits = + new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() + .getSortCols()); + mergeJoinOp.setOpTraits(opTraits); + mergeJoinOp.setStatistics(joinOp.getStatistics()); + + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + int pos = parentOp.getChildOperators().indexOf(joinOp); + parentOp.getChildOperators().remove(pos); + parentOp.getChildOperators().add(pos, mergeJoinOp); + } + + for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) { + int pos = childOp.getParentOperators().indexOf(joinOp); + childOp.getParentOperators().remove(pos); + childOp.getParentOperators().add(pos, mergeJoinOp); + } + + List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators(); + if (childOperators == null) { + childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setChildOperators(childOperators); + } + + List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators(); + if (parentOperators == null) { + parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); + mergeJoinOp.setParentOperators(parentOperators); + } + + childOperators.clear(); + parentOperators.clear(); + childOperators.addAll(joinOp.getChildOperators()); + parentOperators.addAll(joinOp.getParentOperators()); + mergeJoinOp.getConf().setGenJoinKeys(false); + + if (adjustParentsChildren) { + mergeJoinOp.getConf().setGenJoinKeys(true); + List<Operator<? extends OperatorDesc>> newParentOpList = + new ArrayList<Operator<? extends OperatorDesc>>(); + for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) { + for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) { + grandParentOp.getChildOperators().remove(parentOp); + grandParentOp.getChildOperators().add(mergeJoinOp); + newParentOpList.add(grandParentOp); + } + } + mergeJoinOp.getParentOperators().clear(); + mergeJoinOp.getParentOperators().addAll(newParentOpList); + List<Operator<? extends OperatorDesc>> parentOps = + new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators()); + for (Operator<? extends OperatorDesc> parentOp : parentOps) { + int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp); + if (parentIndex == mapJoinConversionPos) { + continue; + } + + // insert the dummy store operator here + DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(); + dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>()); + dummyStoreOp.getChildOperators().add(mergeJoinOp); + int index = parentOp.getChildOperators().indexOf(mergeJoinOp); + parentOp.getChildOperators().remove(index); + parentOp.getChildOperators().add(index, dummyStoreOp); + dummyStoreOp.getParentOperators().add(parentOp); + mergeJoinOp.getParentOperators().remove(parentIndex); + mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp); + } + } + mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); + } + private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) { if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1)); + currentOp.setOpTraits(new OpTraits(null, -1, null)); for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen } } - private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition) throws SemanticException { - - TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); + private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) { LOG.info("Check conversion to bucket map join failed."); return false; } - MapJoinOperator mapJoinOp = - convertJoinMapJoin(joinOp, context, bigTablePosition); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets()); + tezBucketJoinProcCtx.getNumBuckets(), null); mapJoinOp.setOpTraits(opTraits); + mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); - // Once the conversion is done, we can set the partitioner to bucket cols on the small table + // Once the conversion is done, we can set the partitioner to bucket cols on the small table Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>(); bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets()); joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping); @@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen return true; } + /* + * This method tries to convert a join to an SMB. This is done based on + * traits. If the sorted by columns are the same as the join columns then, we + * can convert the join to an SMB. Otherwise retain the bucket map join as it + * is still more efficient than a regular join. + */ + private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, + int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + + ReduceSinkOperator bigTableRS = + (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits() + .getNumBuckets(); + + // the sort and bucket cols have to match on both sides for this + // transformation of the join operation + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + if (!(parentOp instanceof ReduceSinkOperator)) { + // could be mux/demux operators. Currently not supported + LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time."); + return false; + } + ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp + .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { + LOG.info("We cannot convert to SMB because the sort column names do not match."); + return false; + } + + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp + .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) + == false) { + LOG.info("We cannot convert to SMB because bucket column names do not match."); + return false; + } + } + + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = bigTableRS.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + LOG.info("We can convert the join to an SMB join."); + return true; + } + private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) { int numBuckets = currentOp.getOpTraits().getNumBuckets(); for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) { @@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen } /* - * We perform the following checks to see if we can convert to a bucket map join - * 1. If the parent reduce sink of the big table side has the same emit key cols as - * its parent, we can create a bucket map join eliminating the reduce sink. - * 2. If we have the table information, we can check the same way as in Mapreduce to - * determine if we can perform a Bucket Map Join. + * If the parent reduce sink of the big table side has the same emit key cols + * as its parent, we can create a bucket map join eliminating the reduce sink. */ - private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, - OptimizeTezProcContext context, int bigTablePosition, - TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, + OptimizeTezProcContext context, int bigTablePosition, + TezBucketJoinProcCtx tezBucketJoinProcCtx) + throws SemanticException { // bail on mux-operator because mux operator masks the emit keys of the // constituent reduce sinks if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) { @@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen } ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames(); + Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0); + List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames(); + int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); + // all keys matched. + if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), + tezBucketJoinProcCtx) == false) { + LOG.info("No info available to check for bucket map join. Cannot convert"); + return false; + } + /* * this is the case when the big table is a sub-query and is probably - * already bucketed by the join column in say a group by operation + * already bucketed by the join column in say a group by operation */ - List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames(); - if ((colNames != null) && (colNames.isEmpty() == false)) { - Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0); - for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) { + boolean isSubQuery = false; + if (numBuckets < 0) { + isSubQuery = true; + numBuckets = rs.getConf().getNumReducers(); + } + tezBucketJoinProcCtx.setNumBuckets(numBuckets); + tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + return true; + } + + private boolean checkColEquality(List<List<String>> grandParentColNames, + List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap, + TezBucketJoinProcCtx tezBucketJoinProcCtx) { + + if ((grandParentColNames == null) || (parentColNames == null)) { + return false; + } + + if ((parentColNames != null) && (parentColNames.isEmpty() == false)) { + for (List<String> listBucketCols : grandParentColNames) { // can happen if this operator does not carry forward the previous bucketing columns // for e.g. another join operator which does not carry one of the sides' key columns if (listBucketCols.isEmpty()) { @@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen } int colCount = 0; // parent op is guaranteed to have a single list because it is a reduce sink - for (String colName : rs.getOpTraits().getBucketColNames().get(0)) { + for (String colName : parentColNames.get(0)) { // all columns need to be at least a subset of the parentOfParent's bucket cols - ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName); + ExprNodeDesc exprNodeDesc = colExprMap.get(colName); if (exprNodeDesc instanceof ExprNodeColumnDesc) { if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) { colCount++; @@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen break; } } - - if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) { - // all keys matched. - int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); - boolean isSubQuery = false; - if (numBuckets < 0) { - isSubQuery = true; - numBuckets = rs.getConf().getNumReducers(); - } - tezBucketJoinProcCtx.setNumBuckets(numBuckets); - tezBucketJoinProcCtx.setIsSubQuery(isSubQuery); + + if (colCount == parentColNames.get(0).size()) { return true; } } } return false; } - - LOG.info("No info available to check for bucket map join. Cannot convert"); return false; } - public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, + public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, int buckets) { - Set<Integer> bigTableCandidateSet = MapJoinProcessor. - getBigTableCandidates(joinOp.getConf().getConds()); + Set<Integer> bigTableCandidateSet = + MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds()); long maxSize = context.conf.getLongVar( HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); @@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen long inputSize = currInputStat.getDataSize(); if ((bigInputStat == null) || ((bigInputStat != null) && - (inputSize > bigInputStat.getDataSize()))) { + (inputSize > bigInputStat.getDataSize()))) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table @@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen * for tez. */ - public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, + public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, int bigTablePosition) throws SemanticException { - // bail on mux operator because currently the mux operator masks the emit keys + // bail on mux operator because currently the mux operator masks the emit keys // of the constituent reduce sinks. for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (parentOp instanceof MuxOperator) { @@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen //can safely convert the join to a map join. ParseContext parseContext = context.parseContext; - MapJoinOperator mapJoinOp = MapJoinProcessor. - convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), - joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true); + MapJoinOperator mapJoinOp = + MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, + parseContext.getJoinContext().get(joinOp), bigTablePosition, true); - Operator<? extends OperatorDesc> parentBigTableOp - = mapJoinOp.getParentOperators().get(bigTablePosition); + Operator<? extends OperatorDesc> parentBigTableOp = + mapJoinOp.getParentOperators().get(bigTablePosition); if (parentBigTableOp instanceof ReduceSinkOperator) { for (Operator<?> p : parentBigTableOp.getParentOperators()) { // we might have generated a dynamic partition operator chain. Since @@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen } } mapJoinOp.getParentOperators().remove(bigTablePosition); - if (!(mapJoinOp.getParentOperators().contains( - parentBigTableOp.getParentOperators().get(0)))) { + if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { mapJoinOp.getParentOperators().add(bigTablePosition, parentBigTableOp.getParentOperators().get(0)); - } + } parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { @@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen return mapJoinOp; } - private boolean hasDynamicPartitionBroadcast(Operator<?> op) { - if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { - return true; - } - for (Operator<?> c : op.getChildOperators()) { - if (hasDynamicPartitionBroadcast(c)) { - return true; + private boolean hasDynamicPartitionBroadcast(Operator<?> parent) { + boolean hasDynamicPartitionPruning = false; + + for (Operator<?> op: parent.getChildOperators()) { + while (op != null) { + if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { + // found dynamic partition pruning operator + hasDynamicPartitionPruning = true; + break; + } + + if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) { + // crossing reduce sink or file sink means the pruning isn't for this parent. + break; + } + + if (op.getChildOperators().size() != 1) { + // dynamic partition pruning pipeline doesn't have multiple children + break; + } + + op = op.getChildOperators().get(0); } } - return false; + + return hasDynamicPartitionPruning; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Oct 6 04:00:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer; +import com.google.common.collect.Interner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.OperatorUtils; -import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator; -import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; @@ -101,7 +100,6 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import java.io.Serializable; @@ -580,8 +578,6 @@ public final class GenMapRedUtils { //This read entity is a direct read entity and not an indirect read (that is when // this is being read because it is a dependency of a view). boolean isDirectRead = (parentViewInfo == null); - PlanUtils.addInput(inputs, - new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead)); for (Partition part : parts) { if (part.getTable().isPartitioned()) { @@ -882,6 +878,30 @@ public final class GenMapRedUtils { } } + public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) { + + if (task instanceof ConditionalTask) { + for (Task tsk : ((ConditionalTask) task).getListTasks()) { + internTableDesc(tsk, interner); + } + } else if (task instanceof ExecDriver) { + MapredWork work = (MapredWork) task.getWork(); + work.getMapWork().internTable(interner); + } else if (task != null && (task.getWork() instanceof TezWork)) { + TezWork work = (TezWork)task.getWork(); + for (BaseWork w : work.getAllWorkUnsorted()) { + if (w instanceof MapWork) { + ((MapWork)w).internTable(interner); + } + } + } + if (task.getNumChild() > 0) { + for (Task childTask : task.getChildTasks()) { + internTableDesc(childTask, interner); + } + } + } + /** * create a new plan and return. * @@ -1507,7 +1527,7 @@ public final class GenMapRedUtils { * * @param fsInputDesc * @param finalName - * @param inputFormatClass + * @param inputFormatClass * @return MergeWork if table is stored as RCFile or ORCFile, * null otherwise */ @@ -1714,7 +1734,7 @@ public final class GenMapRedUtils { // There are separate configuration parameters to control whether to // merge for a map-only job // or for a map-reduce job - if (currTask.getWork() instanceof MapredWork) { + if (currTask.getWork() instanceof MapredWork) { ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork(); boolean mergeMapOnly = hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null; @@ -1813,7 +1833,7 @@ public final class GenMapRedUtils { return Collections.emptyList(); } - public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) + public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) throws SemanticException { List<Path> inputPaths = new ArrayList<Path>(); switch (parseInfo.getTableSpec().specType) { @@ -1850,6 +1870,7 @@ public final class GenMapRedUtils { public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) { final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>(); OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() { + @Override public void apply(Operator<?> argument) { if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) { operators.add(argument); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Oct 6 04:00:39 2014 @@ -389,157 +389,8 @@ public class MapJoinProcessor implements JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { - JoinDesc desc = op.getConf(); - JoinCondDesc[] condns = desc.getConds(); - Byte[] tagOrder = desc.getTagOrder(); - - // outer join cannot be performed on a table which is being cached - if (!noCheckOuterJoin) { - if (checkMapJoin(mapJoinPos, condns) < 0) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); - } - } - - // Walk over all the sources (which are guaranteed to be reduce sink - // operators). - // The join outputs a concatenation of all the inputs. - QBJoinTree leftSrc = joinTree.getJoinSrc(); - List<ReduceSinkOperator> oldReduceSinkParentOps = - new ArrayList<ReduceSinkOperator>(op.getNumParent()); - if (leftSrc != null) { - // assert mapJoinPos == 0; - Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - - - byte pos = 0; - for (String src : joinTree.getBaseSrc()) { - if (src != null) { - Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos); - assert parentOp.getParentOperators().size() == 1; - oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); - } - pos++; - } - - Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); - List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); - Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); - Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { - byte tag = entry.getKey(); - Operator<?> terminal = oldReduceSinkParentOps.get(tag); - - List<ExprNodeDesc> values = entry.getValue(); - List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); - newValueExprs.put(tag, newValues); - for (int i = 0; i < schema.size(); i++) { - ColumnInfo column = schema.get(i); - if (column == null) { - continue; - } - ExprNodeDesc expr = colExprMap.get(column.getInternalName()); - int index = ExprNodeDescUtils.indexOf(expr, values); - if (index >= 0) { - colExprMap.put(column.getInternalName(), newValues.get(index)); - schema.set(i, null); - } - } - } - - // rewrite value index for mapjoin - Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); - - // get the join keys from old parent ReduceSink operators - Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); - - // construct valueTableDescs and valueFilteredTableDescs - List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); - List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); - int[][] filterMap = desc.getFilterMap(); - for (pos = 0; pos < op.getParentOperators().size(); pos++) { - ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); - List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); - List<ExprNodeDesc> valueCols = newValueExprs.get(pos); - if (pos != mapJoinPos) { - // remove values in key exprs for value table schema - // value expression for hashsink will be modified in LocalMapJoinProcessor - int[] valueIndex = new int[valueCols.size()]; - List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); - for (int i = 0; i < valueIndex.length; i++) { - ExprNodeDesc expr = valueCols.get(i); - int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); - if (kindex >= 0) { - valueIndex[i] = kindex; - } else { - valueIndex[i] = -valueColsInValueExpr.size() - 1; - valueColsInValueExpr.add(expr); - } - } - if (needValueIndex(valueIndex)) { - valueIndices.put(pos, valueIndex); - } - valueCols = valueColsInValueExpr; - } - // deep copy expr node desc - List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); - if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { - ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory - .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false); - valueFilteredCols.add(isFilterDesc); - } - - TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue")); - TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue")); - - valueTableDescs.add(valueTableDesc); - valueFilteredTableDescs.add(valueFilteredTableDesc); - - keyExprMap.put(pos, keyCols); - } - - Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); - Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>(); - for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { - byte srcTag = entry.getKey(); - List<ExprNodeDesc> filter = entry.getValue(); - - Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); - newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); - } - desc.setFilters(filters = newFilters); - - // create dumpfile prefix needed to create descriptor - String dumpFilePrefix = ""; - if( joinTree.getMapAliases() != null ) { - for(String mapAlias : joinTree.getMapAliases()) { - dumpFilePrefix = dumpFilePrefix + mapAlias; - } - dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } else { - dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix(); - } - - List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos); - - List<String> outputColumnNames = op.getConf().getOutputColumnNames(); - TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, - PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); - JoinCondDesc[] joinCondns = op.getConf().getConds(); - MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, - valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, - filters, op.getConf().getNoOuterJoin(), dumpFilePrefix); - mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); - mapJoinDescriptor.setTagOrder(tagOrder); - mapJoinDescriptor.setNullSafes(desc.getNullSafes()); - mapJoinDescriptor.setFilterMap(desc.getFilterMap()); - if (!valueIndices.isEmpty()) { - mapJoinDescriptor.setValueIndices(valueIndices); - } + MapJoinDesc mapJoinDescriptor = + getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin); // reduce sink row resolver used to generate map join op RowResolver outputRS = opParseCtxMap.get(op).getRowResolver(); @@ -551,6 +402,7 @@ public class MapJoinProcessor implements opParseCtxMap.put(mapJoinOp, ctx); mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs()); + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); mapJoinOp.setColumnExprMap(colExprMap); List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators(); @@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements } } + + public static MapJoinDesc getMapJoinDesc(HiveConf hconf, + LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap, + JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { + JoinDesc desc = op.getConf(); + JoinCondDesc[] condns = desc.getConds(); + Byte[] tagOrder = desc.getTagOrder(); + + // outer join cannot be performed on a table which is being cached + if (!noCheckOuterJoin) { + if (checkMapJoin(mapJoinPos, condns) < 0) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } + } + + // Walk over all the sources (which are guaranteed to be reduce sink + // operators). + // The join outputs a concatenation of all the inputs. + QBJoinTree leftSrc = joinTree.getJoinSrc(); + List<ReduceSinkOperator> oldReduceSinkParentOps = + new ArrayList<ReduceSinkOperator>(op.getNumParent()); + if (leftSrc != null) { + // assert mapJoinPos == 0; + Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + + byte pos = 0; + for (String src : joinTree.getBaseSrc()) { + if (src != null) { + Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos); + assert parentOp.getParentOperators().size() == 1; + oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp); + } + pos++; + } + + Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap(); + List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature()); + Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs(); + Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) { + byte tag = entry.getKey(); + Operator<?> terminal = oldReduceSinkParentOps.get(tag); + + List<ExprNodeDesc> values = entry.getValue(); + List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal); + newValueExprs.put(tag, newValues); + for (int i = 0; i < schema.size(); i++) { + ColumnInfo column = schema.get(i); + if (column == null) { + continue; + } + ExprNodeDesc expr = colExprMap.get(column.getInternalName()); + int index = ExprNodeDescUtils.indexOf(expr, values); + if (index >= 0) { + colExprMap.put(column.getInternalName(), newValues.get(index)); + schema.set(i, null); + } + } + } + + // rewrite value index for mapjoin + Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>(); + + // get the join keys from old parent ReduceSink operators + Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>(); + + // construct valueTableDescs and valueFilteredTableDescs + List<TableDesc> valueTableDescs = new ArrayList<TableDesc>(); + List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>(); + int[][] filterMap = desc.getFilterMap(); + for (pos = 0; pos < op.getParentOperators().size(); pos++) { + ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos); + List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols(); + List<ExprNodeDesc> valueCols = newValueExprs.get(pos); + if (pos != mapJoinPos) { + // remove values in key exprs for value table schema + // value expression for hashsink will be modified in + // LocalMapJoinProcessor + int[] valueIndex = new int[valueCols.size()]; + List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>(); + for (int i = 0; i < valueIndex.length; i++) { + ExprNodeDesc expr = valueCols.get(i); + int kindex = ExprNodeDescUtils.indexOf(expr, keyCols); + if (kindex >= 0) { + valueIndex[i] = kindex; + } else { + valueIndex[i] = -valueColsInValueExpr.size() - 1; + valueColsInValueExpr.add(expr); + } + } + if (needValueIndex(valueIndex)) { + valueIndices.put(pos, valueIndex); + } + valueCols = valueColsInValueExpr; + } + // deep copy expr node desc + List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols); + if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) { + ExprNodeColumnDesc isFilterDesc = + new ExprNodeColumnDesc( + TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", + "filter", false); + valueFilteredCols.add(isFilterDesc); + } + + TableDesc valueTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, + "mapjoinvalue")); + TableDesc valueFilteredTableDesc = + PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList( + valueFilteredCols, "mapjoinvalue")); + + valueTableDescs.add(valueTableDesc); + valueFilteredTableDescs.add(valueFilteredTableDesc); + + keyExprMap.put(pos, keyCols); + } + + Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters(); + Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>(); + for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) { + byte srcTag = entry.getKey(); + List<ExprNodeDesc> filter = entry.getValue(); + + Operator<?> terminal = oldReduceSinkParentOps.get(srcTag); + newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); + } + desc.setFilters(filters = newFilters); + + // create dumpfile prefix needed to create descriptor + String dumpFilePrefix = ""; + if (joinTree.getMapAliases() != null) { + for (String mapAlias : joinTree.getMapAliases()) { + dumpFilePrefix = dumpFilePrefix + mapAlias; + } + dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } else { + dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix(); + } + + List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos); + + List<String> outputColumnNames = op.getConf().getOutputColumnNames(); + TableDesc keyTableDesc = + PlanUtils.getMapJoinKeyTableDesc(hconf, + PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX)); + JoinCondDesc[] joinCondns = op.getConf().getConds(); + MapJoinDesc mapJoinDescriptor = + new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, + valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op + .getConf().getNoOuterJoin(), dumpFilePrefix); + mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); + mapJoinDescriptor.setTagOrder(tagOrder); + mapJoinDescriptor.setNullSafes(desc.getNullSafes()); + mapJoinDescriptor.setFilterMap(desc.getFilterMap()); + if (!valueIndices.isEmpty()) { + mapJoinDescriptor.setValueIndices(valueIndices); + } + + return mapJoinDescriptor; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Oct 6 04:00:39 2014 @@ -51,7 +51,12 @@ public class Optimizer { * @param hiveConf */ public void initialize(HiveConf hiveConf) { + + boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); + boolean bucketMapJoinOptimizer = false; + transformations = new ArrayList<Transform>(); + // Add the transformation that computes the lineage information. transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) { @@ -81,15 +86,16 @@ public class Optimizer { } transformations.add(new SamplePruner()); transformations.add(new MapJoinProcessor()); - boolean bucketMapJoinOptimizer = false; - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) { + + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) { transformations.add(new BucketMapJoinOptimizer()); bucketMapJoinOptimizer = true; } // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer - if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { + if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) + && !isTezExecEngine) { if (!bucketMapJoinOptimizer) { // No need to add BucketMapJoinOptimizer twice transformations.add(new BucketMapJoinOptimizer()); @@ -119,7 +125,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) && - !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + !isTezExecEngine) { transformations.add(new CorrelationOptimizer()); } if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) { @@ -128,8 +134,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) { transformations.add(new StatsOptimizer()); } - String execEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); - if ((pctx.getContext().getExplain() || "spark".equals(execEngine)) && !"tez".equals(execEngine)) { + if (pctx.getContext().getExplain() && !isTezExecEngine) { transformations.add(new AnnotateWithStatistics()); transformations.add(new AnnotateWithOpTraits()); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Mon Oct 6 04:00:39 2014 @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.stats.StatsUtils; public class ReduceSinkMapJoinProc implements NodeProcessor { @@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple TezWork tezWork = context.currentTask.getWork(); LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName()); tezWork.connect(parentWork, myWork, edgeProp); - + if (edgeType == EdgeType.CUSTOM_EDGE) { + tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES); + } + ReduceSinkOperator r = null; if (parentRS.getConf().getOutputName() != null) { LOG.debug("Cloning reduce sink for multi-child broadcast edge"); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1629563&r1=1629562&r2=1629563&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Mon Oct 6 04:00:39 2014 @@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; @@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SplitSample; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.ListSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; @@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName()); + @Override public ParseContext transform(ParseContext pctx) throws SemanticException { Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps(); - if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) { + if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand() + && topOps.size() == 1) { // no join, no groupby, no distinct, no lateral view, no subq, // no CTAS or insert, not analyze command, and single sourced. String alias = (String) pctx.getTopOps().keySet().toArray()[0]; @@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem // for non-aggressive mode (minimal) // 1. samping is not allowed // 2. for partitioned table, all filters should be targeted to partition column - // 3. SelectOperator should be select star + // 3. SelectOperator should use only simple cast/column access private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias, TableScanOperator ts) throws HiveException { SplitSample splitSample = pctx.getNameToSplitSample().get(alias); @@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem return null; } - Table table = qb.getMetaData().getAliasToTable().get(alias); + Table table = pctx.getTopToTable().get(ts); if (table == null) { return null; } @@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem return null; } - private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive, + private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive, boolean bypassFilter) { if (ts.getChildOperators().size() != 1) { return null; } Operator<?> op = ts.getChildOperators().get(0); for (; ; op = op.getChildOperators().get(0)) { - if (aggresive) { - if (!(op instanceof LimitOperator || op instanceof FilterOperator - || op instanceof SelectOperator)) { + if (op instanceof SelectOperator) { + if (!aggressive) { + if (!checkExpressions((SelectOperator) op)) { + break; + } + } + continue; + } + + if (aggressive) { + if (!(op instanceof LimitOperator || op instanceof FilterOperator)) { break; } - } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter) - || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) { + } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) { break; } + if (op.getChildOperators() == null || op.getChildOperators().size() != 1) { return null; } } + if (op instanceof FileSinkOperator) { fetch.scanOp = ts; fetch.fileSink = op; return fetch; } + return null; } + private boolean checkExpressions(SelectOperator op) { + SelectDesc desc = op.getConf(); + for (ExprNodeDesc expr : desc.getColList()) { + if (!checkExpression(expr)) { + return false; + } + } + return true; + } + + private boolean checkExpression(ExprNodeDesc expr) { + if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) { + return true; + } + + if (expr instanceof ExprNodeGenericFuncDesc) { + GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF(); + if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar + || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal + || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp + || udf instanceof GenericUDFToVarchar) { + return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0)); + } + } + return false; + } + private class FetchData { private final ReadEntity parent; @@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem this.splitSample = splitSample; this.onlyPruningFilter = bypassFilter; } - + /* * all filters were executed during partition pruning */ @@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem private FetchWork convertToWork() throws HiveException { inputs.clear(); if (!table.isPartitioned()) { - inputs.add(new ReadEntity(table, parent)); + inputs.add(new ReadEntity(table, parent, parent == null)); FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table)); PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc()); work.setSplitSample(splitSample); @@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem List<PartitionDesc> partP = new ArrayList<PartitionDesc>(); for (Partition partition : partsList.getNotDeniedPartns()) { - inputs.add(new ReadEntity(partition, parent)); + inputs.add(new ReadEntity(partition, parent, parent == null)); listP.add(partition.getDataLocation()); partP.add(Utilities.getPartitionDesc(partition)); } Table sourceTable = partsList.getSourceTable(); - inputs.add(new ReadEntity(sourceTable, parent)); + inputs.add(new ReadEntity(sourceTable, parent, parent == null)); TableDesc table = Utilities.getTableDesc(sourceTable); FetchWork work = new FetchWork(listP, partP, table); if (!work.getPartDesc().isEmpty()) {
