Modified: 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java
 (original)
+++ 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java
 Sun Aug  3 20:48:35 2014
@@ -26,8 +26,6 @@ import java.lang.reflect.UndeclaredThrow
 import java.util.List;
 
 import org.apache.commons.lang.ClassUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
@@ -55,7 +53,7 @@ public class RawStoreProxy implements In
     // This has to be called before initializing the instance of RawStore
     init();
 
-    this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);
+    this.base = ReflectionUtils.newInstance(rawStoreClass, conf);
   }
 
   public static RawStore getProxy(HiveConf hiveConf, Configuration conf, 
String rawStoreClassName,
@@ -96,14 +94,6 @@ public class RawStoreProxy implements In
   public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
     Object ret = null;
 
-    boolean reloadConf = HiveConf.getBoolVar(hiveConf,
-        HiveConf.ConfVars.METASTOREFORCERELOADCONF);
-
-    if (reloadConf) {
-      MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, 
metaStoreInitData);
-      initMS();
-    }
-
     try {
       ret = method.invoke(base, args);
     } catch (UndeclaredThrowableException e) {

Modified: 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 (original)
+++ 
hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 Sun Aug  3 20:48:35 2014
@@ -17,17 +17,24 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
-import com.jolbox.bonecp.BoneCP;
 import com.jolbox.bonecp.BoneCPConfig;
+import com.jolbox.bonecp.BoneCPDataSource;
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
 
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnListImpl;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.util.StringUtils;
 
+import javax.sql.DataSource;
 import java.sql.*;
 import java.util.*;
 
@@ -65,9 +72,9 @@ public class TxnHandler {
   static final private int ALLOWED_REPEATED_DEADLOCKS = 5;
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
 
-  static private BoneCP connPool;
-  private static final Boolean lockLock = new Boolean("true"); // Random 
object to lock on for the
-  // lock method
+  static private DataSource connPool;
+  private static Boolean lockLock = new Boolean("true"); // Random object to 
lock on for the lock
+  // method
 
   /**
    * Number of consecutive deadlocks we have seen
@@ -1596,14 +1603,28 @@ public class TxnHandler {
     String driverUrl = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORECONNECTURLKEY);
     String user = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
     String passwd = HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREPWD);
+    String connectionPooler = HiveConf.getVar(conf,
+        HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
 
-    BoneCPConfig config = new BoneCPConfig();
-    config.setJdbcUrl(driverUrl);
-    config.setMaxConnectionsPerPartition(10);
-    config.setPartitionCount(1);
-    config.setUser(user);
-    config.setPassword(passwd);
-    connPool = new BoneCP(config);
+    if ("bonecp".equals(connectionPooler)) {
+      BoneCPConfig config = new BoneCPConfig();
+      config.setJdbcUrl(driverUrl);
+      config.setMaxConnectionsPerPartition(10);
+      config.setPartitionCount(1);
+      config.setUser(user);
+      config.setPassword(passwd);
+      connPool = new BoneCPDataSource(config);
+    } else if ("dbcp".equals(connectionPooler)) {
+      ObjectPool objectPool = new GenericObjectPool();
+      ConnectionFactory connFactory = new 
DriverManagerConnectionFactory(driverUrl, user, passwd);
+      // This doesn't get used, but it's still necessary, see
+      // 
http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
+      PoolableConnectionFactory poolConnFactory =
+          new PoolableConnectionFactory(connFactory, objectPool, null, null, 
false, true);
+      connPool = new PoolingDataSource(objectPool);
+    } else {
+      throw new RuntimeException("Unknown JDBC connection pooling " + 
connectionPooler);
+    }
   }
 
  private static synchronized void buildJumpTable() {

Modified: hive/branches/spark/pom.xml
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Sun Aug  3 20:48:35 2014
@@ -104,6 +104,8 @@
     <commons-lang.version>2.4</commons-lang.version>
     <commons-lang3.version>3.1</commons-lang3.version>
     <commons-logging.version>1.1.3</commons-logging.version>
+    <commons-pool.version>1.5.4</commons-pool.version>
+    <commons-dbcp.version>1.4</commons-dbcp.version>
     <derby.version>10.10.1.1</derby.version>
     <guava.version>14.0.1</guava.version>
     <groovy.version>2.1.6</groovy.version>

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java Sun 
Aug  3 20:48:35 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.DataInput;
@@ -54,8 +53,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.security.auth.login.LoginException;
-
 /**
  * Context for Semantic Analyzers. Usage: not reusable - construct a new one 
for
  * each query should call clear() at end of use to remove temporary folders
@@ -337,7 +334,14 @@ public class Context {
    *          external URI to which the tmp data has to be eventually moved
    * @return next available tmp path on the file system corresponding extURI
    */
-  public Path getExternalTmpPath(URI extURI) {
+  public Path getExternalTmpPath(Path path) {
+    URI extURI = path.toUri();
+    if ("viewfs".equals(extURI.getScheme())) {
+      // Literal-first equals(): a path without a scheme has a null scheme and must fall through.
+      // On viewfs a rename from /tmp/.. to the final /user/hive/warehouse/ will fail later,
+      // so instead pick a tmp dir on the same namespace as the tbl dir.
+      return getExtTmpPathRelTo(path.getParent());
+    }
     return new Path(getExternalScratchDir(extURI), EXT_PREFIX +
       nextPathId());
   }
@@ -347,7 +351,8 @@ public class Context {
    * within passed in uri, whereas getExternalTmpPath() ignores passed in path 
and returns temp
    * path within /tmp
    */
-  public Path getExtTmpPathRelTo(URI uri) {
+  public Path getExtTmpPathRelTo(Path path) {
+    URI uri = path.toUri();
     return new Path (getScratchDir(uri.getScheme(), uri.getAuthority(), 
!explain, 
     uri.getPath() + Path.SEPARATOR + "_" + this.executionId), EXT_PREFIX + 
nextPathId());
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sun 
Aug  3 20:48:35 2014
@@ -82,6 +82,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
@@ -503,8 +504,13 @@ public class Driver implements CommandPr
     Hive db = sem.getDb();
 
     if (ss.isAuthorizationModeV2()) {
-      doAuthorizationV2(ss, op, inputs, outputs, command);
-      return;
+      // get mapping of tables to columns used
+      ColumnAccessInfo colAccessInfo = sem.getColumnAccessInfo();
+      // colAccessInfo is set only in case of SemanticAnalyzer
+      Map<String, Set<String>> tab2Cols = colAccessInfo != null ? colAccessInfo
+          .getTableToColumnAccessMap() : null;
+      doAuthorizationV2(ss, op, inputs, outputs, command, tab2Cols);
+     return;
     }
     if (op == null) {
       throw new HiveException("Operation should not be null");
@@ -583,56 +589,9 @@ public class Driver implements CommandPr
         }
       }
 
-      //for a select or create-as-select query, populate the partition to 
column (par2Cols) or
-      // table to columns mapping (tab2Cols)
-      if (op.equals(HiveOperation.CREATETABLE_AS_SELECT)
-          || op.equals(HiveOperation.QUERY)) {
-        SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
-        ParseContext parseCtx = querySem.getParseContext();
-        Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
-
-        for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : 
querySem
-            .getParseContext().getTopOps().entrySet()) {
-          Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
-          if (topOp instanceof TableScanOperator
-              && tsoTopMap.containsKey(topOp)) {
-            TableScanOperator tableScanOp = (TableScanOperator) topOp;
-            Table tbl = tsoTopMap.get(tableScanOp);
-            List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
-            List<FieldSchema> columns = tbl.getCols();
-            List<String> cols = new ArrayList<String>();
-            for (int i = 0; i < neededColumnIds.size(); i++) {
-              cols.add(columns.get(neededColumnIds.get(i)).getName());
-            }
-            //map may not contain all sources, since input list may have been 
optimized out
-            //or non-existent tho such sources may still be referenced by the 
TableScanOperator
-            //if it's null then the partition probably doesn't exist so let's 
use table permission
-            if (tbl.isPartitioned() &&
-                tableUsePartLevelAuth.get(tbl.getTableName()) == Boolean.TRUE) 
{
-              String alias_id = topOpMap.getKey();
-
-              PrunedPartitionList partsList = 
PartitionPruner.prune(tableScanOp,
-                  parseCtx, alias_id);
-              Set<Partition> parts = partsList.getPartitions();
-              for (Partition part : parts) {
-                List<String> existingCols = part2Cols.get(part);
-                if (existingCols == null) {
-                  existingCols = new ArrayList<String>();
-                }
-                existingCols.addAll(cols);
-                part2Cols.put(part, existingCols);
-              }
-            } else {
-              List<String> existingCols = tab2Cols.get(tbl);
-              if (existingCols == null) {
-                existingCols = new ArrayList<String>();
-              }
-              existingCols.addAll(cols);
-              tab2Cols.put(tbl, existingCols);
-            }
-          }
-        }
-      }
+      getTablePartitionUsedColumns(op, sem, tab2Cols, part2Cols, 
tableUsePartLevelAuth);
+
+
 
       // cache the results for table authorization
       Set<String> tableAuthChecked = new HashSet<String>();
@@ -683,8 +642,65 @@ public class Driver implements CommandPr
     }
   }
 
+  private static void getTablePartitionUsedColumns(HiveOperation op, 
BaseSemanticAnalyzer sem,
+      Map<Table, List<String>> tab2Cols, Map<Partition, List<String>> 
part2Cols,
+      Map<String, Boolean> tableUsePartLevelAuth) throws HiveException {
+    // For a select or create-as-select query, record the columns each table scan needs:
+    // into part2Cols (per pruned partition) when partition-level auth applies to the table,
+    // otherwise into tab2Cols (per table).
+    if (op.equals(HiveOperation.CREATETABLE_AS_SELECT)
+        || op.equals(HiveOperation.QUERY)) {
+      SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
+      ParseContext parseCtx = querySem.getParseContext();
+      Map<TableScanOperator, Table> tsoTopMap = parseCtx.getTopToTable();
+
+      for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap : 
querySem
+          .getParseContext().getTopOps().entrySet()) {
+        Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
+        if (topOp instanceof TableScanOperator
+            && tsoTopMap.containsKey(topOp)) {
+          TableScanOperator tableScanOp = (TableScanOperator) topOp;
+          Table tbl = tsoTopMap.get(tableScanOp);
+          List<Integer> neededColumnIds = tableScanOp.getNeededColumnIDs();
+          List<FieldSchema> columns = tbl.getCols();
+          List<String> cols = new ArrayList<String>();
+          for (int i = 0; i < neededColumnIds.size(); i++) {
+            cols.add(columns.get(neededColumnIds.get(i)).getName());
+          }
+          //map may not contain all sources, since input list may have been 
optimized out
+          //or non-existent tho such sources may still be referenced by the 
TableScanOperator
+          //if it's null then the partition probably doesn't exist so let's 
use table permission
+          if (tbl.isPartitioned() &&
+              Boolean.TRUE.equals(tableUsePartLevelAuth.get(tbl.getTableName()))) {
+            String alias_id = topOpMap.getKey();
+
+            PrunedPartitionList partsList = PartitionPruner.prune(tableScanOp,
+                parseCtx, alias_id);
+            Set<Partition> parts = partsList.getPartitions();
+            for (Partition part : parts) {
+              List<String> existingCols = part2Cols.get(part);
+              if (existingCols == null) {
+                existingCols = new ArrayList<String>();
+              }
+              existingCols.addAll(cols);
+              part2Cols.put(part, existingCols);
+            }
+          } else {
+            List<String> existingCols = tab2Cols.get(tbl);
+            if (existingCols == null) {
+              existingCols = new ArrayList<String>();
+            }
+            existingCols.addAll(cols);
+            tab2Cols.put(tbl, existingCols);
+          }
+        }
+      }
+    }
+
+  }
+
   private static void doAuthorizationV2(SessionState ss, HiveOperation op, 
HashSet<ReadEntity> inputs,
-      HashSet<WriteEntity> outputs, String command) throws HiveException {
+      HashSet<WriteEntity> outputs, String command, Map<String, Set<String>> 
tab2cols) throws HiveException {
 
     HiveAuthzContext.Builder authzContextBuilder = new 
HiveAuthzContext.Builder();
 
@@ -696,11 +712,34 @@ public class Driver implements CommandPr
 
     HiveOperationType hiveOpType = getHiveOperationType(op);
     List<HivePrivilegeObject> inputsHObjs = getHivePrivObjects(inputs);
+    updateInputColumnInfo(inputsHObjs, tab2cols);
+
     List<HivePrivilegeObject> outputHObjs = getHivePrivObjects(outputs);
     ss.getAuthorizerV2().checkPrivileges(hiveOpType, inputsHObjs, outputHObjs, 
authzContextBuilder.build());
     return;
   }
 
+  /**
+   * Add column information for input table objects; does nothing when the mapping is null
+   * @param inputsHObjs input HivePrivilegeObject list, updated in place
+   * @param tableName2Cols complete table name to used input columns mapping
+   */
+  private static void updateInputColumnInfo(List<HivePrivilegeObject> 
inputsHObjs,
+      Map<String, Set<String>> tableName2Cols) {
+    if(tableName2Cols == null) {
+      return;
+    }
+    for(HivePrivilegeObject inputObj : inputsHObjs){
+      if(inputObj.getType() != HivePrivilegeObjectType.TABLE_OR_VIEW){
+        // input columns are relevant only for tables or views
+        continue;
+      }
+      Set<String> cols = 
tableName2Cols.get(Table.getCompleteName(inputObj.getDbname(),
+          inputObj.getObjectName()));
+      inputObj.setColumns(cols);
+    }
+  }
+
   private static List<HivePrivilegeObject> getHivePrivObjects(HashSet<? 
extends Entity> privObjects) {
     List<HivePrivilegeObject> hivePrivobjs = new 
ArrayList<HivePrivilegeObject>();
     if(privObjects == null){
@@ -1213,7 +1252,8 @@ public class Driver implements CommandPr
       }
       resStream = null;
 
-      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS());
+      SessionState ss = SessionState.get();
+      HookContext hookContext = new HookContext(plan, conf, ctx.getPathToCS(), 
ss.getUserName(), ss.getUserIpAddress());
       hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
 
       for (Hook peh : getHooks(HiveConf.ConfVars.PREEXECHOOKS)) {

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
Sun Aug  3 20:48:35 2014
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,9 +89,9 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
@@ -550,12 +551,13 @@ public class DDLTask extends Task<DDLWor
       throws HiveException {
     // merge work only needs input and output.
     MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
-        mergeFilesDesc.getOutputDir());
+        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
     mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
     mergeWork.resolveConcatenateMerge(db.getConf());
     mergeWork.setMapperCannotSpanPartns(true);
+    mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
     DriverContext driverCxt = new DriverContext();
-    BlockMergeTask taskExec = new BlockMergeTask();
+    MergeTask taskExec = new MergeTask();
     taskExec.initialize(db.getConf(), null, driverCxt);
     taskExec.setWork(mergeWork);
     taskExec.setQueryPlan(this.getQueryPlan());
@@ -598,10 +600,13 @@ public class DDLTask extends Task<DDLWor
 
     HiveAuthorizer authorizer = getSessionAuthorizer();
     try {
+      Set<String> colSet = showGrantDesc.getColumns() != null ? new 
HashSet<String>(
+          showGrantDesc.getColumns()) : null;
       List<HivePrivilegeInfo> privInfos = authorizer.showPrivileges(
           
AuthorizationUtils.getHivePrincipal(showGrantDesc.getPrincipalDesc()),
           AuthorizationUtils.getHivePrivilegeObject(showGrantDesc.getHiveObj(),
-              showGrantDesc.getColumns()));
+              colSet
+              ));
       boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
       writeToFile(writeGrantInfo(privInfos, testMode), 
showGrantDesc.getResFile());
     } catch (IOException e) {
@@ -1283,7 +1288,7 @@ public class DDLTask extends Task<DDLWor
       // First create the archive in a tmp dir so that if the job fails, the
       // bad files don't pollute the filesystem
       Path tmpPath = new Path(driverContext.getCtx()
-          .getExternalTmpPath(originalDir.toUri()), "partlevel");
+          .getExternalTmpPath(originalDir), "partlevel");
 
       console.printInfo("Creating " + archiveName +
           " for " + originalDir.toString());
