Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java Tue Oct 14 19:06:45 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -90,42 +91,11 @@ public class SQLStdHiveAccessController HiveAuthenticationProvider authenticator, HiveAuthzSessionContext ctx) throws HiveAuthzPluginException { this.metastoreClientFactory = metastoreClientFactory; this.authenticator = authenticator; - this.sessionCtx = applyTestSettings(ctx, conf); - - assertHiveCliAuthDisabled(conf); - initUserRoles(); + this.sessionCtx = SQLAuthorizationUtils.applyTestSettings(ctx, conf); LOG.info("Created SQLStdHiveAccessController for session context : " + sessionCtx); } /** - * Change the session context based on configuration to aid in testing of sql std auth - * @param ctx - * @param conf - * @return - */ - private HiveAuthzSessionContext applyTestSettings(HiveAuthzSessionContext ctx, HiveConf conf) { - if(conf.getBoolVar(ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE) && - ctx.getClientType() == CLIENT_TYPE.HIVECLI - ){ - // create new session ctx object with HS2 as client type - HiveAuthzSessionContext.Builder ctxBuilder = new HiveAuthzSessionContext.Builder(ctx); - ctxBuilder.setClientType(CLIENT_TYPE.HIVESERVER2); - return ctxBuilder.build(); - } - return ctx; - } - - private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException { - if (sessionCtx.getClientType() == CLIENT_TYPE.HIVECLI - && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) { - throw new HiveAuthzPluginException( - "SQL standards based authorization should not be enabled from hive cli" - + "Instead the use of storage based authorization in hive metastore is reccomended. Set " - + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli"); - } - } - - /** * (Re-)initialize currentRoleNames if necessary. * @throws HiveAuthzPluginException */ @@ -381,9 +351,9 @@ public class SQLStdHiveAccessController @Override public List<HiveRoleGrant> getPrincipalGrantInfoForRole(String roleName) throws HiveAuthzPluginException, HiveAccessControlException { // only user belonging to admin role can list role - if (!isUserAdmin()) { + if (!isUserAdmin() && !doesUserHasAdminOption(Arrays.asList(roleName))) { throw new HiveAccessControlException("Current user : " + currentUserName+ " is not" - + " allowed get principals in a role. " + ADMIN_ONLY_MSG); + + " allowed get principals in a role. " + ADMIN_ONLY_MSG + " Otherwise, " + HAS_ADMIN_PRIV_MSG); } try { return getHiveRoleGrants(metastoreClientFactory.getHiveMetastoreClient(), roleName);
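The change to getPrincipalGrantInfoForRole above relaxes the check so that, in addition to members of the admin role, a user who was granted the role WITH ADMIN OPTION may list its principals. A minimal sketch of how such a check could look is shown below; it assumes a getRolesFromMS() helper that returns the current user's role grants (as in SQLStdHiveAccessController), and the actual implementation in Hive may differ.

    // Sketch (assumption): return true only if the current user holds every named role
    // with the ADMIN OPTION, based on the role grants fetched from the metastore.
    private boolean doesUserHasAdminOption(List<String> roleNames) throws HiveAuthzPluginException {
      List<HiveRoleGrant> currentRoles = getRolesFromMS(); // assumed helper: grants for currentUserName
      for (String roleName : roleNames) {
        boolean roleFoundWithAdminOption = false;
        for (HiveRoleGrant grant : currentRoles) {
          if (grant.getRoleName().equalsIgnoreCase(roleName) && grant.isGrantOption()) {
            roleFoundWithAdminOption = true;
            break;
          }
        }
        if (!roleFoundWithAdminOption) {
          return false;
        }
      }
      return true;
    }

With this in place, the guard in getPrincipalGrantInfoForRole only rejects the request when the user is neither an admin nor an admin-option grantee of the role being queried.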
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidator.java Tue Oct 14 19:06:45 2014 @@ -25,12 +25,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizationValidator; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal; @@ -44,16 +47,30 @@ public class SQLStdHiveAuthorizationVali private final HiveConf conf; private final HiveAuthenticationProvider authenticator; private final SQLStdHiveAccessControllerWrapper privController; + private final HiveAuthzSessionContext ctx; public static final Log LOG = LogFactory.getLog(SQLStdHiveAuthorizationValidator.class); public SQLStdHiveAuthorizationValidator(HiveMetastoreClientFactory metastoreClientFactory, HiveConf conf, HiveAuthenticationProvider authenticator, - SQLStdHiveAccessControllerWrapper privilegeManager) { + SQLStdHiveAccessControllerWrapper privilegeManager, HiveAuthzSessionContext ctx) + throws HiveAuthzPluginException { this.metastoreClientFactory = metastoreClientFactory; this.conf = conf; this.authenticator = authenticator; this.privController = privilegeManager; + this.ctx = SQLAuthorizationUtils.applyTestSettings(ctx, conf); + assertHiveCliAuthDisabled(conf); + } + + private void assertHiveCliAuthDisabled(HiveConf conf) throws HiveAuthzPluginException { + if (ctx.getClientType() == CLIENT_TYPE.HIVECLI + && conf.getBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED)) { + throw new HiveAuthzPluginException( + "SQL standards based authorization should not be enabled from hive cli" + + "Instead the use of storage based authorization in hive metastore is reccomended. 
Set " + + ConfVars.HIVE_AUTHORIZATION_ENABLED.varname + "=false to disable authz within cli"); + } } @Override Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizerFactory.java Tue Oct 14 19:06:45 2014 @@ -37,7 +37,7 @@ public class SQLStdHiveAuthorizerFactory return new HiveAuthorizerImpl( privilegeManager, new SQLStdHiveAuthorizationValidator(metastoreClientFactory, conf, authenticator, - privilegeManager) + privilegeManager, ctx) ); } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Oct 14 19:06:45 2014 @@ -515,16 +515,17 @@ public class SessionState { */ private Path createRootHDFSDir(HiveConf conf) throws IOException { Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); - FsPermission expectedHDFSDirPermission = new FsPermission("777"); + FsPermission writableHDFSDirPermission = new FsPermission((short)00733); FileSystem fs = rootHDFSDirPath.getFileSystem(conf); if (!fs.exists(rootHDFSDirPath)) { - Utilities.createDirsWithPermission(conf, rootHDFSDirPath, expectedHDFSDirPermission, true); + Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); } FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); LOG.debug("HDFS root scratch dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); - // If the root HDFS scratch dir already exists, make sure the permissions are 777. - if (!expectedHDFSDirPermission.equals(fs.getFileStatus(rootHDFSDirPath).getPermission())) { + // If the root HDFS scratch dir already exists, make sure it is writeable. + if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission + .toShort()) == writableHDFSDirPermission.toShort())) { throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath + " on HDFS should be writable. 
Current permissions are: " + currentHDFSDirPermission); } @@ -1244,7 +1245,7 @@ public class SessionState { try { if (tezSessionState != null) { - TezSessionPoolManager.getInstance().close(tezSessionState); + TezSessionPoolManager.getInstance().close(tezSessionState, false); } } catch (Exception e) { LOG.info("Error closing tez session", e); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Tue Oct 14 19:06:45 2014 @@ -18,10 +18,18 @@ package org.apache.hadoop.hive.ql.stats; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; @@ -79,20 +87,16 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector; import org.apache.hadoop.io.BytesWritable; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; public class StatsUtils { private static final Log LOG = LogFactory.getLog(StatsUtils.class.getName()); + /** * Collect table, partition and column level statistics * @param conf @@ -109,15 +113,34 @@ public class StatsUtils { public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, Table table, TableScanOperator tableScanOperator) throws HiveException { - Statistics stats = new Statistics(); - // column level statistics are required only for the columns that are needed List<ColumnInfo> schema = tableScanOperator.getSchema().getSignature(); List<String> neededColumns = tableScanOperator.getNeededColumns(); + List<String> referencedColumns = tableScanOperator.getReferencedColumns(); + + return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns); + } + + private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, + Table table, List<ColumnInfo> schema, List<String> neededColumns, + List<String> referencedColumns) throws HiveException { + boolean fetchColStats = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS); boolean fetchPartStats = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS); + + return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns, + fetchColStats, fetchPartStats); + } + + public static Statistics 
collectStatistics(HiveConf conf, PrunedPartitionList partList, + Table table, List<ColumnInfo> schema, List<String> neededColumns, + List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats) + throws HiveException { + + Statistics stats = new Statistics(); + float deserFactor = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_STATS_DESERIALIZATION_FACTOR); @@ -207,7 +230,6 @@ public class StatsUtils { stats.getBasicStatsState().equals(State.COMPLETE)) { stats.setBasicStatsState(State.PARTIAL); } - boolean haveFullStats = fetchColStats; if (fetchColStats) { List<String> partNames = new ArrayList<String>(partList.getNotDeniedPartns().size()); for (Partition part : partList.getNotDeniedPartns()) { @@ -215,37 +237,84 @@ public class StatsUtils { } Map<String, String> colToTabAlias = new HashMap<String, String>(); neededColumns = processNeededColumns(schema, neededColumns, colToTabAlias); - AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), neededColumns, partNames); + AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), + neededColumns, partNames); if (null == aggrStats) { - haveFullStats = false; + // There are some partitions with no state (or we didn't fetch any state). + // Update the stats with empty list to reflect that in the + // state/initialize structures. + List<ColStatistics> emptyStats = Lists.newArrayList(); + + // add partition column stats + addParitionColumnStats(neededColumns, referencedColumns, schema, table, partList, + emptyStats); + + stats.addToColumnStats(emptyStats); + stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns)); } else { List<ColumnStatisticsObj> colStats = aggrStats.getColStats(); if (colStats.size() != neededColumns.size()) { - LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to retrieve" - + " for " + colStats.size() + " columns"); + LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" + + " retrieve for " + colStats.size() + " columns"); } - List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), colToTabAlias); + List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), + colToTabAlias); + + addParitionColumnStats(neededColumns, referencedColumns, schema, table, partList, + columnStats); + stats.addToColumnStats(columnStats); - State colState = deriveStatType(columnStats, neededColumns); + State colState = deriveStatType(columnStats, referencedColumns); if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) { - LOG.debug("Column stats requested for : " + partNames.size() +" partitions. " - + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions"); + LOG.debug("Column stats requested for : " + partNames.size() + " partitions. " + + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions"); colState = State.PARTIAL; } stats.setColumnStatsState(colState); } } - // There are some partitions with no state (or we didn't fetch any state). - // Update the stats with empty list to reflect that in the state/initialize structures. 
- if (!haveFullStats) { - List<ColStatistics> emptyStats = Lists.<ColStatistics>newArrayList(); - stats.addToColumnStats(emptyStats); - stats.updateColumnStatsState(deriveStatType(emptyStats, neededColumns)); - } } return stats; } + private static void addParitionColumnStats(List<String> neededColumns, + List<String> referencedColumns, List<ColumnInfo> schema, Table table, + PrunedPartitionList partList, List<ColStatistics> colStats) + throws HiveException { + + // extra columns is difference between referenced columns vs needed + // columns. The difference could be partition columns. + List<String> extraCols = Lists.newArrayList(referencedColumns); + if (referencedColumns.size() > neededColumns.size()) { + extraCols.removeAll(neededColumns); + for (String col : extraCols) { + for (ColumnInfo ci : schema) { + // conditions for being partition column + if (col.equals(ci.getInternalName()) && ci.getIsVirtualCol() && + !ci.isHiddenVirtualCol()) { + // currently metastore does not store column stats for + // partition column, so we calculate the NDV from pruned + // partition list + ColStatistics partCS = new ColStatistics(table.getTableName(), + ci.getInternalName(), ci.getType().getTypeName()); + long numPartitions = getNDVPartitionColumn(partList.getPartitions(), + ci.getInternalName()); + partCS.setCountDistint(numPartitions); + colStats.add(partCS); + } + } + } + } + } + + public static int getNDVPartitionColumn(Set<Partition> partitions, String partColName) { + Set<String> distinctVals = new HashSet<String>(partitions.size()); + for (Partition partition : partitions) { + distinctVals.add(partition.getSpec().get(partColName)); + } + return distinctVals.size(); + } + private static void setUnknownRcDsToAverage( List<Long> rowCounts, List<Long> dataSizes, int avgRowSize) { if (LOG.isDebugEnabled()) { @@ -751,7 +820,8 @@ public class StatsUtils { || colType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) { return JavaDataModel.get().primitive1(); } else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME) - || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) { + || colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME) + || colType.equalsIgnoreCase("long")) { return JavaDataModel.get().primitive2(); } else if (colType.equalsIgnoreCase(serdeConstants.TIMESTAMP_TYPE_NAME)) { return JavaDataModel.get().lengthOfTimestamp(); @@ -780,7 +850,8 @@ public class StatsUtils { return JavaDataModel.get().lengthForIntArrayOfSize(length); } else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) { return JavaDataModel.get().lengthForDoubleArrayOfSize(length); - } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) { + } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME) + || colType.equalsIgnoreCase("long")) { return JavaDataModel.get().lengthForLongArrayOfSize(length); } else if (colType.equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME)) { return JavaDataModel.get().lengthForByteArrayOfSize(length); @@ -876,7 +947,7 @@ public class StatsUtils { Statistics parentStats, Map<String, ExprNodeDesc> colExprMap, RowSchema rowSchema) { List<ColStatistics> cs = Lists.newArrayList(); - if (colExprMap != null) { + if (colExprMap != null && rowSchema != null) { for (ColumnInfo ci : rowSchema.getSignature()) { String outColName = ci.getInternalName(); outColName = StatsUtils.stripPrefixFromColumnName(outColName); @@ -1042,10 +1113,8 @@ public class StatsUtils { /** * Get basic stats of table - * @param dbName - * - database name - * @param tabName - * 
- table name + * @param table + * - table * @param statType * - type of stats * @return value of stats @@ -1283,4 +1352,11 @@ public class StatsUtils { } } + + public static long getAvailableMemory(Configuration conf) { + int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : + conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); + return memory; + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Tue Oct 14 19:06:45 2014 @@ -24,15 +24,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnListImpl; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -41,7 +38,12 @@ import org.apache.hadoop.util.StringUtil import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; /** * A class to clean directories after compactions. This will run in a separate thread. @@ -50,35 +52,85 @@ public class Cleaner extends CompactorTh static final private String CLASS_NAME = Cleaner.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - private long cleanerCheckInterval = 5000; + private long cleanerCheckInterval = 0; + + // List of compactions to clean. + private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>(); + private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>(); @Override public void run() { - // Make sure nothing escapes this run method and kills the metastore at large, - // so wrap it in a big catch Throwable statement. + if (cleanerCheckInterval == 0) { + cleanerCheckInterval = conf.getTimeVar( + HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS); + } + do { + // This is solely for testing. It checks if the test has set the looped value to false, + // and if so remembers that and then sets it to true at the end. 
We have to check here + // first to make sure we go through a complete iteration of the loop before resetting it. + boolean setLooped = !looped.boolVal; + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. try { long startedAt = System.currentTimeMillis(); - // Now look for new entries ready to be cleaned. + // First look for all the compactions that are waiting to be cleaned. If we have not + // seen an entry before, look for all the locks held on that table or partition and + // record them. We will then only clean the partition once all of those locks have been + // released. This way we avoid removing the files while they are in use, + // while at the same time avoiding starving the cleaner as new readers come along. + // This works because we know that any reader who comes along after the worker thread has + // done the compaction will read the more up to date version of the data (either in a + // newer delta or in a newer base). List<CompactionInfo> toClean = txnHandler.findReadyToClean(); - for (CompactionInfo ci : toClean) { - LockComponent comp = null; - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname); - comp.setTablename(ci.tableName); - if (ci.partName != null) comp.setPartitionname(ci.partName); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest rqst = new LockRequest(components, System.getProperty("user.name"), - Worker.hostname()); - LockResponse rsp = txnHandler.lockNoWait(rqst); + if (toClean.size() > 0 || compactId2LockMap.size() > 0) { + ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); + + for (CompactionInfo ci : toClean) { + // Check to see if we have seen this request before. If so, ignore it. If not, + // add it to our queue. + if (!compactId2LockMap.containsKey(ci.id)) { + compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse)); + compactId2CompactInfoMap.put(ci.id, ci); + } + } + + // Now, for each entry in the queue, see if all of the associated locks are clear so we + // can clean + Set<Long> currentLocks = buildCurrentLockSet(locksResponse); + List<Long> expiredLocks = new ArrayList<Long>(); + List<Long> compactionsCleaned = new ArrayList<Long>(); try { - if (rsp.getState() == LockState.ACQUIRED) { - clean(ci); + for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) { + boolean sawLock = false; + for (Long lockId : queueEntry.getValue()) { + if (currentLocks.contains(lockId)) { + sawLock = true; + break; + } else { + expiredLocks.add(lockId); + } + } + + if (!sawLock) { + // Remember to remove this when we're out of the loop, + // we can't do it in the loop or we'll get a concurrent modification exception. 
+ compactionsCleaned.add(queueEntry.getKey()); + clean(compactId2CompactInfoMap.get(queueEntry.getKey())); + } else { + // Remove the locks we didn't see so we don't look for them again next time + for (Long lockId : expiredLocks) { + queueEntry.getValue().remove(lockId); + } + } } } finally { - if (rsp.getState() == LockState.ACQUIRED) { - txnHandler.unlock(new UnlockRequest(rsp.getLockid())); + if (compactionsCleaned.size() > 0) { + for (Long compactId : compactionsCleaned) { + compactId2LockMap.remove(compactId); + compactId2CompactInfoMap.remove(compactId); + } } } } @@ -91,9 +143,37 @@ public class Cleaner extends CompactorTh LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); } + if (setLooped) { + looped.boolVal = true; + } } while (!stop.boolVal); } + private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { + Set<Long> relatedLocks = new HashSet<Long>(); + for (ShowLocksResponseElement lock : locksResponse.getLocks()) { + if (ci.dbname.equals(lock.getDbname())) { + if ((ci.tableName == null && lock.getTablename() == null) || + (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) { + if ((ci.partName == null && lock.getPartname() == null) || + (ci.partName != null && ci.partName.equals(lock.getPartname()))) { + relatedLocks.add(lock.getLockid()); + } + } + } + } + + return relatedLocks; + } + + private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) { + Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size()); + for (ShowLocksResponseElement lock : locksResponse.getLocks()) { + currentLocks.add(lock.getLockid()); + } + return currentLocks; + } + private void clean(CompactionInfo ci) throws MetaException { LOG.info("Starting cleaning for " + ci.getFullPartitionName()); try { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Tue Oct 14 19:06:45 2014 @@ -506,13 +506,15 @@ public class CompactorMR { ValidTxnList txnList = new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader<V> reader = - aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(), + aif.getRawReader(jobConf, isMajor, split.getBucket(), txnList, split.getBaseDir(), split.getDeltaDirs()); RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); while (reader.next(identifier, value)) { + if (isMajor && reader.isDelete(value)) continue; writer.write(value); reporter.progress(); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- 
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Tue Oct 14 19:06:45 2014 @@ -53,6 +53,7 @@ abstract class CompactorThread extends T protected RawStore rs; protected int threadId; protected BooleanPointer stop; + protected BooleanPointer looped; @Override public void setHiveConf(HiveConf conf) { @@ -66,8 +67,9 @@ abstract class CompactorThread extends T } @Override - public void init(BooleanPointer stop) throws MetaException { + public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { this.stop = stop; + this.looped = looped; setPriority(MIN_PRIORITY); setDaemon(true); // this means the process will exit without waiting for this thread Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Oct 14 19:06:45 2014 @@ -76,7 +76,7 @@ public class Initiator extends Compactor // don't doom the entire thread. try { ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); - ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0); Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); @@ -137,8 +137,8 @@ public class Initiator extends Compactor } @Override - public void init(BooleanPointer stop) throws MetaException { - super.init(stop); + public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { + super.init(stop, looped); checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Tue Oct 14 19:06:45 2014 @@ -120,7 +120,7 @@ public class Worker extends CompactorThr final boolean isMajor = ci.isMajorCompaction(); final ValidTxnList txns = - TxnHandler.createValidTxnList(txnHandler.getOpenTxns()); + TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0); final StringBuffer jobName = new StringBuffer(name); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); @@ -168,8 +168,8 @@ public class Worker extends CompactorThr } @Override - public void init(BooleanPointer stop) throws MetaException { - super.init(stop); + public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException { + super.init(stop, looped); StringBuilder name = new 
StringBuilder(hostname()); name.append("-"); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFLog.java Tue Oct 14 19:06:45 2014 @@ -52,29 +52,6 @@ public class UDFLog extends UDFMath { } /** - * Get the logarithm of the given decimal with the given base. - */ - public DoubleWritable evaluate(DoubleWritable base, HiveDecimalWritable writable) { - if (base == null || writable == null) { - return null; - } - double d = writable.getHiveDecimal().bigDecimalValue().doubleValue(); - return log(base.get(), d); - } - - /** - * Get the logarithm of input with the given decimal as the base. - */ - public DoubleWritable evaluate(HiveDecimalWritable base, DoubleWritable d) { - if (base == null || d == null) { - return null; - } - - double b = base.getHiveDecimal().bigDecimalValue().doubleValue(); - return log(b, d.get()); - } - - /** * Get the logarithm of the given decimal input with the given decimal base. */ public DoubleWritable evaluate(HiveDecimalWritable baseWritable, HiveDecimalWritable writable) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseNumeric.java Tue Oct 14 19:06:45 2014 @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.No import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -318,4 +320,17 @@ public abstract class GenericUDFBaseNume public void setAnsiSqlArithmetic(boolean ansiSqlArithmetic) { this.ansiSqlArithmetic = ansiSqlArithmetic; } + + public PrimitiveTypeInfo deriveMinArgumentCast( + ExprNodeDesc childExpr, TypeInfo targetType) { + assert targetType instanceof PrimitiveTypeInfo : "Not a primitive type" + targetType; + PrimitiveTypeInfo pti = (PrimitiveTypeInfo)targetType; + // We only do the minimum cast for decimals. 
Other types are assumed safe; fix if needed. + // We also don't do anything for non-primitive children (maybe we should assert). + if ((pti.getPrimitiveCategory() != PrimitiveCategory.DECIMAL) + || (!(childExpr.getTypeInfo() instanceof PrimitiveTypeInfo))) return pti; + PrimitiveTypeInfo childTi = (PrimitiveTypeInfo)childExpr.getTypeInfo(); + // If the child is also decimal, no cast is needed (we hope - can target type be narrower?). + return HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(childTi); + } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFromUtcTimestamp.java Tue Oct 14 19:06:45 2014 @@ -22,6 +22,7 @@ import java.util.TimeZone; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -33,7 +34,9 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.Text; - +@Description(name = "from_utc_timestamp", + value = "from_utc_timestamp(timestamp, string timezone) - " + + "Assumes given timestamp ist UTC and converts to given timezone (as of Hive 0.8.0)") public class GenericUDFFromUtcTimestamp extends GenericUDF { static final Log LOG = LogFactory.getLog(GenericUDFFromUtcTimestamp.class); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIf.java Tue Oct 14 19:06:45 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.udf.generic; +import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; @@ -62,6 +63,11 @@ import org.apache.hadoop.hive.ql.exec.ve * otherwise it returns expr3. IF() returns a numeric or string value, depending * on the context in which it is used. */ +@Description( + name = "if", + value = "IF(expr1,expr2,expr3) - If expr1 is TRUE (expr1 <> 0 and expr1 <> NULL) then" + + " IF() returns expr2; otherwise it returns expr3. 
IF() returns a numeric or string value," + + " depending on the context in which it is used.") @VectorizedExpressions({ IfExprLongColumnLongColumn.class, IfExprDoubleColumnDoubleColumn.class, IfExprLongColumnLongScalar.class, IfExprDoubleColumnDoubleScalar.class, Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java Tue Oct 14 19:06:45 2014 @@ -26,9 +26,11 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.IntWritable; /** * GenericUDFIndex. @@ -36,11 +38,10 @@ import org.apache.hadoop.hive.serde2.obj */ @Description(name = "index", value = "_FUNC_(a, n) - Returns the n-th element of a ") public class GenericUDFIndex extends GenericUDF { + private transient MapObjectInspector mapOI; - private boolean mapKeyPreferWritable; private transient ListObjectInspector listOI; - private transient PrimitiveObjectInspector indexOI; - private transient ObjectInspector returnOI; + private transient ObjectInspectorConverters.Converter converter; @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { @@ -66,21 +67,22 @@ public class GenericUDFIndex extends Gen } // index has to be a primitive - if (arguments[1] instanceof PrimitiveObjectInspector) { - indexOI = (PrimitiveObjectInspector) arguments[1]; - } else { + if (!(arguments[1] instanceof PrimitiveObjectInspector)) { throw new UDFArgumentTypeException(1, "Primitive Type is expected but " + arguments[1].getTypeName() + "\" is found"); } - + PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) arguments[1]; + ObjectInspector returnOI; + ObjectInspector indexOI; if (mapOI != null) { + indexOI = ObjectInspectorConverters.getConvertedOI( + inputOI, mapOI.getMapKeyObjectInspector()); returnOI = mapOI.getMapValueObjectInspector(); - ObjectInspector keyOI = mapOI.getMapKeyObjectInspector(); - mapKeyPreferWritable = ((PrimitiveObjectInspector) keyOI) - .preferWritable(); } else { + indexOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector; returnOI = listOI.getListElementObjectInspector(); } + converter = ObjectInspectorConverters.getConverter(inputOI, indexOI); return returnOI; } @@ -88,35 +90,16 @@ public class GenericUDFIndex extends Gen @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { assert (arguments.length == 2); - Object main = arguments[0].get(); Object index = arguments[1].get(); + Object indexObject = 
converter.convert(index); + if (indexObject == null) { + return null; + } if (mapOI != null) { - - Object indexObject; - if (mapKeyPreferWritable) { - indexObject = indexOI.getPrimitiveWritableObject(index); - } else { - indexObject = indexOI.getPrimitiveJavaObject(index); - } - return mapOI.getMapValueElement(main, indexObject); - - } else { - - assert (listOI != null); - int intIndex = 0; - try { - intIndex = PrimitiveObjectInspectorUtils.getInt(index, indexOI); - } catch (NullPointerException e) { - // If index is null, we should return null. - return null; - } catch (NumberFormatException e) { - // If index is not a number, we should return null. - return null; - } - return listOI.getListElement(main, intIndex); - + return mapOI.getMapValueElement(arguments[0].get(), indexObject); } + return listOI.getListElement(arguments[0].get(), ((IntWritable)indexObject).get()); } @Override Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFTimestamp.java Tue Oct 14 19:06:45 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.vector.VectorizedExpressions; @@ -39,6 +40,8 @@ import org.apache.hadoop.hive.serde2.obj * Creates a TimestampWritable object using PrimitiveObjectInspectorConverter * */ +@Description(name = "timestamp", +value = "cast(date as timestamp) - Returns timestamp") @VectorizedExpressions({CastLongToTimestampViaLongToLong.class, CastDoubleToTimestampViaDoubleToLong.class, CastDecimalToTimestamp.class}) public class GenericUDFTimestamp extends GenericUDF { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFToUtcTimestamp.java Tue Oct 14 19:06:45 2014 @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import org.apache.hadoop.hive.ql.exec.Description; +@Description(name = "to_utc_timestamp", + value = "to_utc_timestamp(timestamp, string timezone) - " + + "Assumes given timestamp is in given timezone and converts to UTC (as of Hive 0.8.0)") public class GenericUDFToUtcTimestamp extends GenericUDFFromUtcTimestamp { Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java?rev=1631841&r1=1631840&r2=1631841&view=diff 
============================================================================== --- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java (original) +++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFunctionRegistry.java Tue Oct 14 19:06:45 2014 @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -80,7 +79,7 @@ public class TestFunctionRegistry extend } private void implicit(TypeInfo a, TypeInfo b, boolean convertible) { - assertEquals(convertible, FunctionRegistry.implicitConvertable(a,b)); + assertEquals(convertible, FunctionRegistry.implicitConvertible(a, b)); } public void testImplicitConversion() { Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original) +++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Tue Oct 14 19:06:45 2014 @@ -18,17 +18,24 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.CollectDesc; @@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.plan.Pl import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.processors.CommandProcessor; +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -49,8 +60,14 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.junit.Test; /** * TestOperators. 
@@ -274,7 +291,7 @@ public class TestOperators extends TestC cd, sop); op.initialize(new JobConf(TestOperators.class), - new ObjectInspector[] {r[0].oi}); + new ObjectInspector[]{r[0].oi}); // evaluate on row for (int i = 0; i < 5; i++) { @@ -314,7 +331,8 @@ public class TestOperators extends TestC Configuration hconf = new JobConf(TestOperators.class); HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME, "hdfs:///testDir/testFile"); - IOContext.get().setInputPath(new Path("hdfs:///testDir/testFile")); + IOContext.get(hconf.get(Utilities.INPUT_NAME)).setInputPath( + new Path("hdfs:///testDir/testFile")); // initialize pathToAliases ArrayList<String> aliases = new ArrayList<String>(); @@ -379,4 +397,82 @@ public class TestOperators extends TestC throw (e); } } + + @Test + public void testFetchOperatorContextQuoting() throws Exception { + JobConf conf = new JobConf(); + ArrayList<Path> list = new ArrayList<Path>(); + list.add(new Path("hdfs://nn.example.com/fi\tl\\e\t1")); + list.add(new Path("hdfs://nn.example.com/file\t2")); + list.add(new Path("file:/file3")); + FetchOperator.setFetchOperatorContext(conf, list); + String[] parts = + conf.get(FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST).split("\t"); + assertEquals(3, parts.length); + assertEquals("hdfs://nn.example.com/fi\\tl\\\\e\\t1", parts[0]); + assertEquals("hdfs://nn.example.com/file\\t2", parts[1]); + assertEquals("file:/file3", parts[2]); + } + + /** + * A custom input format that checks to make sure that the fetch operator + * sets the required attributes. + */ + public static class CustomInFmt extends TextInputFormat { + + @Override + public InputSplit[] getSplits(JobConf job, int splits) throws IOException { + + // ensure that the table properties were copied + assertEquals("val1", job.get("myprop1")); + assertEquals("val2", job.get("myprop2")); + + // ensure that both of the partitions are in the complete list. 
+ String[] dirs = job.get("hive.complete.dir.list").split("\t"); + assertEquals(2, dirs.length); + assertEquals(true, dirs[0].endsWith("/state=CA")); + assertEquals(true, dirs[1].endsWith("/state=OR")); + return super.getSplits(job, splits); + } + } + + @Test + public void testFetchOperatorContext() throws Exception { + HiveConf conf = new HiveConf(); + conf.set("hive.support.concurrency", "false"); + SessionState.start(conf); + String cmd = "create table fetchOp (id int, name string) " + + "partitioned by (state string) " + + "row format delimited fields terminated by '|' " + + "stored as " + + "inputformat 'org.apache.hadoop.hive.ql.exec.TestOperators$CustomInFmt' " + + "outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " + + "tblproperties ('myprop1'='val1', 'myprop2' = 'val2')"; + Driver driver = new Driver(); + driver.init(); + CommandProcessorResponse response = driver.run(cmd); + assertEquals(0, response.getResponseCode()); + List<Object> result = new ArrayList<Object>(); + + cmd = "load data local inpath '../data/files/employee.dat' " + + "overwrite into table fetchOp partition (state='CA')"; + driver.init(); + response = driver.run(cmd); + assertEquals(0, response.getResponseCode()); + + cmd = "load data local inpath '../data/files/employee2.dat' " + + "overwrite into table fetchOp partition (state='OR')"; + driver.init(); + response = driver.run(cmd); + assertEquals(0, response.getResponseCode()); + + cmd = "select * from fetchOp"; + driver.init(); + driver.setMaxRows(500); + response = driver.run(cmd); + assertEquals(0, response.getResponseCode()); + driver.getResults(result); + assertEquals(20, result.size()); + driver.close(); + } } Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1631841&r1=1631840&r2=1631841&view=diff ============================================================================== --- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original) +++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Tue Oct 14 19:06:45 2014 @@ -26,6 +26,7 @@ import java.util.Random; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.hive.conf.HiveConf; public class TestTezSessionPool { @@ -157,4 +158,29 @@ public class TestTezSessionPool { } } } + + @Test + public void testCloseAndOpenDefault() throws Exception { + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.isDefault()).thenReturn(false); + + poolManager.closeAndOpen(session, conf, false); + + Mockito.verify(session).close(false); + Mockito.verify(session).open(conf, null); + } + + @Test + public void testCloseAndOpenWithResources() throws Exception { + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.isDefault()).thenReturn(false); + String[] extraResources = new String[] { "file:///tmp/foo.jar" }; + + poolManager.closeAndOpen(session, conf, extraResources, false); + + Mockito.verify(session).close(false); + Mockito.verify(session).open(conf, extraResources); + } } Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Tue Oct 14 19:06:45 2014
@@ -30,9 +30,11 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.mapred.JobConf;
@@ -90,8 +93,11 @@ public class TestTezTask {
     path = mock(Path.class);
     when(path.getFileSystem(any(Configuration.class))).thenReturn(fs);
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
-    when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class),
-        any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() {
+    when(
+        utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
+            any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
+            anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer(
+        new Answer<Vertex>() {
 
       @Override
       public Vertex answer(InvocationOnMock invocation) throws Throwable {
@@ -101,8 +107,8 @@ public class TestTezTask {
       }
     });
 
-    when(utils.createEdge(any(JobConf.class), any(Vertex.class),
-        any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
+    when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class),
+        any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer<Edge>() {
 
       @Override
       public Edge answer(InvocationOnMock invocation) throws Throwable {
@@ -204,10 +210,11 @@ public class TestTezTask {
 
   @Test
   public void testSubmit() throws Exception {
    DAG dag = DAG.create("test");
-    task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
+    task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
+        new String[0], Collections.<String,LocalResource> emptyMap());
     // validate close/reopen
-    verify(sessionState, times(1)).open(any(HiveConf.class));
-    verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
+    verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class));
+    verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043
     verify(session, times(2)).submitDAG(any(DAG.class));
   }
 
@@ -216,4 +223,54 @@ public class TestTezTask {
     task.close(work, 0);
     verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
   }
+
+  @Test
+  public void testExistingSessionGetsStorageHandlerResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
+    verify(session).addAppMasterLocalFiles(resMap);
+  }
+
+  @Test
+  public void testExtraResourcesAddedToDag() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+    DAG dag = mock(DAG.class);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
+    verify(dag).addTaskLocalFiles(resMap);
+  }
+
+  @Test
+  public void testGetExtraLocalResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+
+    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+  }
 }

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Tue Oct 14 19:06:45 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -601,6 +602,30 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testCountReduce() throws HiveException {
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{}),
+        0L);
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{0L}),
+        0L);
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{0L,0L}),
+        0L);
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{0L,1L,0L}),
+        1L);
+    testAggregateCountReduce(
+        2,
+        Arrays.asList(new Long[]{13L,0L,7L,19L}),
+        39L);
+  }
+
+  @Test
   public void testCountDecimal() throws HiveException {
     testAggregateDecimal(
         "Decimal",
@@ -1210,7 +1235,7 @@ public class TestVectorGroupByOperator {
         "count",
         2,
         Arrays.asList(new Long[]{}),
-        null);
+        0L);
   }
 
   @Test
@@ -2027,6 +2052,17 @@ public class TestVectorGroupByOperator {
     testAggregateCountStarIterable (fdr, expected);
   }
 
+  public void testAggregateCountReduce (
+      int batchSize,
+      Iterable<Long> values,
+      Object expected) throws HiveException {
+
+    @SuppressWarnings("unchecked")
+    FakeVectorRowBatchFromLongIterables fdr = new FakeVectorRowBatchFromLongIterables(batchSize,
+        values);
+    testAggregateCountReduceIterable (fdr, expected);
+  }
+
   public static interface Validator {
 
     void validate (String key, Object expected, Object result);
@@ -2223,6 +2259,37 @@ public class TestVectorGroupByOperator {
     validator.validate("_total", expected, result);
   }
 
+  public void testAggregateCountReduceIterable (
+      Iterable<VectorizedRowBatch> data,
+      Object expected) throws HiveException {
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("A", 0);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+    GroupByDesc desc = buildGroupByDescType(ctx, "count", "A", TypeInfoFactory.longTypeInfo);
+    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+    vectorDesc.setIsReduce(true);
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+    FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+
+    for (VectorizedRowBatch unit: data) {
+      vgo.processOp(unit, 0);
+    }
+    vgo.close(false);
+
+    List<Object> outBatchList = out.getCapturedRows();
+    assertNotNull(outBatchList);
+    assertEquals(1, outBatchList.size());
+
+    Object result = outBatchList.get(0);
+
+    Validator validator = getValidator("count");
+    validator.validate("_total", expected, result);
+  }
+
   public void testAggregateStringIterable (
       String aggregateName,
       Iterable<VectorizedRowBatch> data,

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/StorageFormats.java Tue Oct 14 19:06:45 2014
@@ -90,7 +90,9 @@ public class StorageFormats {
    * includes both native Hive storage formats as well as those enumerated in the
    * ADDITIONAL_STORAGE_FORMATS table.
    *
-   * @return List of storage format as paramters.
+   * @return List of storage format as a Collection of Object arrays, each containing (in order):
+   * Storage format name, SerDe class name, InputFormat class name, OutputFormat class name.
+   * This list is used as the parameters to JUnit parameterized tests.
    */
   public static Collection<Object[]> asParameters() {
     List<Object[]> parameters = new ArrayList<Object[]>();
@@ -130,5 +132,21 @@ public class StorageFormats {
 
     return parameters;
   }
+
+  /**
+   * Returns a list of the names of storage formats.
+   *
+   * @return List of names of storage formats.
+   */
+  public static Collection<Object[]> names() {
+    List<Object[]> names = new ArrayList<Object[]>();
+    for (StorageFormatDescriptor descriptor : ServiceLoader.load(StorageFormatDescriptor.class)) {
+      String[] formatNames = new String[descriptor.getNames().size()];
+      formatNames = descriptor.getNames().toArray(formatNames);
+      String[] params = { formatNames[0] };
+      names.add(params);
+    }
+    return names;
+  }
 }

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Tue Oct 14 19:06:45 2014
@@ -217,11 +217,11 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+    // The two original buckets won't be in the obsolete list because we don't look at those
+    // until we have determined there is no base.
     List<FileStatus> obsolete = dir.getObsolete();
-    assertEquals(3, obsolete.size());
+    assertEquals(1, obsolete.size());
     assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
 

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Tue Oct 14 19:06:45 2014
@@ -115,7 +115,8 @@ public class TestHiveBinarySearchRecordR
   }
 
   private void resetIOContext() {
-    ioContext = IOContext.get();
+    conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
+    ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME));
     ioContext.setUseSorted(false);
     ioContext.setIsBinarySearching(false);
     ioContext.setEndBinarySearch(false);
@@ -124,6 +125,7 @@
   private void init() throws IOException {
+    conf = new JobConf();
     resetIOContext();
     rcfReader = mock(RCFileRecordReader.class);
     when(rcfReader.next((LongWritable)anyObject(),
@@ -131,7 +133,6 @@
     // Since the start is 0, and the length is 100, the first call to sync should be with the value
     // 50 so return that for getPos()
     when(rcfReader.getPos()).thenReturn(50L);
-    conf = new JobConf();
     conf.setBoolean("hive.input.format.sorted", true);
 
     TableDesc tblDesc = Utilities.defaultTd;

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Tue Oct 14 19:06:45 2014
@@ -165,7 +165,7 @@ public class TestSymlinkTextInputFormat
           + " failed with exit code= " + ecode);
     }
 
-    String cmd = "select key from " + tblName;
+    String cmd = "select key*1 from " + tblName;
     drv.compile(cmd);
 
     //create scratch dir

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Tue Oct 14 19:06:45 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.io.sarg
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -118,7 +119,6 @@ public class TestInputOutputFormat {
     TimeZone gmt = TimeZone.getTimeZone("GMT+0");
     DATE_FORMAT.setTimeZone(gmt);
     TIME_FORMAT.setTimeZone(gmt);
-    TimeZone local = TimeZone.getDefault();
   }
 
  public static class BigRow implements Writable {
@@ -560,6 +560,12 @@ public class TestInputOutputFormat {
       this.file = file;
     }
 
+    /**
+     * Set the blocks and their location for the file.
+     * Must be called after the stream is closed or the block length will be
+     * wrong.
+     * @param blocks the list of blocks
+     */
     public void setBlocks(MockBlock... blocks) {
       file.blocks = blocks;
       int offset = 0;
@@ -580,12 +586,18 @@ public class TestInputOutputFormat {
       file.content = new byte[file.length];
       System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
     }
+
+    @Override
+    public String toString() {
+      return "Out stream to " + file.toString();
+    }
   }
 
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
 
+    @SuppressWarnings("unused")
     public MockFileSystem() {
       // empty
     }
@@ -620,7 +632,7 @@ public class TestInputOutputFormat {
         return new FSDataInputStream(new MockInputStream(file));
       }
     }
-    return null;
+    throw new IOException("File not found: " + path);
   }
 
   @Override
@@ -743,8 +755,12 @@ public class TestInputOutputFormat {
     for(MockBlock block: file.blocks) {
       if (OrcInputFormat.SplitGenerator.getOverlap(block.offset, block.length,
          start, len) > 0) {
+        String[] topology = new String[block.hosts.length];
+        for(int i=0; i < topology.length; ++i) {
+          topology[i] = "/rack/ " + block.hosts[i];
+        }
         result.add(new BlockLocation(block.hosts, block.hosts,
-            block.offset, block.length));
+            topology, block.offset, block.length));
       }
     }
     return result.toArray(new BlockLocation[result.size()]);
@@ -1209,7 +1225,8 @@ public class TestInputOutputFormat {
                                          Path warehouseDir,
                                          String tableName,
                                          ObjectInspector objectInspector,
-                                         boolean isVectorized
+                                         boolean isVectorized,
+                                         int partitions
                                          ) throws IOException {
     Utilities.clearWorkMap();
    JobConf conf = new JobConf();
@@ -1218,9 +1235,20 @@ public class TestInputOutputFormat {
     conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
     conf.set("fs.mock.impl", MockFileSystem.class.getName());
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
-    Path root = new Path(warehouseDir, tableName + "/p=0");
+    Path root = new Path(warehouseDir, tableName);
+    // clean out previous contents
     ((MockFileSystem) root.getFileSystem(conf)).clear();
-    conf.set("mapred.input.dir", root.toString());
+    // build partition strings
+    String[] partPath = new String[partitions];
+    StringBuilder buffer = new StringBuilder();
+    for(int p=0; p < partitions; ++p) {
+      partPath[p] = new Path(root, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+    conf.set("mapred.input.dir", buffer.toString());
     StringBuilder columnIds = new StringBuilder();
     StringBuilder columnNames = new StringBuilder();
     StringBuilder columnTypes = new StringBuilder();
@@ -1239,6 +1267,8 @@ public class TestInputOutputFormat {
     }
     conf.set("hive.io.file.readcolumn.ids", columnIds.toString());
     conf.set("partition_columns", "p");
+    conf.set(serdeConstants.LIST_COLUMNS, columnNames.toString());
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
     MockFileSystem fs = (MockFileSystem) warehouseDir.getFileSystem(conf);
     fs.clear();
 
@@ -1249,9 +1279,6 @@ public class TestInputOutputFormat {
     tblProps.put("columns.types", columnTypes.toString());
     TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
        tblProps);
-    LinkedHashMap<String, String> partSpec =
-        new LinkedHashMap<String, String>();
-    PartitionDesc part = new PartitionDesc(tbl, partSpec);
 
     MapWork mapWork = new MapWork();
     mapWork.setVectorMode(isVectorized);
@@ -1260,11 +1287,16 @@ public class TestInputOutputFormat {
         new LinkedHashMap<String, ArrayList<String>>();
     ArrayList<String> aliases = new ArrayList<String>();
     aliases.add(tableName);
-    aliasMap.put(root.toString(), aliases);
-    mapWork.setPathToAliases(aliasMap);
     LinkedHashMap<String, PartitionDesc> partMap =
         new LinkedHashMap<String, PartitionDesc>();
-    partMap.put(root.toString(), part);
+    for(int p=0; p < partitions; ++p) {
+      aliasMap.put(partPath[p], aliases);
+      LinkedHashMap<String, String> partSpec =
+          new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tbl, partSpec);
+      partMap.put(partPath[p], part);
+    }
+    mapWork.setPathToAliases(aliasMap);
     mapWork.setPathToPartitionInfo(partMap);
     mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
     mapWork.setScratchColumnVectorTypes(new HashMap<String,
@@ -1285,6 +1317,7 @@
    * @throws Exception
    */
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorization() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1294,7 +1327,7 @@
           ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorization", inspector, true);
+        "vectorization", inspector, true, 1);
 
     // write the orc file to the mock file system
     Writer writer =
@@ -1332,6 +1365,7 @@
    * @throws Exception
    */
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorizationWithBuckets() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1341,7 +1375,7 @@
           ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorBuckets", inspector, true);
+        "vectorBuckets", inspector, true, 1);
 
     // write the orc file to the mock file system
     Writer writer =
@@ -1377,10 +1411,11 @@
 
   // test acid with vectorization, no combine
   @Test
+  @SuppressWarnings("unchecked")
   public void testVectorizationWithAcid() throws Exception {
     StructObjectInspector inspector = new BigRowInspector();
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "vectorizationAcid", inspector, true);
+        "vectorizationAcid", inspector, true, 1);
 
     // write the orc file to the mock file system
     Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1444,6 +1479,7 @@
 
   // test non-vectorized, non-acid, combine
   @Test
+  @SuppressWarnings("unchecked")
   public void testCombinationInputFormat() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1453,7 +1489,7 @@
           ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "combination", inspector, false);
+        "combination", inspector, false, 1);
 
     // write the orc file to the mock file system
     Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1516,17 +1552,25 @@
   public void testCombinationInputFormatWithAcid() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
+    final int PARTITIONS = 2;
+    final int BUCKETS = 3;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
          ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
              ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
-        "combinationAcid", inspector, false);
+        "combinationAcid", inspector, false, PARTITIONS);
 
     // write the orc file to the mock file system
-    Path partDir = new Path(conf.get("mapred.input.dir"));
-    OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
+    Path[] partDir = new Path[PARTITIONS];
+    String[] paths = conf.getStrings("mapred.input.dir");
+    for(int p=0; p < PARTITIONS; ++p) {
+      partDir[p] = new Path(paths[p]);
+    }
+
+    // write a base file in partition 0
+    OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
            .writingBase(true).bucket(0).inspector(inspector));
     for(int i=0; i < 10; ++i) {
@@ -1534,31 +1578,68 @@
     }
     WriterImpl baseWriter = (WriterImpl) writer.getWriter();
     writer.close(false);
+
     MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
-    int length0 = outputStream.file.length;
-    writer = new OrcRecordUpdater(partDir,
+    outputStream.setBlocks(new MockBlock("host1", "host2"));
+
+    // write a delta file in partition 0
+    writer = new OrcRecordUpdater(partDir[0],
         new AcidOutputFormat.Options(conf).maximumTransactionId(10)
            .writingBase(true).bucket(1).inspector(inspector));
     for(int i=10; i < 20; ++i) {
       writer.insert(10, new MyRow(i, 2*i));
     }
-    baseWriter = (WriterImpl) writer.getWriter();
+    WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
+    outputStream = (MockOutputStream) deltaWriter.getStream();
     writer.close(false);
-    outputStream = (MockOutputStream) baseWriter.getStream();
     outputStream.setBlocks(new MockBlock("host1", "host2"));
 
+    // write three files in partition 1
+    for(int bucket=0; bucket < BUCKETS; ++bucket) {
+      Writer orc = OrcFile.createWriter(
+          new Path(partDir[1], "00000" + bucket + "_0"),
+          OrcFile.writerOptions(conf)
+              .blockPadding(false)
+              .bufferSize(1024)
+              .inspector(inspector));
+      orc.addRow(new MyRow(1, 2));
+      outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+      orc.close();
+      outputStream.setBlocks(new MockBlock("host3", "host4"));
+    }
+
     // call getsplits
+    conf.setInt(hive_metastoreConstants.BUCKET_COUNT, BUCKETS);
     HiveInputFormat<?,?> inputFormat =
         new CombineHiveInputFormat<WritableComparable, Writable>();
-    try {
-      InputSplit[] splits = inputFormat.getSplits(conf, 1);
-      assertTrue("shouldn't reach here", false);
-    } catch (IOException ioe) {
-      assertEquals("CombineHiveInputFormat is incompatible"
-          + " with ACID tables. Please set hive.input.format=org.apache.hadoop"
-          + ".hive.ql.io.HiveInputFormat",
-          ioe.getMessage());
+    InputSplit[] splits = inputFormat.getSplits(conf, 1);
+    assertEquals(3, splits.length);
+    HiveInputFormat.HiveInputSplit split =
+        (HiveInputFormat.HiveInputSplit) splits[0];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(580, split.getLength());
+    split = (HiveInputFormat.HiveInputSplit) splits[1];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(601, split.getLength());
+    CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
+        (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
+    assertEquals(BUCKETS, combineSplit.getNumPaths());
+    for(int bucket=0; bucket < BUCKETS; ++bucket) {
+      assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
+          combineSplit.getPath(bucket).toString());
+      assertEquals(0, combineSplit.getOffset(bucket));
+      assertEquals(225, combineSplit.getLength(bucket));
     }
+    String[] hosts = combineSplit.getLocations();
+    assertEquals(2, hosts.length);
   }
 
   @Test
