Modified: hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java (original) +++ hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java Sun Aug 3 20:48:35 2014 @@ -26,8 +26,6 @@ import java.lang.reflect.UndeclaredThrow import java.util.List; import org.apache.commons.lang.ClassUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; @@ -55,7 +53,7 @@ public class RawStoreProxy implements In // This has to be called before initializing the instance of RawStore init(); - this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf); + this.base = ReflectionUtils.newInstance(rawStoreClass, conf); } public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName, @@ -96,14 +94,6 @@ public class RawStoreProxy implements In public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; - boolean reloadConf = HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.METASTOREFORCERELOADCONF); - - if (reloadConf) { - MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, metaStoreInitData); - initMS(); - } - try { ret = method.invoke(base, args); } catch (UndeclaredThrowableException e) {
Modified: hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original) +++ hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Sun Aug 3 20:48:35 2014 @@ -17,17 +17,24 @@ */ package org.apache.hadoop.hive.metastore.txn; -import com.jolbox.bonecp.BoneCP; import com.jolbox.bonecp.BoneCPConfig; +import com.jolbox.bonecp.BoneCPDataSource; +import org.apache.commons.dbcp.ConnectionFactory; +import org.apache.commons.dbcp.DriverManagerConnectionFactory; +import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.dbcp.PoolingDataSource; +import org.apache.commons.pool.ObjectPool; +import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnListImpl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.util.StringUtils; +import javax.sql.DataSource; import java.sql.*; import java.util.*; @@ -65,9 +72,9 @@ public class TxnHandler { static final private int ALLOWED_REPEATED_DEADLOCKS = 5; static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); - static private BoneCP connPool; - private static final Boolean lockLock = new Boolean("true"); // Random object to lock on for the - // lock method + static private DataSource connPool; + private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock + // method /** * Number of consecutive deadlocks we have seen @@ -1596,14 +1603,28 @@ public class TxnHandler { String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY); String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME); String passwd = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREPWD); + String connectionPooler = HiveConf.getVar(conf, + HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); - BoneCPConfig config = new BoneCPConfig(); - config.setJdbcUrl(driverUrl); - config.setMaxConnectionsPerPartition(10); - config.setPartitionCount(1); - config.setUser(user); - config.setPassword(passwd); - connPool = new BoneCP(config); + if ("bonecp".equals(connectionPooler)) { + BoneCPConfig config = new BoneCPConfig(); + config.setJdbcUrl(driverUrl); + config.setMaxConnectionsPerPartition(10); + config.setPartitionCount(1); + config.setUser(user); + config.setPassword(passwd); + connPool = new BoneCPDataSource(config); + } else if ("dbcp".equals(connectionPooler)) { + ObjectPool objectPool = new GenericObjectPool(); + ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd); + // This doesn't get used, but it's still necessary, see + // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup + PoolableConnectionFactory poolConnFactory = + new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); + connPool = new PoolingDataSource(objectPool); + } else { + throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler); + } } private static synchronized void buildJumpTable() { Modified: hive/branches/spark/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/pom.xml (original) +++ hive/branches/spark/pom.xml Sun Aug 3 20:48:35 2014 @@ -104,6 +104,8 @@ <commons-lang.version>2.4</commons-lang.version> <commons-lang3.version>3.1</commons-lang3.version> <commons-logging.version>1.1.3</commons-logging.version> + <commons-pool.version>1.5.4</commons-pool.version> + <commons-dbcp.version>1.4</commons-dbcp.version> <derby.version>10.10.1.1</derby.version> <guava.version>14.0.1</guava.version> <groovy.version>2.1.6</groovy.version> Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java Sun Aug 3 20:48:35 2014 @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.lockmgr import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import java.io.DataInput; @@ -54,8 +53,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; -import javax.security.auth.login.LoginException; - /** * Context for Semantic Analyzers. Usage: not reusable - construct a new one for * each query should call clear() at end of use to remove temporary folders @@ -337,7 +334,14 @@ public class Context { * external URI to which the tmp data has to be eventually moved * @return next available tmp path on the file system corresponding extURI */ - public Path getExternalTmpPath(URI extURI) { + public Path getExternalTmpPath(Path path) { + URI extURI = path.toUri(); + if (extURI.getScheme().equals("viewfs")) { + // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. + // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir + // on same namespace as tbl dir. + return getExtTmpPathRelTo(path.getParent()); + } return new Path(getExternalScratchDir(extURI), EXT_PREFIX + nextPathId()); } @@ -347,7 +351,8 @@ public class Context { * within passed in uri, whereas getExternalTmpPath() ignores passed in path and returns temp * path within /tmp */ - public Path getExtTmpPathRelTo(URI uri) { + public Path getExtTmpPathRelTo(Path path) { + URI uri = path.toUri(); return new Path (getScratchDir(uri.getScheme(), uri.getAuthority(), !explain, uri.getPath() + Path.SEPARATOR + "_" + this.executionId), EXT_PREFIX + nextPathId()); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sun Aug 3 20:48:35 2014 @@ -82,6 +82,7 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; @@ -503,8 +504,13 @@ public class Driver implements CommandPr Hive db = sem.getDb(); if (ss.isAuthorizationModeV2()) { - doAuthorizationV2(ss, op, inputs, outputs, command); - return; + // get mapping of tables to columns used + ColumnAccessInfo colAccessInfo = sem.getColumnAccessInfo(); + // colAccessInfo is set only in case of SemanticAnalyzer + Map<String, Set<String>> tab2Cols = colAccessInfo != null ? colAccessInfo + .getTableToColumnAccessMap() : null; + doAuthorizationV2(ss, op, inputs, outputs, command, tab2Cols); + return; } if (op == null) { throw new HiveException("Operation should not be null"); @@ -583,56 +589,9 @@ public class Driver implements CommandPr } } - //for a select or create-as-select query, populate the partition to column (par2Cols) or - // table to columns mapping (tab2Cols) - if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) - || op.equals(HiveOperation.QUERY)) { - SemanticAnalyzer querySem = (SemanticAnalyzer) sem; - ParseContext parseCtx = querySem.getParseContext(); - Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable(); - - for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem - .getParseContext().getTopOps().entrySet()) { - Operator<? extends OperatorDesc> topOp = topOpMap.getValue(); - if (topOp instanceof TableScanOperator - && tsoTopMap.containsKey(topOp)) { - TableScanOperator tableScanOp = (TableScanOperator) topOp; - Table tbl = tsoTopMap.get(tableScanOp); - List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs(); - List<FieldSchema> columns = tbl.getCols(); - List<String> cols = new ArrayList<String>(); - for (int i = 0; i < neededColumnIds.size(); i++) { - cols.add(columns.get(neededColumnIds.get(i)).getName()); - } - //map may not contain all sources, since input list may have been optimized out - //or non-existent tho such sources may still be referenced by the TableScanOperator - //if it's null then the partition probably doesn't exist so let's use table permission - if (tbl.isPartitioned() && - tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE) { - String alias_id = topOpMap.getKey(); - - PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp, - parseCtx, alias_id); - Set<Partition> parts = partsList.getPartitions(); - for (Partition part : parts) { - List<String> existingCols = part2Cols.get(part); - if (existingCols == null) { - existingCols = new ArrayList<String>(); - } - existingCols.addAll(cols); - part2Cols.put(part, existingCols); - } - } else { - List<String> existingCols = tab2Cols.get(tbl); - if (existingCols == null) { - existingCols = new ArrayList<String>(); - } - existingCols.addAll(cols); - tab2Cols.put(tbl, existingCols); - } - } - } - } + getTablePartitionUsedColumns(op, sem, tab2Cols, part2Cols, tableUsePartLevelAuth); + + // cache the results for table authorization Set<String> tableAuthChecked = new HashSet<String>(); @@ -683,8 +642,65 @@ public class Driver implements CommandPr } } + private static void getTablePartitionUsedColumns(HiveOperation op, BaseSemanticAnalyzer sem, + Map<Table, List<String>> tab2Cols, Map<Partition, List<String>> part2Cols, + Map<String, Boolean> tableUsePartLevelAuth) throws HiveException { + // for a select or create-as-select query, populate the partition to column + // (par2Cols) or + // table to columns mapping (tab2Cols) + if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) + || op.equals(HiveOperation.QUERY)) { + SemanticAnalyzer querySem = (SemanticAnalyzer) sem; + ParseContext parseCtx = querySem.getParseContext(); + Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable(); + + for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : querySem + .getParseContext().getTopOps().entrySet()) { + Operator<? extends OperatorDesc> topOp = topOpMap.getValue(); + if (topOp instanceof TableScanOperator + && tsoTopMap.containsKey(topOp)) { + TableScanOperator tableScanOp = (TableScanOperator) topOp; + Table tbl = tsoTopMap.get(tableScanOp); + List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs(); + List<FieldSchema> columns = tbl.getCols(); + List<String> cols = new ArrayList<String>(); + for (int i = 0; i < neededColumnIds.size(); i++) { + cols.add(columns.get(neededColumnIds.get(i)).getName()); + } + //map may not contain all sources, since input list may have been optimized out + //or non-existent tho such sources may still be referenced by the TableScanOperator + //if it's null then the partition probably doesn't exist so let's use table permission + if (tbl.isPartitioned() && + tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE) { + String alias_id = topOpMap.getKey(); + + PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp, + parseCtx, alias_id); + Set<Partition> parts = partsList.getPartitions(); + for (Partition part : parts) { + List<String> existingCols = part2Cols.get(part); + if (existingCols == null) { + existingCols = new ArrayList<String>(); + } + existingCols.addAll(cols); + part2Cols.put(part, existingCols); + } + } else { + List<String> existingCols = tab2Cols.get(tbl); + if (existingCols == null) { + existingCols = new ArrayList<String>(); + } + existingCols.addAll(cols); + tab2Cols.put(tbl, existingCols); + } + } + } + } + + } + private static void doAuthorizationV2(SessionState ss, HiveOperation op, HashSet<ReadEntity> inputs, - HashSet<WriteEntity> outputs, String command) throws HiveException { + HashSet<WriteEntity> outputs, String command, Map<String, Set<String>> tab2cols) throws HiveException { HiveAuthzContext.Builder authzContextBuilder = new HiveAuthzContext.Builder(); @@ -696,11 +712,34 @@ public class Driver implements CommandPr HiveOperationType hiveOpType = getHiveOperationType(op); List<HivePrivilegeObject> inputsHObjs = getHivePrivObjects(inputs); + updateInputColumnInfo(inputsHObjs, tab2cols); + List<HivePrivilegeObject> outputHObjs = getHivePrivObjects(outputs); ss.getAuthorizerV2().checkPrivileges(hiveOpType, inputsHObjs, outputHObjs, authzContextBuilder.build()); return; } + /** + * Add column information for input table objects + * @param inputsHObjs input HivePrivilegeObject + * @param map table to used input columns mapping + */ + private static void updateInputColumnInfo(List<HivePrivilegeObject> inputsHObjs, + Map<String, Set<String>> tableName2Cols) { + if(tableName2Cols == null) { + return; + } + for(HivePrivilegeObject inputObj : inputsHObjs){ + if(inputObj.getType() != HivePrivilegeObjectType.TABLE_OR_VIEW){ + // input columns are relevant only for tables or views + continue; + } + Set<String> cols = tableName2Cols.get(Table.getCompleteName(inputObj.getDbname(), + inputObj.getObjectName())); + inputObj.setColumns(cols); + } + } + private static List<HivePrivilegeObject> getHivePrivObjects(HashSet<? extends Entity> privObjects) { List<HivePrivilegeObject> hivePrivobjs = new ArrayList<HivePrivilegeObject>(); if(privObjects == null){ @@ -1213,7 +1252,8 @@ public class Driver implements CommandPr } resStream = null; - HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS()); + SessionState ss = SessionState.get(); + HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), ss.getUserName(), ss.getUserIpAddress()); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sun Aug 3 20:48:35 2014 @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -88,9 +89,9 @@ import org.apache.hadoop.hive.ql.QueryPl import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.merge.MergeTask; +import org.apache.hadoop.hive.ql.io.merge.MergeWork; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; -import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask; -import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; @@ -550,12 +551,13 @@ public class DDLTask extends Task<DDLWor throws HiveException { // merge work only needs input and output. MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(), - mergeFilesDesc.getOutputDir()); + mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass()); mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx()); mergeWork.resolveConcatenateMerge(db.getConf()); mergeWork.setMapperCannotSpanPartns(true); + mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass()); DriverContext driverCxt = new DriverContext(); - BlockMergeTask taskExec = new BlockMergeTask(); + MergeTask taskExec = new MergeTask(); taskExec.initialize(db.getConf(), null, driverCxt); taskExec.setWork(mergeWork); taskExec.setQueryPlan(this.getQueryPlan()); @@ -598,10 +600,13 @@ public class DDLTask extends Task<DDLWor HiveAuthorizer authorizer = getSessionAuthorizer(); try { + Set<String> colSet = showGrantDesc.getColumns() != null ? new HashSet<String>( + showGrantDesc.getColumns()) : null; List<HivePrivilegeInfo> privInfos = authorizer.showPrivileges( AuthorizationUtils.getHivePrincipal(showGrantDesc.getPrincipalDesc()), AuthorizationUtils.getHivePrivilegeObject(showGrantDesc.getHiveObj(), - showGrantDesc.getColumns())); + colSet + )); boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST); writeToFile(writeGrantInfo(privInfos, testMode), showGrantDesc.getResFile()); } catch (IOException e) { @@ -1283,7 +1288,7 @@ public class DDLTask extends Task<DDLWor // First create the archive in a tmp dir so that if the job fails, the // bad files don't pollute the filesystem Path tmpPath = new Path(driverContext.getCtx() - .getExternalTmpPath(originalDir.toUri()), "partlevel"); + .getExternalTmpPath(originalDir), "partlevel"); console.printInfo("Creating " + archiveName + " for " + originalDir.toString()); @@ -1478,7 +1483,7 @@ public class DDLTask extends Task<DDLWor throw new HiveException("Haven't found any archive where it should be"); } - Path tmpPath = driverContext.getCtx().getExternalTmpPath(originalDir.toUri()); + Path tmpPath = driverContext.getCtx().getExternalTmpPath(originalDir); try { fs = tmpPath.getFileSystem(conf); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sun Aug 3 20:48:35 2014 @@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask; +import org.apache.hadoop.hive.ql.io.merge.MergeTask; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; @@ -294,7 +294,7 @@ public class MoveTask extends Task<MoveW while (task.getParentTasks() != null && task.getParentTasks().size() == 1) { task = (Task)task.getParentTasks().get(0); // If it was a merge task or a local map reduce task, nothing can be inferred - if (task instanceof BlockMergeTask || task instanceof MapredLocalTask) { + if (task instanceof MergeTask || task instanceof MapredLocalTask) { break; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sun Aug 3 20:48:35 2014 @@ -165,7 +165,7 @@ public abstract class Task<T extends Ser } return retval; } catch (IOException e) { - throw new RuntimeException(e.getMessage()); + throw new RuntimeException("Unexpected error: " + e.getMessage(), e); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Sun Aug 3 20:48:35 2014 @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.sp import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork; -import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask; -import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; +import org.apache.hadoop.hive.ql.io.merge.MergeTask; +import org.apache.hadoop.hive.ql.io.merge.MergeWork; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; @@ -95,7 +95,7 @@ public final class TaskFactory { taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class)); taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class)); taskvec.add(new TaskTuple<MergeWork>(MergeWork.class, - BlockMergeTask.class)); + MergeTask.class)); taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class, DependencyCollectionTask.class)); taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sun Aug 3 20:48:35 2014 @@ -121,7 +121,8 @@ import org.apache.hadoop.hive.ql.io.Hive import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; +import org.apache.hadoop.hive.ql.io.merge.MergeWork; +import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper; import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; @@ -353,7 +354,8 @@ public final class Utilities { if(MAP_PLAN_NAME.equals(name)){ if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ gWork = deserializePlan(in, MapWork.class, conf); - } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { + } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) || + OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { gWork = deserializePlan(in, MergeWork.class, conf); } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { gWork = deserializePlan(in, ColumnTruncateWork.class, conf); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Sun Aug 3 20:48:35 2014 @@ -499,7 +499,7 @@ public class ExecDriver extends Task<Map inputPaths.add(new Path(path)); } - Path tmpPath = context.getCtx().getExternalTmpPath(inputPaths.get(0).toUri()); + Path tmpPath = context.getCtx().getExternalTmpPath(inputPaths.get(0)); Path partitionFile = new Path(tmpPath, ".partitions"); ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile); PartitionKeySampler sampler = new PartitionKeySampler(); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sun Aug 3 20:48:35 2014 @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -56,7 +55,7 @@ import org.apache.hadoop.util.StringUtil /** * ExecReducer is the generic Reducer class for Hive. Together with ExecMapper it is * the bridge between the map-reduce framework and the Hive operator pipeline at - * execution time. It's main responsabilities are: + * execution time. It's main responsibilities are: * * - Load and setup the operator pipeline from XML * - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators @@ -66,8 +65,20 @@ import org.apache.hadoop.util.StringUtil */ public class ExecReducer extends MapReduceBase implements Reducer { + private static final Log LOG = LogFactory.getLog("ExecReducer"); private static final String PLAN_KEY = "__REDUCE_PLAN__"; + // used to log memory usage periodically + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + // Input value serde needs to be an array to support different SerDe + // for different tags + private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE]; + private final Object[] valueObject = new Object[Byte.MAX_VALUE]; + private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); + private final boolean isLogInfoEnabled = LOG.isInfoEnabled(); + + // TODO: move to DynamicSerDe when it's ready + private Deserializer inputKeyDeserializer; private JobConf jc; private OutputCollector<?, ?> oc; private Operator<?> reducer; @@ -76,23 +87,13 @@ public class ExecReducer extends MapRedu private boolean isTagged = false; private long cntr = 0; private long nextCntr = 1; - - public static final Log l4j = LogFactory.getLog("ExecReducer"); - private boolean isLogInfoEnabled = false; - - // used to log memory usage periodically - private MemoryMXBean memoryMXBean; - - // TODO: move to DynamicSerDe when it's ready - private Deserializer inputKeyDeserializer; - // Input value serde needs to be an array to support different SerDe - // for different tags - private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; - - TableDesc keyTableDesc; - TableDesc[] valueTableDesc; - - ObjectInspector[] rowObjectInspector; + private TableDesc keyTableDesc; + private TableDesc[] valueTableDesc; + private ObjectInspector[] rowObjectInspector; + + // runtime objects + private transient Object keyObject; + private transient BytesWritable groupKey; @Override public void configure(JobConf job) { @@ -100,20 +101,16 @@ public class ExecReducer extends MapRedu ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - - isLogInfoEnabled = l4j.isInfoEnabled(); + LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); try { - l4j.info("conf classpath = " + LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - l4j.info("thread classpath = " + LOG.info("thread classpath = " + Arrays.asList(((URLClassLoader) Thread.currentThread() .getContextClassLoader()).getURLs())); } catch (Exception e) { - l4j.info("cannot get classpath: " + e.getMessage()); + LOG.info("cannot get classpath: " + e.getMessage()); } jc = job; @@ -132,7 +129,7 @@ public class ExecReducer extends MapRedu isTagged = gWork.getNeedsTagging(); try { keyTableDesc = gWork.getKeyDesc(); - inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc .getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); keyObjectInspector = inputKeyDeserializer.getObjectInspector(); @@ -140,7 +137,7 @@ public class ExecReducer extends MapRedu for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) { // We should initialize the SerDe with the TypeInfo when available. valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag); - inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance( + inputValueDeserializer[tag] = ReflectionUtils.newInstance( valueTableDesc[tag].getDeserializerClass(), null); SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null, valueTableDesc[tag].getProperties(), null); @@ -162,7 +159,7 @@ public class ExecReducer extends MapRedu // initialize reduce operator tree try { - l4j.info(reducer.dump(0)); + LOG.info(reducer.dump(0)); reducer.initialize(jc, rowObjectInspector); } catch (Throwable e) { abort = true; @@ -175,13 +172,6 @@ public class ExecReducer extends MapRedu } } - private Object keyObject; - private final Object[] valueObject = new Object[Byte.MAX_VALUE]; - - private BytesWritable groupKey; - - List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size()); - public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { if (reducer.getDone()) { @@ -212,7 +202,7 @@ public class ExecReducer extends MapRedu groupKey = new BytesWritable(); } else { // If a operator wants to do some work at the end of a group - l4j.trace("End Group"); + LOG.trace("End Group"); reducer.endGroup(); } @@ -227,7 +217,7 @@ public class ExecReducer extends MapRedu } groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); - l4j.trace("Start Group"); + LOG.trace("Start Group"); reducer.setGroupKeyObject(keyObject); reducer.startGroup(); } @@ -253,7 +243,7 @@ public class ExecReducer extends MapRedu cntr++; if (cntr == nextCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecReducer: processing " + cntr + LOG.info("ExecReducer: processing " + cntr + " rows: used memory = " + used_memory); nextCntr = getNextCntr(cntr); } @@ -279,7 +269,7 @@ public class ExecReducer extends MapRedu // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else { - l4j.fatal(StringUtils.stringifyException(e)); + LOG.fatal(StringUtils.stringifyException(e)); throw new RuntimeException(e); } } @@ -301,17 +291,17 @@ public class ExecReducer extends MapRedu // No row was processed if (oc == null) { - l4j.trace("Close called no row"); + LOG.trace("Close called without any rows processed"); } try { if (groupKey != null) { // If a operator wants to do some work at the end of a group - l4j.trace("End Group"); + LOG.trace("End Group"); reducer.endGroup(); } if (isLogInfoEnabled) { - l4j.info("ExecReducer: processed " + cntr + " rows: used memory = " + LOG.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed()); } @@ -322,7 +312,7 @@ public class ExecReducer extends MapRedu } catch (Exception e) { if (!abort) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); + LOG.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sun Aug 3 20:48:35 2014 @@ -16,9 +16,9 @@ * limitations under the License. */ package org.apache.hadoop.hive.ql.exec.tez; + import java.io.IOException; import java.text.NumberFormat; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -101,7 +101,7 @@ public class TezProcessor implements Log private void setupMRLegacyConfigs(TezProcessorContext processorContext) { // Hive "insert overwrite local directory" uses task id as dir name // Setting the id in jobconf helps to have the similar dir name as MR - StringBuilder taskAttemptIdBuilder = new StringBuilder("task"); + StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_"); taskAttemptIdBuilder.append(processorContext.getApplicationId().getClusterTimestamp()) .append("_") .append(jobIdFormat.format(processorContext.getApplicationId().getId())) Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java Sun Aug 3 20:48:35 2014 @@ -45,6 +45,11 @@ public class StringConcatColScalar exten @Override public void evaluate(VectorizedRowBatch batch) { + + if (childExpressions != null) { + super.evaluateChildren(batch); + } + BytesColumnVector inputColVector = (BytesColumnVector) batch.cols[colNum]; BytesColumnVector outV = (BytesColumnVector) batch.cols[outputColumn]; int[] sel = batch.selected; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java Sun Aug 3 20:48:35 2014 @@ -45,6 +45,11 @@ public class StringConcatScalarCol exten @Override public void evaluate(VectorizedRowBatch batch) { + + if (childExpressions != null) { + super.evaluateChildren(batch); + } + BytesColumnVector inputColVector = (BytesColumnVector) batch.cols[colNum]; BytesColumnVector outV = (BytesColumnVector) batch.cols[outputColumn]; int[] sel = batch.selected; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java Sun Aug 3 20:48:35 2014 @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.io.Text; import java.sql.Date; @@ -25,6 +28,9 @@ import java.sql.Date; public class VectorUDFDateString extends StringUnaryUDF { private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog( + VectorUDFDateString.class.getName()); + public VectorUDFDateString(int colNum, int outputColumn) { super(colNum, outputColumn, new StringUnaryUDF.IUDFUnaryString() { Text t = new Text(); @@ -39,7 +45,9 @@ public class VectorUDFDateString extends t.set(date.toString()); return t; } catch (IllegalArgumentException e) { - e.printStackTrace(); + if (LOG.isDebugEnabled()) { + LOG.info("VectorUDFDateString passed bad string for Date.valueOf '" + s.toString() + "'"); + } return null; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java Sun Aug 3 20:48:35 2014 @@ -88,6 +88,11 @@ public abstract class VectorUDFTimestamp @Override public void evaluate(VectorizedRowBatch batch) { + + if (childExpressions != null) { + super.evaluateChildren(batch); + } + LongColumnVector outV = (LongColumnVector) batch.cols[outputColumn]; LongColumnVector inputCol = (LongColumnVector)batch.cols[this.colNum]; /* every line below this is identical for evaluateLong & evaluateString */ Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Sun Aug 3 20:48:35 2014 @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.hive.conf.HiveConf; @@ -51,13 +50,11 @@ public class HookContext { private UserGroupInformation ugi; private HookType hookType; final private Map<String, ContentSummary> inputPathToContentSummary; - - public HookContext(QueryPlan queryPlan, HiveConf conf) throws Exception{ - this(queryPlan, conf, new ConcurrentHashMap<String, ContentSummary>()); - } + private final String ipAddress; + private final String userName; public HookContext(QueryPlan queryPlan, HiveConf conf, - Map<String, ContentSummary> inputPathToContentSummary) throws Exception { + Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress) throws Exception { this.queryPlan = queryPlan; this.conf = conf; this.inputPathToContentSummary = inputPathToContentSummary; @@ -69,6 +66,8 @@ public class HookContext { if(SessionState.get() != null){ linfo = SessionState.get().getLineageState().getLineageInfo(); } + this.ipAddress = ipAddress; + this.userName = userName; } public QueryPlan getQueryPlan() { @@ -143,7 +142,15 @@ public class HookContext { this.hookType = hookType; } + public String getIpAddress() { + return this.ipAddress; + } + public String getOperationName() { return SessionState.get().getHiveOperation().name(); } + + public String getUserName() { + return this.userName; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sun Aug 3 20:48:35 2014 @@ -71,8 +71,20 @@ import org.apache.hadoop.util.Reflection public class HiveInputFormat<K extends WritableComparable, V extends Writable> implements InputFormat<K, V>, JobConfigurable { - public static final String CLASS_NAME = HiveInputFormat.class.getName(); - public static final Log LOG = LogFactory.getLog(CLASS_NAME); + private static final String CLASS_NAME = HiveInputFormat.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + /** + * A cache of InputFormat instances. + */ + private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats + = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>(); + + private JobConf job; + + // both classes access by subclasses + protected Map<String, PartitionDesc> pathToPartitionInfo; + protected MapWork mrwork; /** * HiveInputSplit encapsulates an InputSplit with its corresponding @@ -178,18 +190,10 @@ public class HiveInputFormat<K extends W } } - JobConf job; - public void configure(JobConf job) { this.job = job; } - /** - * A cache of InputFormat instances. - */ - protected static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats - = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>(); - public static InputFormat<WritableComparable, Writable> getInputFormatFromCache( Class inputFormatClass, JobConf job) throws IOException { @@ -248,9 +252,6 @@ public class HiveInputFormat<K extends W return rr; } - protected Map<String, PartitionDesc> pathToPartitionInfo; - MapWork mrwork = null; - protected void init(JobConf job) { mrwork = Utilities.getMapWork(job); pathToPartitionInfo = mrwork.getPathToPartitionInfo(); @@ -281,7 +282,6 @@ public class HiveInputFormat<K extends W headerCount = Utilities.getHeaderCount(table); footerCount = Utilities.getFooterCount(table, conf); if (headerCount != 0 || footerCount != 0) { - // Input file has header or footer, cannot be splitted. conf.setLong( ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Sun Aug 3 20:48:35 2014 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -60,6 +61,7 @@ final class ReaderImpl implements Reader private final ObjectInspector inspector; private long deserializedSize = -1; private final Configuration conf; + private final List<Integer> versionList; //serialized footer - Keeping this around for use by getFileMetaInfo() // will help avoid cpu cycles spend in deserializing at cost of increased @@ -306,6 +308,7 @@ final class ReaderImpl implements Reader this.metadata = rInfo.metadata; this.footer = rInfo.footer; this.inspector = rInfo.inspector; + this.versionList = footerMetaData.versionList; } @@ -387,7 +390,8 @@ final class ReaderImpl implements Reader ps.getCompression().toString(), (int) ps.getCompressionBlockSize(), (int) ps.getMetadataLength(), - buffer + buffer, + ps.getVersionList() ); } @@ -446,18 +450,26 @@ final class ReaderImpl implements Reader final int bufferSize; final int metadataSize; final ByteBuffer footerBuffer; + final List<Integer> versionList; + + FileMetaInfo(String compressionType, int bufferSize, int metadataSize, + ByteBuffer footerBuffer) { + this(compressionType, bufferSize, metadataSize, footerBuffer, null); + } + FileMetaInfo(String compressionType, int bufferSize, int metadataSize, - ByteBuffer footerBuffer){ + ByteBuffer footerBuffer, List<Integer> versionList){ this.compressionType = compressionType; this.bufferSize = bufferSize; this.metadataSize = metadataSize; this.footerBuffer = footerBuffer; + this.versionList = versionList; } } public FileMetaInfo getFileMetaInfo(){ return new FileMetaInfo(compressionKind.toString(), bufferSize, - metadataSize, footerByteBuffer); + metadataSize, footerByteBuffer, versionList); } @@ -629,4 +641,11 @@ final class ReaderImpl implements Reader return new Metadata(metadata); } + List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() { + return metadata.getStripeStatsList(); + } + + public List<UserMetadataItem> getOrcProtoUserMetadata() { + return footer.getMetadataList(); + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sun Aug 3 20:48:35 2014 @@ -28,8 +28,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,8 +41,8 @@ import org.apache.hadoop.hive.ql.io.orc. import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry; import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; @@ -73,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; @@ -2218,4 +2217,78 @@ class WriterImpl implements Writer, Memo } return rawWriter.getPos(); } + + void appendStripe(byte[] stripe, StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException { + appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics); + } + + void appendStripe(byte[] stripe, int offset, int length, + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException { + getStream(); + long start = rawWriter.getPos(); + + long stripeLen = length; + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (stripeLen < blockSize && stripeLen > availBlockSpace && + addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.info(String.format("Padding ORC by %d bytes while merging..", + availBlockSpace)); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + + rawWriter.write(stripe); + rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues(); + rowCount += rowsInStripe; + + // since we have already written the stripe, just update stripe statistics + treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder()); + + // update file level statistics + updateFileStatistics(stripeStatistics); + + // update stripe information + OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation + .newBuilder() + .setOffset(start) + .setNumberOfRows(rowsInStripe) + .setIndexLength(stripeInfo.getIndexLength()) + .setDataLength(stripeInfo.getDataLength()) + .setFooterLength(stripeInfo.getFooterLength()) + .build(); + stripes.add(dirEntry); + + // reset it after writing the stripe + rowsInStripe = 0; + } + + private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) { + List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList(); + + // root element + treeWriter.fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(0))); + TreeWriter[] childWriters = treeWriter.getChildrenWriters(); + for (int i = 0; i < childWriters.length; i++) { + childWriters[i].fileStatistics.merge( + ColumnStatisticsImpl.deserialize(cs.get(i + 1))); + } + } + + void appendUserMetadata(List<UserMetadataItem> userMetadata) { + if (userMetadata != null) { + for (UserMetadataItem item : userMetadata) { + this.userMetadata.put(item.getName(), item.getValue()); + } + } + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java Sun Aug 3 20:48:35 2014 @@ -104,9 +104,9 @@ public class ParquetRecordReaderWrapper } else { realReader = null; eof = true; - if (valueObj == null) { // Should initialize the value for createValue - valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]); - } + } + if (valueObj == null) { // Should initialize the value for createValue + valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java Sun Aug 3 20:48:35 2014 @@ -19,22 +19,22 @@ package org.apache.hadoop.hive.ql.io.rcfile.merge; import java.io.IOException; -import org.apache.hadoop.mapred.FileInputFormat; + +import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -@SuppressWarnings({ "deprecation", "unchecked" }) -public class RCFileBlockMergeInputFormat extends FileInputFormat { +public class RCFileBlockMergeInputFormat extends MergeInputFormat { @Override - public RecordReader getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { + public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper> + getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { reporter.setStatus(split.toString()); - return new RCFileBlockMergeRecordReader(job, (FileSplit) split); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Sun Aug 3 20:48:35 2014 @@ -22,89 +22,25 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.io.merge.MergeMapper; import org.apache.hadoop.hive.shims.CombineHiveKey; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; @SuppressWarnings("deprecation") -public class RCFileMergeMapper extends MapReduceBase implements +public class RCFileMergeMapper extends MergeMapper implements Mapper<Object, RCFileValueBufferWrapper, Object, Object> { - private JobConf jc; - Class<? extends Writable> outputClass; RCFile.Writer outWriter; - Path finalPath; - FileSystem fs; - - boolean exception = false; - boolean autoDelete = false; - Path outPath; - CompressionCodec codec = null; int columnNumber = 0; - - boolean hasDynamicPartitions = false; - boolean isListBucketingDML = false; - boolean isListBucketingAlterTableConcatenate = false; - int listBucketingDepth; // used as depth for dir-calculation and if it is list bucketing case. - boolean tmpPathFixedConcatenate = false; - boolean tmpPathFixed = false; - Path tmpPath; - Path taskTmpPath; - Path dpPath; - public final static Log LOG = LogFactory.getLog("RCFileMergeMapper"); - public RCFileMergeMapper() { - } - - @Override - public void configure(JobConf job) { - jc = job; - hasDynamicPartitions = HiveConf.getBoolVar(job, - HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS); - isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job, - HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING); - listBucketingDepth = HiveConf.getIntVar(job, - HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH); - - Path specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job); - Path tmpPath = Utilities.toTempPath(specPath); - Path taskTmpPath = Utilities.toTaskTempPath(specPath); - updatePaths(tmpPath, taskTmpPath); - try { - fs = specPath.getFileSystem(job); - autoDelete = fs.deleteOnExit(outPath); - } catch (IOException e) { - this.exception = true; - throw new RuntimeException(e); - } - } - - private void updatePaths(Path tmpPath, Path taskTmpPath) { - String taskId = Utilities.getTaskId(jc); - this.tmpPath = tmpPath; - this.taskTmpPath = taskTmpPath; - finalPath = new Path(tmpPath, taskId); - outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId)); - } - @Override public void map(Object k, RCFileValueBufferWrapper value, OutputCollector<Object, Object> output, Reporter reporter) @@ -118,35 +54,7 @@ public class RCFileMergeMapper extends M key = (RCFileKeyBufferWrapper) k; } - /** - * 1. boolean isListBucketingAlterTableConcatenate will be true only if it is alter table ... - * concatenate on stored-as-dir so it will handle list bucketing alter table merge in the if - * cause with the help of fixTmpPathConcatenate - * 2. If it is DML, isListBucketingAlterTableConcatenate will be false so that it will be - * handled by else cause. In this else cause, we have another if check. - * 2.1 the if check will make sure DP or LB, we will fix path with the help of fixTmpPath(..). - * Since both has sub-directories. it includes SP + LB. - * 2.2 only SP without LB, we dont fix path. - */ - // Fix temp path for alter table ... concatenate - if (isListBucketingAlterTableConcatenate) { - if (this.tmpPathFixedConcatenate) { - checkPartitionsMatch(key.inputPath.getParent()); - } else { - fixTmpPathConcatenate(key.inputPath.getParent()); - tmpPathFixedConcatenate = true; - } - } else { - if (hasDynamicPartitions || (listBucketingDepth > 0)) { - if (tmpPathFixed) { - checkPartitionsMatch(key.inputPath.getParent()); - } else { - // We haven't fixed the TMP path for this mapper yet - fixTmpPath(key.inputPath.getParent()); - tmpPathFixed = true; - } - } - } + fixTmpPathAlterTable(key.inputPath.getParent()); if (outWriter == null) { codec = key.codec; @@ -172,106 +80,6 @@ public class RCFileMergeMapper extends M } } - /** - * Validates that each input path belongs to the same partition - * since each mapper merges the input to a single output directory - * - * @param inputPath - * @throws HiveException - */ - private void checkPartitionsMatch(Path inputPath) throws HiveException { - if (!dpPath.equals(inputPath)) { - // Temp partition input path does not match exist temp path - String msg = "Multiple partitions for one block merge mapper: " + - dpPath + " NOT EQUAL TO " + inputPath; - LOG.error(msg); - throw new HiveException(msg); - } - } - - /** - * Fixes tmpPath to point to the correct partition. - * Before this is called, tmpPath will default to the root tmp table dir - * fixTmpPath(..) works for DP + LB + multiple skewed values + merge. reason: - * 1. fixTmpPath(..) compares inputPath and tmpDepth, find out path difference and put it into - * newPath. Then add newpath to existing this.tmpPath and this.taskTmpPath. - * 2. The path difference between inputPath and tmpDepth can be DP or DP+LB. It will automatically - * handle it. - * 3. For example, - * if inputpath is <prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/ - * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME - * tmppath is <prefix>/_tmp.-ext-10000 - * newpath will be hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME - * Then, this.tmpPath and this.taskTmpPath will be update correctly. - * We have list_bucket_dml_6.q cover this case: DP + LP + multiple skewed values + merge. - * @param inputPath - * @throws HiveException - * @throws IOException - */ - private void fixTmpPath(Path inputPath) - throws HiveException, IOException { - dpPath = inputPath; - Path newPath = new Path("."); - int inputDepth = inputPath.depth(); - int tmpDepth = tmpPath.depth(); - - // Build the path from bottom up - while (inputPath != null && inputPath.depth() > tmpDepth) { - newPath = new Path(inputPath.getName(), newPath); - inputDepth--; - inputPath = inputPath.getParent(); - } - - Path newTmpPath = new Path(tmpPath, newPath); - Path newTaskTmpPath = new Path(taskTmpPath, newPath); - if (!fs.exists(newTmpPath)) { - fs.mkdirs(newTmpPath); - } - updatePaths(newTmpPath, newTaskTmpPath); - } - - /** - * Fixes tmpPath to point to the correct list bucketing sub-directories. - * Before this is called, tmpPath will default to the root tmp table dir - * Reason to add a new method instead of changing fixTmpPath() - * Reason 1: logic has slightly difference - * fixTmpPath(..) needs 2 variables in order to decide path delta which is in variable newPath. - * 1. inputPath.depth() - * 2. tmpPath.depth() - * fixTmpPathConcatenate needs 2 variables too but one of them is different from fixTmpPath(..) - * 1. inputPath.depth() - * 2. listBucketingDepth - * Reason 2: less risks - * The existing logic is a little not trivial around map() and fixTmpPath(). In order to ensure - * minimum impact on existing flow, we try to avoid change on existing code/flow but add new code - * for new feature. - * - * @param inputPath - * @throws HiveException - * @throws IOException - */ - private void fixTmpPathConcatenate(Path inputPath) - throws HiveException, IOException { - dpPath = inputPath; - Path newPath = new Path("."); - - int depth = listBucketingDepth; - // Build the path from bottom up. pick up list bucketing subdirectories - while ((inputPath != null) && (depth > 0)) { - newPath = new Path(inputPath.getName(), newPath); - inputPath = inputPath.getParent(); - depth--; - } - - Path newTmpPath = new Path(tmpPath, newPath); - Path newTaskTmpPath = new Path(taskTmpPath, newPath); - if (!fs.exists(newTmpPath)) { - fs.mkdirs(newTmpPath); - } - updatePaths(newTmpPath, newTaskTmpPath); - } - - @Override public void close() throws IOException { // close writer @@ -282,42 +90,7 @@ public class RCFileMergeMapper extends M outWriter.close(); outWriter = null; - if (!exception) { - FileStatus fss = fs.getFileStatus(outPath); - LOG.info("renamed path " + outPath + " to " + finalPath - + " . File size is " + fss.getLen()); - if (!fs.rename(outPath, finalPath)) { - throw new IOException("Unable to rename output to " + finalPath); - } - } else { - if (!autoDelete) { - fs.delete(outPath, true); - } - } - } - - public static String BACKUP_PREFIX = "_backup."; - - public static Path backupOutputPath(FileSystem fs, Path outpath, JobConf job) - throws IOException, HiveException { - if (fs.exists(outpath)) { - Path backupPath = new Path(outpath.getParent(), BACKUP_PREFIX - + outpath.getName()); - Utilities.rename(fs, outpath, backupPath); - return backupPath; - } else { - return null; - } - } - - public static void jobClose(Path outputPath, boolean success, JobConf job, - LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter - ) throws HiveException, IOException { - FileSystem fs = outputPath.getFileSystem(job); - Path backupPath = backupOutputPath(fs, outputPath, job); - Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, - reporter); - fs.delete(backupPath, true); + super.close(); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Sun Aug 3 20:48:35 2014 @@ -207,6 +207,16 @@ public class DbLockManager implements Hi } } + /** + * Clear the memory of the locks in this object. This won't clear the locks from the database. + * It is for use with + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}. + */ + void clearLocalLockRecords() { + locks.clear(); + } + // Sleep before we send checkLock again, but do it with a back off // off so we don't sit and hammer the metastore in a tight loop private void backoff() { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Sun Aug 3 20:48:35 2014 @@ -203,6 +203,7 @@ public class DbTxnManager extends HiveTx "transaction"); } try { + lockMgr.clearLocalLockRecords(); LOG.debug("Committing txn " + txnId); client.commitTxn(txnId); } catch (NoSuchTxnException e) { @@ -226,6 +227,7 @@ public class DbTxnManager extends HiveTx "transaction"); } try { + lockMgr.clearLocalLockRecords(); LOG.debug("Rolling back txn " + txnId); client.rollbackTxn(txnId); } catch (NoSuchTxnException e) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sun Aug 3 20:48:35 2014 @@ -43,6 +43,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -141,6 +142,10 @@ public class Hive { } }; + public static Hive get(Configuration c, Class<?> clazz) throws HiveException { + return get(c instanceof HiveConf ? (HiveConf)c : new HiveConf(c, clazz)); + } + /** * Gets hive object for the current thread. If one is not initialized then a * new one is created If the new configuration is different in metadata conf @@ -153,20 +158,13 @@ public class Hive { * */ public static Hive get(HiveConf c) throws HiveException { - boolean needsRefresh = false; Hive db = hiveDB.get(); - if (db != null) { - for (HiveConf.ConfVars oneVar : HiveConf.metaVars) { - // Since metaVars are all of different types, use string for comparison - String oldVar = db.getConf().get(oneVar.varname, ""); - String newVar = c.get(oneVar.varname, ""); - if (oldVar.compareToIgnoreCase(newVar) != 0) { - needsRefresh = true; - break; - } - } + if (db == null || + (db.metaStoreClient != null && !db.metaStoreClient.isCompatibleWith(c))) { + return get(c, true); } - return get(c, needsRefresh); + db.conf = c; + return db; } /** @@ -195,7 +193,8 @@ public class Hive { public static Hive get() throws HiveException { Hive db = hiveDB.get(); if (db == null) { - db = new Hive(new HiveConf(Hive.class)); + SessionState session = SessionState.get(); + db = new Hive(session == null ? new HiveConf(Hive.class) : session.getConf()); hiveDB.set(db); } return db; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sun Aug 3 20:48:35 2014 @@ -933,7 +933,11 @@ public class Table implements Serializab * @return include the db name */ public String getCompleteName() { - return getDbName() + "@" + getTableName(); + return getCompleteName(getDbName(), getTableName()); + } + + public static String getCompleteName(String dbName, String tabName) { + return dbName + "@" + tabName; } /** Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Sun Aug 3 20:48:35 2014 @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Stack; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.parse.Q import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -456,10 +458,19 @@ abstract public class AbstractBucketJoin public static List<String> toColumns(List<ExprNodeDesc> keys) { List<String> columns = new ArrayList<String>(); for (ExprNodeDesc key : keys) { - if (!(key instanceof ExprNodeColumnDesc)) { + if (key instanceof ExprNodeColumnDesc) { + columns.add(((ExprNodeColumnDesc) key).getColumn()); + } else if ((key instanceof ExprNodeConstantDesc)) { + ExprNodeConstantDesc constant = (ExprNodeConstantDesc) key; + String colName = constant.getFoldedFromCol(); + if (colName == null){ + return null; + } else { + columns.add(colName); + } + } else { return null; } - columns.add(((ExprNodeColumnDesc) key).getColumn()); } return columns; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java Sun Aug 3 20:48:35 2014 @@ -82,9 +82,7 @@ public class ConstantPropagate implement // if the later is enabled. return pactx; } - if (pactx.getConf().getBoolVar(ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { - return pactx; - } + pGraphContext = pactx; opToParseCtxMap = pGraphContext.getOpParseCtx(); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Sun Aug 3 20:48:35 2014 @@ -552,6 +552,7 @@ public final class ConstantPropagateProc * conditional expressions and extract assignment expressions and propagate them. */ public static class ConstantPropagateFilterProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FilterOperator op = (FilterOperator) nd; @@ -594,6 +595,7 @@ public final class ConstantPropagateProc * Node Processor for Constant Propagate for Group By Operators. */ public static class ConstantPropagateGroupByProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { GroupByOperator op = (GroupByOperator) nd; @@ -630,6 +632,7 @@ public final class ConstantPropagateProc * The Default Node Processor for Constant Propagation. */ public static class ConstantPropagateDefaultProc implements NodeProcessor { + @Override @SuppressWarnings("unchecked") public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { @@ -658,6 +661,7 @@ public final class ConstantPropagateProc * The Node Processor for Constant Propagation for Select Operators. */ public static class ConstantPropagateSelectProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { SelectOperator op = (SelectOperator) nd; @@ -691,6 +695,7 @@ public final class ConstantPropagateProc * propagation, this processor also prunes dynamic partitions to static partitions if possible. */ public static class ConstantPropagateFileSinkProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FileSinkOperator op = (FileSinkOperator) nd; @@ -743,6 +748,7 @@ public final class ConstantPropagateProc * Currently these kinds of Operators include UnionOperator and ScriptOperator. */ public static class ConstantPropagateStopProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { Operator<?> op = (Operator<?>) nd; @@ -763,6 +769,7 @@ public final class ConstantPropagateProc * join (left table for left outer join and vice versa) can be propagated. */ public static class ConstantPropagateReduceSinkProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { ReduceSinkOperator op = (ReduceSinkOperator) nd; @@ -795,7 +802,11 @@ public final class ConstantPropagateProc // key columns ArrayList<ExprNodeDesc> newKeyEpxrs = new ArrayList<ExprNodeDesc>(); for (ExprNodeDesc desc : rsDesc.getKeyCols()) { - newKeyEpxrs.add(foldExpr(desc, constants, cppCtx, op, 0, false)); + ExprNodeDesc newDesc = foldExpr(desc, constants, cppCtx, op, 0, false); + if (newDesc != desc && desc instanceof ExprNodeColumnDesc && newDesc instanceof ExprNodeConstantDesc) { + ((ExprNodeConstantDesc)newDesc).setFoldedFromCol(((ExprNodeColumnDesc)desc).getColumn()); + } + newKeyEpxrs.add(newDesc); } rsDesc.setKeyCols(newKeyEpxrs); @@ -854,6 +865,7 @@ public final class ConstantPropagateProc * The Node Processor for Constant Propagation for Join Operators. */ public static class ConstantPropagateJoinProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { JoinOperator op = (JoinOperator) nd; @@ -916,6 +928,7 @@ public final class ConstantPropagateProc * The Node Processor for Constant Propagation for Table Scan Operators. */ public static class ConstantPropagateTableScanProc implements NodeProcessor { + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { TableScanOperator op = (TableScanOperator) nd; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1615452&r1=1615451&r2=1615452&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Sun Aug 3 20:48:35 2014 @@ -98,7 +98,8 @@ public class GenMRFileSink1 implements N if (chDir) { // Merge the files in the destination table/partitions by creating Map-only merge job - // If underlying data is RCFile a RCFileBlockMerge task would be created. + // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or + // OrcFileStripeMerge task would be created. LOG.info("using CombineHiveInputformat for the merge job"); GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