@@ -1478,7 +1483,7 @@ public class DDLTask extends Task<DDLWor
       throw new HiveException("Haven't found any archive where it should be");
     }
 
-    Path tmpPath = 
driverContext.getCtx().getExternalTmpPath(originalDir.toUri());
+    Path tmpPath = driverContext.getCtx().getExternalTmpPath(originalDir);
 
     try {
       fs = tmpPath.getFileSystem(conf);

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
Sun Aug  3 20:48:35 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -294,7 +294,7 @@ public class MoveTask extends Task<MoveW
           while (task.getParentTasks() != null && task.getParentTasks().size() 
== 1) {
             task = (Task)task.getParentTasks().get(0);
             // If it was a merge task or a local map reduce task, nothing can 
be inferred
-            if (task instanceof BlockMergeTask || task instanceof 
MapredLocalTask) {
+            if (task instanceof MergeTask || task instanceof MapredLocalTask) {
               break;
             }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java 
Sun Aug  3 20:48:35 2014
@@ -165,7 +165,7 @@ public abstract class Task<T extends Ser
       }
       return retval;
     } catch (IOException e) {
-      throw new RuntimeException(e.getMessage());
+      throw new RuntimeException("Unexpected error: " + e.getMessage(), e);
     }
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
Sun Aug  3 20:48:35 2014
@@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -95,7 +95,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, 
StatsNoJobTask.class));
     taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, 
ColumnStatsTask.class));
     taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
-        BlockMergeTask.class));
+        MergeTask.class));
     taskvec.add(new 
TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
         DependencyCollectionTask.class));
     taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
Sun Aug  3 20:48:35 2014
@@ -121,7 +121,8 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
@@ -353,7 +354,8 @@ public final class Utilities {
         if(MAP_PLAN_NAME.equals(name)){
           if 
(ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
             gWork = deserializePlan(in, MapWork.class, conf);
-          } else 
if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+          } else 
if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
+              
OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, MergeWork.class, conf);
           } else 
if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, ColumnTruncateWork.class, conf);

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
 Sun Aug  3 20:48:35 2014
@@ -499,7 +499,7 @@ public class ExecDriver extends Task<Map
       inputPaths.add(new Path(path));
     }
 
-    Path tmpPath = 
context.getCtx().getExternalTmpPath(inputPaths.get(0).toUri());
+    Path tmpPath = context.getCtx().getExternalTmpPath(inputPaths.get(0));
     Path partitionFile = new Path(tmpPath, ".partitions");
     ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile);
     PartitionKeySampler sampler = new PartitionKeySampler();

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
 Sun Aug  3 20:48:35 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -56,7 +55,7 @@ import org.apache.hadoop.util.StringUtil
 /**
  * ExecReducer is the generic Reducer class for Hive. Together with ExecMapper 
it is
  * the bridge between the map-reduce framework and the Hive operator pipeline 
at
- * execution time. It's main responsabilities are:
+ * execution time. Its main responsibilities are:
  *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key, value pairs to records and 
forwarding them to the operators
@@ -66,8 +65,20 @@ import org.apache.hadoop.util.StringUtil
  */
 public class ExecReducer extends MapReduceBase implements Reducer {
 
+  private static final Log LOG = LogFactory.getLog("ExecReducer");
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
+  // used to log memory usage periodically
+  private final MemoryMXBean memoryMXBean = 
ManagementFactory.getMemoryMXBean();
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final Deserializer[] inputValueDeserializer = new 
Deserializer[Byte.MAX_VALUE];
+  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+  private final List<Object> row = new 
ArrayList<Object>(Utilities.reduceFieldNameList.size());
+  private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
+
+  // TODO: move to DynamicSerDe when it's ready
+  private Deserializer inputKeyDeserializer;
   private JobConf jc;
   private OutputCollector<?, ?> oc;
   private Operator<?> reducer;
@@ -76,23 +87,13 @@ public class ExecReducer extends MapRedu
   private boolean isTagged = false;
   private long cntr = 0;
   private long nextCntr = 1;
-
-  public static final Log l4j = LogFactory.getLog("ExecReducer");
-  private boolean isLogInfoEnabled = false;
-
-  // used to log memory usage periodically
-  private MemoryMXBean memoryMXBean;
-
-  // TODO: move to DynamicSerDe when it's ready
-  private Deserializer inputKeyDeserializer;
-  // Input value serde needs to be an array to support different SerDe
-  // for different tags
-  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
-
-  TableDesc keyTableDesc;
-  TableDesc[] valueTableDesc;
-
-  ObjectInspector[] rowObjectInspector;
+  private TableDesc keyTableDesc;
+  private TableDesc[] valueTableDesc;
+  private ObjectInspector[] rowObjectInspector;
+
+  // runtime objects
+  private transient Object keyObject;
+  private transient BytesWritable groupKey;
 
   @Override
   public void configure(JobConf job) {
@@ -100,20 +101,16 @@ public class ExecReducer extends MapRedu
     ObjectInspector[] valueObjectInspector = new 
ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
-    isLogInfoEnabled = l4j.isInfoEnabled();
+    LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
     try {
-      l4j.info("conf classpath = "
+      LOG.info("conf classpath = "
           + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      l4j.info("thread classpath = "
+      LOG.info("thread classpath = "
           + Arrays.asList(((URLClassLoader) Thread.currentThread()
           .getContextClassLoader()).getURLs()));
     } catch (Exception e) {
-      l4j.info("cannot get classpath: " + e.getMessage());
+      LOG.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
 
@@ -132,7 +129,7 @@ public class ExecReducer extends MapRedu
     isTagged = gWork.getNeedsTagging();
     try {
       keyTableDesc = gWork.getKeyDesc();
-      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
           .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(inputKeyDeserializer, null, 
keyTableDesc.getProperties(), null);
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -140,7 +137,7 @@ public class ExecReducer extends MapRedu
       for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
         valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
-        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+        inputValueDeserializer[tag] = ReflectionUtils.newInstance(
             valueTableDesc[tag].getDeserializerClass(), null);
         SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
                                    valueTableDesc[tag].getProperties(), null);
@@ -162,7 +159,7 @@ public class ExecReducer extends MapRedu
 
     // initialize reduce operator tree
     try {
-      l4j.info(reducer.dump(0));
+      LOG.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
     } catch (Throwable e) {
       abort = true;
@@ -175,13 +172,6 @@ public class ExecReducer extends MapRedu
     }
   }
 
-  private Object keyObject;
-  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
-
-  private BytesWritable groupKey;
-
-  List<Object> row = new 
ArrayList<Object>(Utilities.reduceFieldNameList.size());
-
   public void reduce(Object key, Iterator values, OutputCollector output,
       Reporter reporter) throws IOException {
     if (reducer.getDone()) {
@@ -212,7 +202,7 @@ public class ExecReducer extends MapRedu
           groupKey = new BytesWritable();
         } else {
           // If a operator wants to do some work at the end of a group
-          l4j.trace("End Group");
+          LOG.trace("End Group");
           reducer.endGroup();
         }
 
@@ -227,7 +217,7 @@ public class ExecReducer extends MapRedu
         }
 
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
-        l4j.trace("Start Group");
+        LOG.trace("Start Group");
         reducer.setGroupKeyObject(keyObject);
         reducer.startGroup();
       }
@@ -253,7 +243,7 @@ public class ExecReducer extends MapRedu
           cntr++;
           if (cntr == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecReducer: processing " + cntr
+            LOG.info("ExecReducer: processing " + cntr
                 + " rows: used memory = " + used_memory);
             nextCntr = getNextCntr(cntr);
           }
@@ -279,7 +269,7 @@ public class ExecReducer extends MapRedu
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        l4j.fatal(StringUtils.stringifyException(e));
+        LOG.fatal(StringUtils.stringifyException(e));
         throw new RuntimeException(e);
       }
     }
@@ -301,17 +291,17 @@ public class ExecReducer extends MapRedu
 
     // No row was processed
     if (oc == null) {
-      l4j.trace("Close called no row");
+      LOG.trace("Close called without any rows processed");
     }
 
     try {
       if (groupKey != null) {
         // If a operator wants to do some work at the end of a group
-        l4j.trace("End Group");
+        LOG.trace("End Group");
         reducer.endGroup();
       }
       if (isLogInfoEnabled) {
-        l4j.info("ExecReducer: processed " + cntr + " rows: used memory = "
+        LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
             + memoryMXBean.getHeapMemoryUsage().getUsed());
       }
 
@@ -322,7 +312,7 @@ public class ExecReducer extends MapRedu
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
+        LOG.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing 
operators: "
             + e.getMessage(), e);
       }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
 Sun Aug  3 20:48:35 2014
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
+
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -101,7 +101,7 @@ public class TezProcessor implements Log
   private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
     // Hive "insert overwrite local directory" uses task id as dir name
     // Setting the id in jobconf helps to have the similar dir name as MR
-    StringBuilder taskAttemptIdBuilder = new StringBuilder("task");
+    StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
     
taskAttemptIdBuilder.append(processorContext.getApplicationId().getClusterTimestamp())
         .append("_")
         
.append(jobIdFormat.format(processorContext.getApplicationId().getId()))

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatColScalar.java
 Sun Aug  3 20:48:35 2014
@@ -45,6 +45,11 @@ public class StringConcatColScalar exten
 
   @Override
   public void evaluate(VectorizedRowBatch batch) {
+
+    if (childExpressions != null) {
+      super.evaluateChildren(batch);
+    }
+
     BytesColumnVector inputColVector = (BytesColumnVector) batch.cols[colNum];
     BytesColumnVector outV = (BytesColumnVector) batch.cols[outputColumn];
     int[] sel = batch.selected;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringConcatScalarCol.java
 Sun Aug  3 20:48:35 2014
@@ -45,6 +45,11 @@ public class StringConcatScalarCol exten
 
   @Override
   public void evaluate(VectorizedRowBatch batch) {
+
+    if (childExpressions != null) {
+        super.evaluateChildren(batch);
+      }
+
     BytesColumnVector inputColVector = (BytesColumnVector) batch.cols[colNum];
     BytesColumnVector outV = (BytesColumnVector) batch.cols[outputColumn];
     int[] sel = batch.selected;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java
 Sun Aug  3 20:48:35 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.io.Text;
 
 import java.sql.Date;
@@ -25,6 +28,9 @@ import java.sql.Date;
 public class VectorUDFDateString extends StringUnaryUDF {
   private static final long serialVersionUID = 1L;
 
+  private static final Log LOG = LogFactory.getLog(
+      VectorUDFDateString.class.getName());
+
   public VectorUDFDateString(int colNum, int outputColumn) {
     super(colNum, outputColumn, new StringUnaryUDF.IUDFUnaryString() {
       Text t = new Text();
@@ -39,7 +45,9 @@ public class VectorUDFDateString extends
           t.set(date.toString());
           return t;
         } catch (IllegalArgumentException e) {
-          e.printStackTrace();
+          if (LOG.isDebugEnabled()) {
+            LOG.info("VectorUDFDateString passed bad string for Date.valueOf 
'" + s.toString() + "'");
+          }
           return null;
         }
       }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java
 Sun Aug  3 20:48:35 2014
@@ -88,6 +88,11 @@ public abstract class VectorUDFTimestamp
 
   @Override
   public void evaluate(VectorizedRowBatch batch) {
+
+    if (childExpressions != null) {
+        super.evaluateChildren(batch);
+      }
+
     LongColumnVector outV = (LongColumnVector) batch.cols[outputColumn];
     LongColumnVector inputCol = (LongColumnVector)batch.cols[this.colNum];
     /* every line below this is identical for evaluateLong & evaluateString */

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
 Sun Aug  3 20:48:35 2014
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -51,13 +50,11 @@ public class HookContext {
   private UserGroupInformation ugi;
   private HookType hookType;
   final private Map<String, ContentSummary> inputPathToContentSummary;
-
-  public HookContext(QueryPlan queryPlan, HiveConf conf) throws Exception{
-    this(queryPlan, conf, new ConcurrentHashMap<String, ContentSummary>());
-  }
+  private final String ipAddress;
+  private final String userName;
 
   public HookContext(QueryPlan queryPlan, HiveConf conf,
-      Map<String, ContentSummary> inputPathToContentSummary) throws Exception {
+      Map<String, ContentSummary> inputPathToContentSummary, String userName, 
String ipAddress) throws Exception {
     this.queryPlan = queryPlan;
     this.conf = conf;
     this.inputPathToContentSummary = inputPathToContentSummary;
@@ -69,6 +66,8 @@ public class HookContext {
     if(SessionState.get() != null){
       linfo = SessionState.get().getLineageState().getLineageInfo();
     }
+    this.ipAddress = ipAddress;
+    this.userName = userName;
   }
 
   public QueryPlan getQueryPlan() {
@@ -143,7 +142,15 @@ public class HookContext {
     this.hookType = hookType;
   }
 
+  public String getIpAddress() {
+    return this.ipAddress;
+ }
+
   public String getOperationName() {
     return SessionState.get().getHiveOperation().name();
   }
+
+  public String getUserName() {
+    return this.userName;
+  }
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
 Sun Aug  3 20:48:35 2014
@@ -71,8 +71,20 @@ import org.apache.hadoop.util.Reflection
 public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     implements InputFormat<K, V>, JobConfigurable {
 
-  public static final String CLASS_NAME = HiveInputFormat.class.getName();
-  public static final Log LOG = LogFactory.getLog(CLASS_NAME);
+  private static final String CLASS_NAME = HiveInputFormat.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  /**
+   * A cache of InputFormat instances.
+   */
+  private static Map<Class, InputFormat<WritableComparable, Writable>> 
inputFormats 
+    = new ConcurrentHashMap<Class, InputFormat<WritableComparable, 
Writable>>();
+
+  private JobConf job;
+
+  // both fields are accessed by subclasses
+  protected Map<String, PartitionDesc> pathToPartitionInfo;
+  protected MapWork mrwork;
 
   /**
    * HiveInputSplit encapsulates an InputSplit with its corresponding
@@ -178,18 +190,10 @@ public class HiveInputFormat<K extends W
     }
   }
 
-  JobConf job;
-
   public void configure(JobConf job) {
     this.job = job;
   }
 
-  /**
-   * A cache of InputFormat instances.
-   */
-  protected static Map<Class, InputFormat<WritableComparable, Writable>> 
inputFormats 
-    = new ConcurrentHashMap<Class, InputFormat<WritableComparable, 
Writable>>();
-
   public static InputFormat<WritableComparable, Writable> 
getInputFormatFromCache(
     Class inputFormatClass, JobConf job) throws IOException {
 
@@ -248,9 +252,6 @@ public class HiveInputFormat<K extends W
     return rr;
   }
 
-  protected Map<String, PartitionDesc> pathToPartitionInfo;
-  MapWork mrwork = null;
-
   protected void init(JobConf job) {
     mrwork = Utilities.getMapWork(job);
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
@@ -281,7 +282,6 @@ public class HiveInputFormat<K extends W
       headerCount = Utilities.getHeaderCount(table);
       footerCount = Utilities.getFooterCount(table, conf);
       if (headerCount != 0 || footerCount != 0) {
-        
         // Input file has header or footer, cannot be splitted.
         conf.setLong(
             
ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
 Sun Aug  3 20:48:35 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -60,6 +61,7 @@ final class ReaderImpl implements Reader
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
   private final Configuration conf;
+  private final List<Integer> versionList;
 
   //serialized footer - Keeping this around for use by getFileMetaInfo()
   // will help avoid cpu cycles spend in deserializing at cost of increased
@@ -306,6 +308,7 @@ final class ReaderImpl implements Reader
     this.metadata = rInfo.metadata;
     this.footer = rInfo.footer;
     this.inspector = rInfo.inspector;
+    this.versionList = footerMetaData.versionList;
   }
 
 
@@ -387,7 +390,8 @@ final class ReaderImpl implements Reader
         ps.getCompression().toString(),
         (int) ps.getCompressionBlockSize(),
         (int) ps.getMetadataLength(),
-        buffer
+        buffer,
+        ps.getVersionList()
         );
   }
 
@@ -446,18 +450,26 @@ final class ReaderImpl implements Reader
     final int bufferSize;
     final int metadataSize;
     final ByteBuffer footerBuffer;
+    final List<Integer> versionList;
+
+    FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+        ByteBuffer footerBuffer) {
+      this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+    }
+
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-                 ByteBuffer footerBuffer){
+                 ByteBuffer footerBuffer, List<Integer> versionList){
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
       this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
+      this.versionList = versionList;
     }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer);
+        metadataSize, footerByteBuffer, versionList);
   }
 
 
@@ -629,4 +641,11 @@ final class ReaderImpl implements Reader
     return new Metadata(metadata);
   }
 
+  List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+    return metadata.getStripeStatsList();
+  }
+
+  public List<UserMetadataItem> getOrcProtoUserMetadata() {
+    return footer.getMetadataList();
+  }
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
 Sun Aug  3 20:48:35 2014
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,8 +41,8 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -73,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
@@ -2218,4 +2217,78 @@ class WriterImpl implements Writer, Memo
     }
     return rawWriter.getPos();
   }
+
+  void appendStripe(byte[] stripe, StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics);
+  }
+
+  void appendStripe(byte[] stripe, int offset, int length,
+      StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    getStream();
+    long start = rawWriter.getPos();
+
+    long stripeLen = length;
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+
+    rawWriter.write(stripe);
+    rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
+    rowCount += rowsInStripe;
+
+    // since we have already written the stripe, just update stripe statistics
+    treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
+
+    // update file level statistics
+    updateFileStatistics(stripeStatistics);
+
+    // update stripe information
+    OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
+        .newBuilder()
+        .setOffset(start)
+        .setNumberOfRows(rowsInStripe)
+        .setIndexLength(stripeInfo.getIndexLength())
+        .setDataLength(stripeInfo.getDataLength())
+        .setFooterLength(stripeInfo.getFooterLength())
+        .build();
+    stripes.add(dirEntry);
+
+    // reset it after writing the stripe
+    rowsInStripe = 0;
+  }
+
+  private void updateFileStatistics(OrcProto.StripeStatistics 
stripeStatistics) {
+    List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
+
+    // root element
+    
treeWriter.fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(0)));
+    TreeWriter[] childWriters = treeWriter.getChildrenWriters();
+    for (int i = 0; i < childWriters.length; i++) {
+      childWriters[i].fileStatistics.merge(
+          ColumnStatisticsImpl.deserialize(cs.get(i + 1)));
+    }
+  }
+
+  void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+    if (userMetadata != null) {
+      for (UserMetadataItem item : userMetadata) {
+        this.userMetadata.put(item.getName(), item.getValue());
+      }
+    }
+  }
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
 Sun Aug  3 20:48:35 2014
@@ -104,9 +104,9 @@ public class ParquetRecordReaderWrapper 
     } else {
       realReader = null;
       eof = true;
-      if (valueObj == null) { // Should initialize the value for createValue
-        valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]);
-      }
+    }
+    if (valueObj == null) { // Should initialize the value for createValue
+      valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]);
     }
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
 Sun Aug  3 20:48:35 2014
@@ -19,22 +19,22 @@
 package org.apache.hadoop.hive.ql.io.rcfile.merge;
 
 import java.io.IOException;
-import org.apache.hadoop.mapred.FileInputFormat;
+
+import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-@SuppressWarnings({ "deprecation", "unchecked" })
-public class RCFileBlockMergeInputFormat extends FileInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeInputFormat {
 
   @Override
-  public RecordReader getRecordReader(InputSplit split, JobConf job,
-      Reporter reporter) throws IOException {
+  public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
 
     reporter.setStatus(split.toString());
-
     return new RCFileBlockMergeRecordReader(job, (FileSplit) split);
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
 Sun Aug  3 20:48:35 2014
@@ -22,89 +22,25 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.io.merge.MergeMapper;
 import org.apache.hadoop.hive.shims.CombineHiveKey;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 @SuppressWarnings("deprecation")
-public class RCFileMergeMapper extends MapReduceBase implements
+public class RCFileMergeMapper extends MergeMapper implements
     Mapper<Object, RCFileValueBufferWrapper, Object, Object> {
 
-  private JobConf jc;
-  Class<? extends Writable> outputClass;
   RCFile.Writer outWriter;
 
-  Path finalPath;
-  FileSystem fs;
-
-  boolean exception = false;
-  boolean autoDelete = false;
-  Path outPath;
-
   CompressionCodec codec = null;
   int columnNumber = 0;
-
-  boolean hasDynamicPartitions = false;
-  boolean isListBucketingDML = false;
-  boolean isListBucketingAlterTableConcatenate = false;
-  int listBucketingDepth; // used as depth for dir-calculation and if it is 
list bucketing case.
-  boolean tmpPathFixedConcatenate = false;
-  boolean tmpPathFixed = false;
-  Path tmpPath;
-  Path taskTmpPath;
-  Path dpPath;
-
   public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
 
-  public RCFileMergeMapper() {
-  }
-
-  @Override
-  public void configure(JobConf job) {
-    jc = job;
-    hasDynamicPartitions = HiveConf.getBoolVar(job,
-        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
-    isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job,
-        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING);
-    listBucketingDepth = HiveConf.getIntVar(job,
-        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
-
-    Path specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job);
-    Path tmpPath = Utilities.toTempPath(specPath);
-    Path taskTmpPath = Utilities.toTaskTempPath(specPath);
-    updatePaths(tmpPath, taskTmpPath);
-    try {
-      fs = specPath.getFileSystem(job);
-      autoDelete = fs.deleteOnExit(outPath);
-    } catch (IOException e) {
-      this.exception = true;
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void updatePaths(Path tmpPath, Path taskTmpPath) {
-    String taskId = Utilities.getTaskId(jc);
-    this.tmpPath = tmpPath;
-    this.taskTmpPath = taskTmpPath;
-    finalPath = new Path(tmpPath, taskId);
-    outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
-  }
-
   @Override
   public void map(Object k, RCFileValueBufferWrapper value,
       OutputCollector<Object, Object> output, Reporter reporter)
@@ -118,35 +54,7 @@ public class RCFileMergeMapper extends M
         key = (RCFileKeyBufferWrapper) k;
       }
 
-      /**
-       * 1. boolean isListBucketingAlterTableConcatenate will be true only if it is alter table ...
-       * concatenate on stored-as-dir so it will handle list bucketing alter table merge in the if
-       * clause with the help of fixTmpPathConcatenate
-       * 2. If it is DML, isListBucketingAlterTableConcatenate will be false so that it will be
-       * handled by else clause. In this else clause, we have another if check.
-       * 2.1 the if check will make sure DP or LB, we will fix path with the help of fixTmpPath(..).
-       * Since both have sub-directories. it includes SP + LB.
-       * 2.2 only SP without LB, we don't fix path.
-       */
-      // Fix temp path for alter table ... concatenate
-      if (isListBucketingAlterTableConcatenate) {
-        if (this.tmpPathFixedConcatenate) {
-          checkPartitionsMatch(key.inputPath.getParent());
-        } else {
-          fixTmpPathConcatenate(key.inputPath.getParent());
-          tmpPathFixedConcatenate = true;
-        }
-      } else {
-        if (hasDynamicPartitions || (listBucketingDepth > 0)) {
-          if (tmpPathFixed) {
-            checkPartitionsMatch(key.inputPath.getParent());
-          } else {
-            // We haven't fixed the TMP path for this mapper yet
-            fixTmpPath(key.inputPath.getParent());
-            tmpPathFixed = true;
-          }
-        }
-      }
+      fixTmpPathAlterTable(key.inputPath.getParent());
 
       if (outWriter == null) {
         codec = key.codec;
@@ -172,106 +80,6 @@ public class RCFileMergeMapper extends M
     }
   }
 
-  /**
-   * Validates that each input path belongs to the same partition
-   * since each mapper merges the input to a single output directory
-   *
-   * @param inputPath
-   * @throws HiveException
-   */
-  private void checkPartitionsMatch(Path inputPath) throws HiveException {
-    if (!dpPath.equals(inputPath)) {
-      // Temp partition input path does not match exist temp path
-      String msg = "Multiple partitions for one block merge mapper: " +
-          dpPath + " NOT EQUAL TO " + inputPath;
-      LOG.error(msg);
-      throw new HiveException(msg);
-    }
-  }
-
-  /**
-   * Fixes tmpPath to point to the correct partition.
-   * Before this is called, tmpPath will default to the root tmp table dir
-   * fixTmpPath(..) works for DP + LB + multiple skewed values + merge. reason:
-   * 1. fixTmpPath(..) compares inputPath and tmpDepth, find out path 
difference and put it into
-   * newPath. Then add newpath to existing this.tmpPath and this.taskTmpPath.
-   * 2. The path difference between inputPath and tmpDepth can be DP or DP+LB. 
It will automatically
-   * handle it.
-   * 3. For example,
-   * if inputpath is 
<prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
-   * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
-   * tmppath is <prefix>/_tmp.-ext-10000
-   * newpath will be 
hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
-   * Then, this.tmpPath and this.taskTmpPath will be update correctly.
-   * We have list_bucket_dml_6.q cover this case: DP + LP + multiple skewed 
values + merge.
-   * @param inputPath
-   * @throws HiveException
-   * @throws IOException
-   */
-  private void fixTmpPath(Path inputPath)
-      throws HiveException, IOException {
-    dpPath = inputPath;
-    Path newPath = new Path(".");
-    int inputDepth = inputPath.depth();
-    int tmpDepth = tmpPath.depth();
-
-    // Build the path from bottom up
-    while (inputPath != null && inputPath.depth() > tmpDepth) {
-      newPath = new Path(inputPath.getName(), newPath);
-      inputDepth--;
-      inputPath = inputPath.getParent();
-    }
-
-    Path newTmpPath = new Path(tmpPath, newPath);
-    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
-    if (!fs.exists(newTmpPath)) {
-      fs.mkdirs(newTmpPath);
-    }
-    updatePaths(newTmpPath, newTaskTmpPath);
-  }
-
-  /**
-   * Fixes tmpPath to point to the correct list bucketing sub-directories.
-   * Before this is called, tmpPath will default to the root tmp table dir
-   * Reason to add a new method instead of changing fixTmpPath()
-   * Reason 1: logic has slightly difference
-   * fixTmpPath(..) needs 2 variables in order to decide path delta which is 
in variable newPath.
-   * 1. inputPath.depth()
-   * 2. tmpPath.depth()
-   * fixTmpPathConcatenate needs 2 variables too but one of them is different 
from fixTmpPath(..)
-   * 1. inputPath.depth()
-   * 2. listBucketingDepth
-   * Reason 2: less risks
-   * The existing logic is a little not trivial around map() and fixTmpPath(). 
In order to ensure
-   * minimum impact on existing flow, we try to avoid change on existing 
code/flow but add new code
-   * for new feature.
-   *
-   * @param inputPath
-   * @throws HiveException
-   * @throws IOException
-   */
-  private void fixTmpPathConcatenate(Path inputPath)
-      throws HiveException, IOException {
-    dpPath = inputPath;
-    Path newPath = new Path(".");
-
-    int depth = listBucketingDepth;
-    // Build the path from bottom up. pick up list bucketing subdirectories
-    while ((inputPath != null) && (depth > 0)) {
-      newPath = new Path(inputPath.getName(), newPath);
-      inputPath = inputPath.getParent();
-      depth--;
-    }
-
-    Path newTmpPath = new Path(tmpPath, newPath);
-    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
-    if (!fs.exists(newTmpPath)) {
-      fs.mkdirs(newTmpPath);
-    }
-    updatePaths(newTmpPath, newTaskTmpPath);
-  }
-
-
   @Override
   public void close() throws IOException {
     // close writer
@@ -282,42 +90,7 @@ public class RCFileMergeMapper extends M
     outWriter.close();
     outWriter = null;
 
-    if (!exception) {
-      FileStatus fss = fs.getFileStatus(outPath);
-      LOG.info("renamed path " + outPath + " to " + finalPath
-          + " . File size is " + fss.getLen());
-      if (!fs.rename(outPath, finalPath)) {
-        throw new IOException("Unable to rename output to " + finalPath);
-      }
-    } else {
-      if (!autoDelete) {
-        fs.delete(outPath, true);
-      }
-    }
-  }
-
-  public static String BACKUP_PREFIX = "_backup.";
-
-  public static Path backupOutputPath(FileSystem fs, Path outpath, JobConf job)
-      throws IOException, HiveException {
-    if (fs.exists(outpath)) {
-      Path backupPath = new Path(outpath.getParent(), BACKUP_PREFIX
-          + outpath.getName());
-      Utilities.rename(fs, outpath, backupPath);
-      return backupPath;
-    } else {
-      return null;
-    }
-  }
-
-  public static void jobClose(Path outputPath, boolean success, JobConf job,
-      LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter
-      ) throws HiveException, IOException {
-    FileSystem fs = outputPath.getFileSystem(job);
-    Path backupPath = backupOutputPath(fs, outputPath, job);
-    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, 
null,
-      reporter);
-    fs.delete(backupPath, true);
+    super.close();
   }
 
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
 Sun Aug  3 20:48:35 2014
@@ -207,6 +207,16 @@ public class DbLockManager implements Hi
     }
   }
 
+  /**
+   * Clear the memory of the locks in this object.  This won't clear the locks from the database.
+   * It is for use with
+   * {@link DbTxnManager#commitTxn} and
+   * {@link DbTxnManager#rollbackTxn}.
+   */
+  void clearLocalLockRecords() {
+    locks.clear();
+  }
+
   // Sleep before we send checkLock again, but do it with a back off
   // so we don't sit and hammer the metastore in a tight loop
   private void backoff() {

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
 Sun Aug  3 20:48:35 2014
@@ -203,6 +203,7 @@ public class DbTxnManager extends HiveTx
           "transaction");
     }
     try {
+      lockMgr.clearLocalLockRecords();
       LOG.debug("Committing txn " + txnId);
       client.commitTxn(txnId);
     } catch (NoSuchTxnException e) {
@@ -226,6 +227,7 @@ public class DbTxnManager extends HiveTx
           "transaction");
     }
     try {
+      lockMgr.clearLocalLockRecords();
       LOG.debug("Rolling back txn " + txnId);
       client.rollbackTxn(txnId);
     } catch (NoSuchTxnException e) {

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
Sun Aug  3 20:48:35 2014
@@ -43,6 +43,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -141,6 +142,10 @@ public class Hive {
     }
   };
 
+  public static Hive get(Configuration c, Class<?> clazz) throws HiveException 
{
+    return get(c instanceof HiveConf ? (HiveConf)c : new HiveConf(c, clazz));
+  }
+
   /**
    * Gets hive object for the current thread. If one is not initialized then a
    * new one is created If the new configuration is different in metadata conf
@@ -153,20 +158,13 @@ public class Hive {
    *
    */
   public static Hive get(HiveConf c) throws HiveException {
-    boolean needsRefresh = false;
     Hive db = hiveDB.get();
-    if (db != null) {
-      for (HiveConf.ConfVars oneVar : HiveConf.metaVars) {
-        // Since metaVars are all of different types, use string for comparison
-        String oldVar = db.getConf().get(oneVar.varname, "");
-        String newVar = c.get(oneVar.varname, "");
-        if (oldVar.compareToIgnoreCase(newVar) != 0) {
-          needsRefresh = true;
-          break;
-        }
-      }
+    if (db == null ||
+        (db.metaStoreClient != null && 
!db.metaStoreClient.isCompatibleWith(c))) {
+      return get(c, true);
     }
-    return get(c, needsRefresh);
+    db.conf = c;
+    return db;
   }
 
   /**
@@ -195,7 +193,8 @@ public class Hive {
   public static Hive get() throws HiveException {
     Hive db = hiveDB.get();
     if (db == null) {
-      db = new Hive(new HiveConf(Hive.class));
+      SessionState session = SessionState.get();
+      db = new Hive(session == null ? new HiveConf(Hive.class) : 
session.getConf());
       hiveDB.set(db);
     }
     return db;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java 
(original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java 
Sun Aug  3 20:48:35 2014
@@ -933,7 +933,11 @@ public class Table implements Serializab
    * @return include the db name
    */
   public String getCompleteName() {
-    return getDbName() + "@" + getTableName();
+    return getCompleteName(getDbName(), getTableName());
+  }
+
+  public static String getCompleteName(String dbName, String tabName) {
+    return dbName + "@" + tabName;
   }
 
   /**

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
 Sun Aug  3 20:48:35 2014
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Stack;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.parse.Q
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -456,10 +458,19 @@ abstract public class AbstractBucketJoin
   public static List<String> toColumns(List<ExprNodeDesc> keys) {
     List<String> columns = new ArrayList<String>();
     for (ExprNodeDesc key : keys) {
-      if (!(key instanceof ExprNodeColumnDesc)) {
+      if (key instanceof ExprNodeColumnDesc) {
+        columns.add(((ExprNodeColumnDesc) key).getColumn());
+      } else if ((key instanceof ExprNodeConstantDesc)) {
+        ExprNodeConstantDesc constant = (ExprNodeConstantDesc) key;
+        String colName = constant.getFoldedFromCol();
+        if (colName == null){
+          return null;
+        } else {
+          columns.add(colName);
+        }
+      } else {
         return null;
       }
-      columns.add(((ExprNodeColumnDesc) key).getColumn());
     }
     return columns;
   }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java
 Sun Aug  3 20:48:35 2014
@@ -82,9 +82,7 @@ public class ConstantPropagate implement
       //    if the later is enabled.
       return pactx;
     }
-    if (pactx.getConf().getBoolVar(ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
-      return pactx;
-    }
+
     pGraphContext = pactx;
     opToParseCtxMap = pGraphContext.getOpParseCtx();
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
 Sun Aug  3 20:48:35 2014
@@ -552,6 +552,7 @@ public final class ConstantPropagateProc
    * conditional expressions and extract assignment expressions and propagate 
them.
    */
   public static class ConstantPropagateFilterProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       FilterOperator op = (FilterOperator) nd;
@@ -594,6 +595,7 @@ public final class ConstantPropagateProc
    * Node Processor for Constant Propagate for Group By Operators.
    */
   public static class ConstantPropagateGroupByProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       GroupByOperator op = (GroupByOperator) nd;
@@ -630,6 +632,7 @@ public final class ConstantPropagateProc
    * The Default Node Processor for Constant Propagation.
    */
   public static class ConstantPropagateDefaultProc implements NodeProcessor {
+    @Override
     @SuppressWarnings("unchecked")
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
@@ -658,6 +661,7 @@ public final class ConstantPropagateProc
    * The Node Processor for Constant Propagation for Select Operators.
    */
   public static class ConstantPropagateSelectProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       SelectOperator op = (SelectOperator) nd;
@@ -691,6 +695,7 @@ public final class ConstantPropagateProc
    * propagation, this processor also prunes dynamic partitions to static 
partitions if possible.
    */
   public static class ConstantPropagateFileSinkProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       FileSinkOperator op = (FileSinkOperator) nd;
@@ -743,6 +748,7 @@ public final class ConstantPropagateProc
    * Currently these kinds of Operators include UnionOperator and 
ScriptOperator.
    */
   public static class ConstantPropagateStopProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       Operator<?> op = (Operator<?>) nd;
@@ -763,6 +769,7 @@ public final class ConstantPropagateProc
    * join (left table for left outer join and vice versa) can be propagated.
    */
   public static class ConstantPropagateReduceSinkProc implements NodeProcessor 
{
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       ReduceSinkOperator op = (ReduceSinkOperator) nd;
@@ -795,7 +802,11 @@ public final class ConstantPropagateProc
       // key columns
       ArrayList<ExprNodeDesc> newKeyEpxrs = new ArrayList<ExprNodeDesc>();
       for (ExprNodeDesc desc : rsDesc.getKeyCols()) {
-        newKeyEpxrs.add(foldExpr(desc, constants, cppCtx, op, 0, false));
+        ExprNodeDesc newDesc = foldExpr(desc, constants, cppCtx, op, 0, false);
+        if (newDesc != desc && desc instanceof ExprNodeColumnDesc && newDesc 
instanceof ExprNodeConstantDesc) {
+          
((ExprNodeConstantDesc)newDesc).setFoldedFromCol(((ExprNodeColumnDesc)desc).getColumn());
+        }
+        newKeyEpxrs.add(newDesc);
       }
       rsDesc.setKeyCols(newKeyEpxrs);
 
@@ -854,6 +865,7 @@ public final class ConstantPropagateProc
    * The Node Processor for Constant Propagation for Join Operators.
    */
   public static class ConstantPropagateJoinProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       JoinOperator op = (JoinOperator) nd;
@@ -916,6 +928,7 @@ public final class ConstantPropagateProc
    * The Node Processor for Constant Propagation for Table Scan Operators.
    */
   public static class ConstantPropagateTableScanProc implements NodeProcessor {
+    @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, 
Object... nodeOutputs)
         throws SemanticException {
       TableScanOperator op = (TableScanOperator) nd;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1615452&r1=1615451&r2=1615452&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
 Sun Aug  3 20:48:35 2014
@@ -98,7 +98,8 @@ public class GenMRFileSink1 implements N
 
     if (chDir) {
       // Merge the files in the destination table/partitions by creating 
Map-only merge job
-      // If underlying data is RCFile a RCFileBlockMerge task would be created.
+      // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or
+      // OrcFileStripeMerge task would be created.
       LOG.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
           ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),


Reply via email to