Modified: hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original) +++ hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Wed Sep 10 21:41:16 2014 @@ -17,30 +17,38 @@ */ package org.apache.hadoop.hive.metastore.txn; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.shims.ShimLoader; - import java.sql.Connection; import java.sql.Driver; +import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.ShimLoader; + /** - * Utility methods for creating and destroying txn database/schema. Placed - * here in a separate class so it can be shared across unit tests. + * Utility methods for creating and destroying txn database/schema. + * Placed here in a separate class so it can be shared across unit tests. */ -public class TxnDbUtil { - private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; +public final class TxnDbUtil { + + private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + + private TxnDbUtil() { + throw new UnsupportedOperationException("Can't initialize class"); + } /** * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true, * and the JDBC configs will be set for putting the transaction and lock info in the embedded * metastore. - * @param conf HiveConf to add these values to. + * + * @param conf HiveConf to add these values to */ public static void setConfValues(HiveConf conf) { - conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr); + conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); } @@ -49,187 +57,193 @@ public class TxnDbUtil { // intended for creating derby databases, and thus will inexorably get // out of date with it. I'm open to any suggestions on how to make this // read the file in a build friendly way. 
+ Connection conn = null; - boolean committed = false; + Statement stmt = null; try { conn = getConnection(); - Statement s = conn.createStatement(); - s.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + - " TXN_STATE char(1) NOT NULL," + - " TXN_STARTED bigint NOT NULL," + - " TXN_LAST_HEARTBEAT bigint NOT NULL," + - " TXN_USER varchar(128) NOT NULL," + - " TXN_HOST varchar(128) NOT NULL)"); - - s.execute("CREATE TABLE TXN_COMPONENTS (" + - " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + - " TC_DATABASE varchar(128) NOT NULL," + - " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767))"); - s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + - " CTC_TXNID bigint," + - " CTC_DATABASE varchar(128) NOT NULL," + - " CTC_TABLE varchar(128)," + - " CTC_PARTITION varchar(767))"); - s.execute("CREATE TABLE NEXT_TXN_ID (" + - " NTXN_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); - s.execute("CREATE TABLE HIVE_LOCKS (" + - " HL_LOCK_EXT_ID bigint NOT NULL," + - " HL_LOCK_INT_ID bigint NOT NULL," + - " HL_TXNID bigint," + - " HL_DB varchar(128) NOT NULL," + - " HL_TABLE varchar(128)," + - " HL_PARTITION varchar(767)," + - " HL_LOCK_STATE char(1) NOT NULL," + - " HL_LOCK_TYPE char(1) NOT NULL," + - " HL_LAST_HEARTBEAT bigint NOT NULL," + - " HL_ACQUIRED_AT bigint," + - " HL_USER varchar(128) NOT NULL," + - " HL_HOST varchar(128) NOT NULL," + - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); - s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); - - s.execute("CREATE TABLE NEXT_LOCK_ID (" + - " NL_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); - - s.execute("CREATE TABLE COMPACTION_QUEUE (" + - " CQ_ID bigint PRIMARY KEY," + - " CQ_DATABASE varchar(128) NOT NULL," + - " CQ_TABLE varchar(128) NOT NULL," + - " CQ_PARTITION varchar(767)," + - " CQ_STATE char(1) NOT NULL," + - " CQ_TYPE char(1) NOT NULL," + - " CQ_WORKER_ID varchar(128)," + - " CQ_START bigint," + - " CQ_RUN_AS varchar(128))"); + stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TXNS (" + + " TXN_ID bigint PRIMARY KEY," + + " TXN_STATE char(1) NOT NULL," + + " TXN_STARTED bigint NOT NULL," + + " TXN_LAST_HEARTBEAT bigint NOT NULL," + + " TXN_USER varchar(128) NOT NULL," + + " TXN_HOST varchar(128) NOT NULL)"); + + stmt.execute("CREATE TABLE TXN_COMPONENTS (" + + " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + + " TC_DATABASE varchar(128) NOT NULL," + + " TC_TABLE varchar(128)," + + " TC_PARTITION varchar(767))"); + stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + + " CTC_TXNID bigint," + + " CTC_DATABASE varchar(128) NOT NULL," + + " CTC_TABLE varchar(128)," + + " CTC_PARTITION varchar(767))"); + stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); + stmt.execute("CREATE TABLE HIVE_LOCKS (" + + " HL_LOCK_EXT_ID bigint NOT NULL," + + " HL_LOCK_INT_ID bigint NOT NULL," + + " HL_TXNID bigint," + + " HL_DB varchar(128) NOT NULL," + + " HL_TABLE varchar(128)," + + " HL_PARTITION varchar(767)," + + " HL_LOCK_STATE char(1) NOT NULL," + + " HL_LOCK_TYPE char(1) NOT NULL," + + " HL_LAST_HEARTBEAT bigint NOT NULL," + + " HL_ACQUIRED_AT bigint," + + " HL_USER varchar(128) NOT NULL," + + " HL_HOST varchar(128) NOT NULL," + + " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); + stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); + + stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_LOCK_ID 
VALUES(1)"); + + stmt.execute("CREATE TABLE COMPACTION_QUEUE (" + + " CQ_ID bigint PRIMARY KEY," + + " CQ_DATABASE varchar(128) NOT NULL," + + " CQ_TABLE varchar(128) NOT NULL," + + " CQ_PARTITION varchar(767)," + + " CQ_STATE char(1) NOT NULL," + + " CQ_TYPE char(1) NOT NULL," + + " CQ_WORKER_ID varchar(128)," + + " CQ_START bigint," + + " CQ_RUN_AS varchar(128))"); - s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); - s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); + stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); conn.commit(); - committed = true; } finally { - if (!committed) conn.rollback(); - conn.close(); + closeResources(conn, stmt, null); } } - public static void cleanDb() throws Exception { + public static void cleanDb() throws Exception { Connection conn = null; - boolean committed = false; + Statement stmt = null; try { conn = getConnection(); - Statement s = conn.createStatement(); + stmt = conn.createStatement(); + // We want to try these, whether they succeed or fail. try { - s.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (Exception e) { - System.err.println("Unable to drop index HL_TXNID_INDEX " + - e.getMessage()); - } - try { - s.execute("DROP TABLE TXN_COMPONENTS"); - } catch (Exception e) { - System.err.println("Unable to drop table TXN_COMPONENTS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS"); - } catch (Exception e) { - System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE TXNS"); - } catch (Exception e) { - System.err.println("Unable to drop table TXNS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE NEXT_TXN_ID"); - } catch (Exception e) { - System.err.println("Unable to drop table NEXT_TXN_ID " + - e.getMessage()); - } - try { - s.execute("DROP TABLE HIVE_LOCKS"); - } catch (Exception e) { - System.err.println("Unable to drop table HIVE_LOCKS " + - e.getMessage()); - } - try { - s.execute("DROP TABLE NEXT_LOCK_ID"); - } catch (Exception e) { - } - try { - s.execute("DROP TABLE COMPACTION_QUEUE"); - } catch (Exception e) { - } - try { - s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID"); + stmt.execute("DROP INDEX HL_TXNID_INDEX"); } catch (Exception e) { + System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage()); } + + dropTable(stmt, "TXN_COMPONENTS"); + dropTable(stmt, "COMPLETED_TXN_COMPONENTS"); + dropTable(stmt, "TXNS"); + dropTable(stmt, "NEXT_TXN_ID"); + dropTable(stmt, "HIVE_LOCKS"); + dropTable(stmt, "NEXT_LOCK_ID"); + dropTable(stmt, "COMPACTION_QUEUE"); + dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); + conn.commit(); - committed = true; } finally { - if (!committed) conn.rollback(); - conn.close(); + closeResources(conn, stmt, null); + } + } + + private static void dropTable(Statement stmt, String name) { + try { + stmt.execute("DROP TABLE " + name); + } catch (Exception e) { + System.err.println("Unable to drop table " + name + ": " + e.getMessage()); } } /** * A tool to count the number of partitions, tables, * and databases locked by a particular lockId. 
+ * * @param lockId lock id to look for lock components + * * @return number of components, or 0 if there is no lock */ - public static int countLockComponents(long lockId) throws Exception { - Connection conn = getConnection(); + public static int countLockComponents(long lockId) throws Exception { + Connection conn = null; + PreparedStatement stmt = null; + ResultSet rs = null; try { - Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " + - lockId); - if (!rs.next()) return 0; - int rc = rs.getInt(1); - return rc; + conn = getConnection(); + stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?"); + stmt.setLong(1, lockId); + rs = stmt.executeQuery(); + if (!rs.next()) { + return 0; + } + return rs.getInt(1); } finally { - conn.rollback(); - conn.close(); + closeResources(conn, stmt, rs); } } public static int findNumCurrentLocks() throws Exception { Connection conn = null; + Statement stmt = null; + ResultSet rs = null; try { conn = getConnection(); - Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks"); - if (!rs.next()) return 0; - int rc = rs.getInt(1); - return rc; - } finally { - if (conn != null) { - conn.rollback(); - conn.close(); + stmt = conn.createStatement(); + rs = stmt.executeQuery("select count(*) from hive_locks"); + if (!rs.next()) { + return 0; } + return rs.getInt(1); + } finally { + closeResources(conn, stmt, rs); } } private static Connection getConnection() throws Exception { HiveConf conf = new HiveConf(); String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER); - Driver driver = (Driver)Class.forName(jdbcDriver).newInstance(); + Driver driver = (Driver) Class.forName(jdbcDriver).newInstance(); Properties prop = new Properties(); String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY); String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME); - String passwd = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.METASTOREPWD.varname); - prop.put("user", user); - prop.put("password", passwd); + String passwd = + ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname); + prop.setProperty("user", user); + prop.setProperty("password", passwd); return driver.connect(driverUrl, prop); } + private static void closeResources(Connection conn, Statement stmt, ResultSet rs) { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + System.err.println("Error closing ResultSet: " + e.getMessage()); + } + } + + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + System.err.println("Error closing Statement: " + e.getMessage()); + } + } + + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + System.err.println("Error rolling back: " + e.getMessage()); + } + try { + conn.close(); + } catch (SQLException e) { + System.err.println("Error closing Connection: " + e.getMessage()); + } + } + } }
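The TxnDbUtil refactor above converges on one JDBC hygiene pattern: countLockComponents() now binds the lock id through a PreparedStatement rather than concatenating it into the SQL string, the repeated DROP TABLE boilerplate collapses into dropTable(), and closeResources() closes the ResultSet, Statement, and Connection in reverse order of acquisition while swallowing close-time SQLExceptions so they cannot mask the original failure. A minimal standalone sketch of the same pattern — the JDBC URL and class name here are illustrative, not taken from the patch (TxnDbUtil reads the real URL from HiveConf.ConfVars.METASTORECONNECTURLKEY):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class LockCountSketch {
      // Illustrative embedded-Derby URL, not the configured metastore URL.
      private static final String URL = "jdbc:derby:;databaseName=metastore_db;create=true";

      public static int countLockComponents(long lockId) throws SQLException {
        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
          conn = DriverManager.getConnection(URL);
          // Parameter binding: the id is never concatenated into the SQL text.
          stmt = conn.prepareStatement(
              "SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?");
          stmt.setLong(1, lockId);
          rs = stmt.executeQuery();
          return rs.next() ? rs.getInt(1) : 0;
        } finally {
          // Close in reverse order of acquisition; ignore close-time failures
          // so they cannot hide an exception already propagating from the try block.
          if (rs != null)   { try { rs.close(); }   catch (SQLException ignored) { } }
          if (stmt != null) { try { stmt.close(); } catch (SQLException ignored) { } }
          if (conn != null) { try { conn.close(); } catch (SQLException ignored) { } }
        }
      }
    }

The patch additionally rolls the connection back before closing it, which keeps a non-autocommit connection from leaving a transaction open even on the read-only query paths.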
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Wed Sep 10 21:41:16 2014 @@ -185,7 +185,6 @@ public class QueryProperties { return this.filterWithSubQuery; } - public void clear() { hasJoin = false; hasGroupBy = false; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Sep 10 21:41:16 2014 @@ -1351,9 +1351,9 @@ public class DDLTask extends Task<DDLWor if(harPartitionDir.getUserInfo() != null) { authority.append(harPartitionDir.getUserInfo()).append("@"); } - authority.append(harPartitionDir.getHost()).append(":"); + authority.append(harPartitionDir.getHost()); if(harPartitionDir.getPort() != -1) { - authority.append(harPartitionDir.getPort()); + authority.append(":").append(harPartitionDir.getPort()); } Path harPath = new Path(harPartitionDir.getScheme(), authority.toString(), Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed Sep 10 21:41:16 2014 @@ -557,7 +557,7 @@ public final class FunctionRegistry { try { FunctionTask.addFunctionResources(func.getResourceUris()); } catch (Exception e) { - LOG.error("Unable to load resources for " + dbName + "." + fName + ":" + e); + LOG.error("Unable to load resources for " + dbName + "." 
+ fName + ":" + e.getMessage(), e); return null; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Sep 10 21:41:16 2014 @@ -92,6 +92,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; @@ -160,6 +161,7 @@ import org.apache.hadoop.hive.serde2.Ser import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; @@ -3423,6 +3425,41 @@ public final class Utilities { } } + public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath, + FsPermission fsPermission, boolean recursive) throws IOException { + String origUmask = null; + LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive " + + recursive); + if (recursive) { + origUmask = conf.get(FsPermission.UMASK_LABEL); + // this umask is required because by default the hdfs mask is 022 resulting in + // all parents getting the fsPermission & !(022) permission instead of fsPermission + conf.set(FsPermission.UMASK_LABEL, "000"); + } + FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf); + boolean retval = false; + try { + retval = fs.mkdirs(mkdirPath, fsPermission); + resetUmaskInConf(conf, recursive, origUmask); + } catch (IOException ioe) { + resetUmaskInConf(conf, recursive, origUmask); + throw ioe; + } finally { + IOUtils.closeStream(fs); + } + return retval; + } + + private static void resetUmaskInConf(Configuration conf, boolean unsetUmask, String origUmask) { + if (unsetUmask) { + if (origUmask != null) { + conf.set(FsPermission.UMASK_LABEL, origUmask); + } else { + conf.unset(FsPermission.UMASK_LABEL); + } + } + } + /** * Returns true if a plan is both configured for vectorized execution * and vectorization is allowed. 
The plan may be configured for vectorization Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java Wed Sep 10 21:41:16 2014 @@ -19,16 +19,20 @@ package org.apache.hadoop.hive.ql.exec.vector; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; class AggregateDefinition { + private String name; private VectorExpressionDescriptor.ArgumentType type; + private GroupByDesc.Mode mode; private Class<? extends VectorAggregateExpression> aggClass; AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type, - Class<? extends VectorAggregateExpression> aggClass) { + GroupByDesc.Mode mode, Class<? extends VectorAggregateExpression> aggClass) { this.name = name; this.type = type; + this.mode = mode; this.aggClass = aggClass; } @@ -38,6 +42,9 @@ class AggregateDefinition { VectorExpressionDescriptor.ArgumentType getType() { return type; } + GroupByDesc.Mode getMode() { + return mode; + } Class<? extends VectorAggregateExpression> getAggClass() { return aggClass; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Sep 10 21:41:16 2014 @@ -32,8 +32,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; @@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.plan.ap import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; /** * Vectorized GROUP BY operator implementation. 
Consumes the vectorized input and * stores the aggregate operators' intermediate states. Emits row mode output. * */ -public class VectorGroupByOperator extends GroupByOperator { +public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion { private static final Log LOG = LogFactory.getLog( VectorGroupByOperator.class.getName()); @@ -70,6 +77,17 @@ public class VectorGroupByOperator exten */ private VectorExpression[] keyExpressions; + private boolean isVectorOutput; + + // Create a new outgoing vectorization context because column name map will change. + private VectorizationContext vOutContext = null; + + private String fileKey; + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + private transient VectorExpressionWriter[] keyOutputWriters; /** @@ -85,11 +103,18 @@ public class VectorGroupByOperator exten private transient Object[] forwardCache; + private transient VectorizedRowBatch outputBatch; + private transient VectorizedRowBatchCtx vrbCtx; + + private transient VectorColumnAssign[] vectorColumnAssign; + /** - * Interface for processing mode: global, hash or streaming + * Interface for processing mode: global, hash, unsorted streaming, or group batch */ private static interface IProcessingMode { public void initialize(Configuration hconf) throws HiveException; + public void startGroup() throws HiveException; + public void endGroup() throws HiveException; public void processBatch(VectorizedRowBatch batch) throws HiveException; public void close(boolean aborted) throws HiveException; } @@ -98,6 +123,15 @@ public class VectorGroupByOperator exten * Base class for all processing modes */ private abstract class ProcessingModeBase implements IProcessingMode { + + // Overridden and used in sorted reduce group batch processing mode. + public void startGroup() throws HiveException { + // Do nothing. + } + public void endGroup() throws HiveException { + // Do nothing. + } + /** * Evaluates the aggregators on the current batch. * The aggregationBatchInfo must have been prepared @@ -170,7 +204,7 @@ public class VectorGroupByOperator exten @Override public void close(boolean aborted) throws HiveException { if (!aborted) { - flushSingleRow(null, aggregationBuffers); + writeSingleRow(null, aggregationBuffers); } } } @@ -426,7 +460,7 @@ public class VectorGroupByOperator exten while(iter.hasNext()) { Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next(); - flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); + writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); if (!all) { iter.remove(); @@ -501,20 +535,21 @@ public class VectorGroupByOperator exten if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) { flush(true); - changeToStreamingMode(); + changeToUnsortedStreamingMode(); } } } } /** - * Streaming processing mode. Intermediate values are flushed each time key changes. - * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce. + * Unsorted streaming processing mode. Each input VectorizedRowBatch may have + * a mix of different keys (hence unsorted). Intermediate values are flushed + * each time key changes. 
*/ - private class ProcessingModeStreaming extends ProcessingModeBase { + private class ProcessingModeUnsortedStreaming extends ProcessingModeBase { /** - * The aggreagation buffers used in streaming mode + * The aggregation buffers used in streaming mode */ private VectorAggregationBufferRow currentStreamingAggregators; @@ -557,7 +592,7 @@ public class VectorGroupByOperator exten // Nothing to do } }); - LOG.info("using streaming aggregation processing mode"); + LOG.info("using unsorted streaming aggregation processing mode"); } @Override @@ -601,7 +636,7 @@ public class VectorGroupByOperator exten // Now flush/forward all keys/rows, except the last (current) one for (int i = 0; i < flushMark; ++i) { - flushSingleRow(keysToFlush[i], rowsToFlush[i]); + writeSingleRow(keysToFlush[i], rowsToFlush[i]); rowsToFlush[i].reset(); streamAggregationBufferRowPool.putInPool(rowsToFlush[i]); } @@ -610,7 +645,79 @@ public class VectorGroupByOperator exten @Override public void close(boolean aborted) throws HiveException { if (!aborted && null != streamingKey) { - flushSingleRow(streamingKey, currentStreamingAggregators); + writeSingleRow(streamingKey, currentStreamingAggregators); + } + } + } + + /** + * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the + * same key. On endGroup (or close), the intermediate values are flushed. + */ + private class ProcessingModeGroupBatches extends ProcessingModeBase { + + private boolean inGroup; + private boolean first; + + /** + * The group vector key helper. + */ + VectorGroupKeyHelper groupKeyHelper; + + /** + * The group vector aggregation buffers. + */ + private VectorAggregationBufferRow groupAggregators; + + /** + * Buffer to hold string values. + */ + private DataOutputBuffer buffer; + + @Override + public void initialize(Configuration hconf) throws HiveException { + inGroup = false; + groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length); + groupKeyHelper.init(keyExpressions); + groupAggregators = allocateAggregationBuffer(); + buffer = new DataOutputBuffer(); + LOG.info("using sorted group batch aggregation processing mode"); + } + + @Override + public void startGroup() throws HiveException { + inGroup = true; + first = true; + } + + @Override + public void endGroup() throws HiveException { + if (inGroup && !first) { + writeGroupRow(groupAggregators, buffer); + groupAggregators.reset(); + } + inGroup = false; + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + assert(inGroup); + if (first) { + // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group. + first = false; + groupKeyHelper.copyGroupKey(batch, outputBatch, buffer); + } + + // Aggregate this batch. 
+ for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch); + } + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted && inGroup && !first) { + writeGroupRow(groupAggregators, buffer); } } } @@ -633,8 +740,20 @@ public class VectorGroupByOperator exten aggregators = new VectorAggregateExpression[aggrDesc.size()]; for (int i = 0; i < aggrDesc.size(); ++i) { AggregationDesc aggDesc = aggrDesc.get(i); - aggregators[i] = vContext.getAggregatorExpression(aggDesc); + aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce()); } + + isVectorOutput = desc.getVectorDesc().isVectorOutput(); + + List<String> outColNames = desc.getOutputColumnNames(); + Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size()); + int outColIndex = 0; + for(String outCol: outColNames) { + mapOutCols.put(outCol, outColIndex++); + } + vOutContext = new VectorizationContext(mapOutCols, outColIndex); + vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_"); + fileKey = vOutContext.getFileKey(); } public VectorGroupByOperator() { @@ -662,13 +781,23 @@ public class VectorGroupByOperator exten objectInspectors.add(aggregators[i].getOutputObjectInspector()); } - keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - aggregationBatchInfo = new VectorAggregationBufferBatch(); - aggregationBatchInfo.compileAggregationBatchInfo(aggregators); - + if (!conf.getVectorDesc().isVectorGroupBatches()) { + // These data structures are only used by the map-side processing modes. + keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); + aggregationBatchInfo = new VectorAggregationBufferBatch(); + aggregationBatchInfo.compileAggregationBatchInfo(aggregators); + } + LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput); List<String> outputFieldNames = conf.getOutputColumnNames(); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); + if (isVectorOutput) { + vrbCtx = new VectorizedRowBatchCtx(); + vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector); + outputBatch = vrbCtx.createVectorizedRowBatch(); + vectorColumnAssign = VectorColumnAssignFactory.buildAssigners( + outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames()); + } } catch (HiveException he) { throw he; @@ -678,32 +807,43 @@ public class VectorGroupByOperator exten initializeChildren(hconf); - forwardCache =new Object[keyExpressions.length + aggregators.length]; + forwardCache = new Object[keyExpressions.length + aggregators.length]; if (keyExpressions.length == 0) { - processingMode = this.new ProcessingModeGlobalAggregate(); - } - else { - //TODO: consider if parent can offer order guarantees - // If input is sorted, is more efficient to use the streaming mode + processingMode = this.new ProcessingModeGlobalAggregate(); + } else if (conf.getVectorDesc().isVectorGroupBatches()) { + // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce). + processingMode = this.new ProcessingModeGroupBatches(); + } else { + // We start in hash mode and may dynamically switch to unsorted stream mode. 
processingMode = this.new ProcessingModeHashAggregate(); } processingMode.initialize(hconf); } /** - * changes the processing mode to streaming + * changes the processing mode to unsorted streaming * This is done at the request of the hash agg mode, if the number of keys * exceeds the minReductionHashAggr factor * @throws HiveException */ - private void changeToStreamingMode() throws HiveException { - processingMode = this.new ProcessingModeStreaming(); + private void changeToUnsortedStreamingMode() throws HiveException { + processingMode = this.new ProcessingModeUnsortedStreaming(); processingMode.initialize(null); LOG.trace("switched to streaming mode"); } @Override + public void startGroup() throws HiveException { + processingMode.startGroup(); + } + + @Override + public void endGroup() throws HiveException { + processingMode.endGroup(); + } + + @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; @@ -719,26 +859,72 @@ public class VectorGroupByOperator exten * @param agg * @throws HiveException */ - private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) + private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) throws HiveException { int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); + if (!isVectorOutput) { + // Output row. + for (int i = 0; i < keyExpressions.length; ++i) { + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); + } + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("forwarding keys: %s: %s", + kw, Arrays.toString(forwardCache))); + } + forward(forwardCache, outputObjInspector); + } else { + // Output keys and aggregates into the output batch. + for (int i = 0; i < keyExpressions.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]), outputBatch.size); + } + for (int i = 0; i < aggregators.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); + } + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + } } + } + + /** + * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and + * the row aggregation buffers values + * @param agg + * @param buffer + * @throws HiveException + */ + private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer) + throws HiveException { + int fi = keyExpressions.length; // Start after group keys. 
for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("forwarding keys: %s: %s", - kw, Arrays.toString(forwardCache))); + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + buffer.reset(); } - forward(forwardCache, outputObjInspector); + } + + private void flushOutput() throws HiveException { + forward(outputBatch, null); + outputBatch.reset(); } @Override public void closeOp(boolean aborted) throws HiveException { processingMode.close(aborted); + if (!aborted && isVectorOutput && outputBatch.size > 0) { + flushOutput(); + } } static public String getOperatorName() { @@ -761,4 +947,8 @@ public class VectorGroupByOperator exten this.aggregators = aggregators; } + @Override + public VectorizationContext getOuputVectorizationContext() { + return vOutContext; + } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Wed Sep 10 21:41:16 2014 @@ -60,6 +60,7 @@ public class VectorHashKeyWrapper extend byteStarts = new int[byteValuesCount]; byteLengths = new int[byteValuesCount]; isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount]; + hashcode = 0; } private VectorHashKeyWrapper() { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java Wed Sep 10 21:41:16 2014 @@ -32,41 +32,10 @@ import org.apache.hadoop.hive.serde2.laz * This class stores additional information about keys needed to evaluate and output the key values. * */ -public class VectorHashKeyWrapperBatch { +public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo { - /** - * Helper class for looking up a key value based on key index. 
- */ - private static class KeyLookupHelper { - private int longIndex; - private int doubleIndex; - private int stringIndex; - private int decimalIndex; - - private static final int INDEX_UNUSED = -1; - - private void resetIndices() { - this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED; - } - public void setLong(int index) { - resetIndices(); - this.longIndex= index; - } - - public void setDouble(int index) { - resetIndices(); - this.doubleIndex = index; - } - - public void setString(int index) { - resetIndices(); - this.stringIndex = index; - } - - public void setDecimal(int index) { - resetIndices(); - this.decimalIndex = index; - } + public VectorHashKeyWrapperBatch(int keyCount) { + super(keyCount); } /** @@ -80,26 +49,6 @@ public class VectorHashKeyWrapperBatch { private VectorExpression[] keyExpressions; /** - * indices of LONG primitive keys. - */ - private int[] longIndices; - - /** - * indices of DOUBLE primitive keys. - */ - private int[] doubleIndices; - - /** - * indices of string (byte[]) primitive keys. - */ - private int[] stringIndices; - - /** - * indices of decimal primitive keys. - */ - private int[] decimalIndices; - - /** * Pre-allocated batch size vector of keys wrappers. * N.B. these keys are **mutable** and should never be used in a HashMap. * Always clone the key wrapper to obtain an immutable keywrapper suitable @@ -108,11 +57,6 @@ public class VectorHashKeyWrapperBatch { private VectorHashKeyWrapper[] vectorHashKeyWrappers; /** - * Lookup vector to map from key index to primitive type index. - */ - private KeyLookupHelper[] indexLookup; - - /** * The fixed size of the key wrappers. */ private int keysFixedSize; @@ -567,53 +511,17 @@ public class VectorHashKeyWrapperBatch { */ public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions) throws HiveException { - VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch(); + VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch(keyExpressions.length); compiledKeyWrapperBatch.keyExpressions = keyExpressions; compiledKeyWrapperBatch.keysFixedSize = 0; - // We'll overallocate and then shrink the array for each type - int[] longIndices = new int[keyExpressions.length]; - int longIndicesIndex = 0; - int[] doubleIndices = new int[keyExpressions.length]; - int doubleIndicesIndex = 0; - int[] stringIndices = new int[keyExpressions.length]; - int stringIndicesIndex = 0; - int[] decimalIndices = new int[keyExpressions.length]; - int decimalIndicesIndex = 0; - KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length]; - // Inspect the output type of each key expression. 
for(int i=0; i < keyExpressions.length; ++i) { - indexLookup[i] = new KeyLookupHelper(); - String outputType = keyExpressions[i].getOutputType(); - if (VectorizationContext.isIntFamily(outputType) || - VectorizationContext.isDatetimeFamily(outputType)) { - longIndices[longIndicesIndex] = i; - indexLookup[i].setLong(longIndicesIndex); - ++longIndicesIndex; - } else if (VectorizationContext.isFloatFamily(outputType)) { - doubleIndices[doubleIndicesIndex] = i; - indexLookup[i].setDouble(doubleIndicesIndex); - ++doubleIndicesIndex; - } else if (VectorizationContext.isStringFamily(outputType)) { - stringIndices[stringIndicesIndex]= i; - indexLookup[i].setString(stringIndicesIndex); - ++stringIndicesIndex; - } else if (VectorizationContext.isDecimalFamily(outputType)) { - decimalIndices[decimalIndicesIndex]= i; - indexLookup[i].setDecimal(decimalIndicesIndex); - ++decimalIndicesIndex; - } - else { - throw new HiveException("Unsuported vector output type: " + outputType); - } - } - compiledKeyWrapperBatch.indexLookup = indexLookup; - compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex); - compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex); - compiledKeyWrapperBatch.stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex); - compiledKeyWrapperBatch.decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex); + compiledKeyWrapperBatch.addKey(keyExpressions[i].getOutputType()); + } + compiledKeyWrapperBatch.finishAdding(); + compiledKeyWrapperBatch.vectorHashKeyWrappers = new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE]; for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) { @@ -632,11 +540,11 @@ public class VectorHashKeyWrapperBatch { model.memoryAlign()); // Now add the key wrapper arrays - compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(longIndicesIndex); - compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(doubleIndicesIndex); - compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(stringIndicesIndex); - compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(decimalIndicesIndex); - compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(longIndicesIndex) * 2; + compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(compiledKeyWrapperBatch.longIndices.length); + compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(compiledKeyWrapperBatch.doubleIndices.length); + compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.stringIndices.length); + compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.decimalIndices.length); + compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(compiledKeyWrapperBatch.longIndices.length) * 2; compiledKeyWrapperBatch.keysFixedSize += model.lengthForBooleanArrayOfSize(keyExpressions.length); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ 
hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed Sep 10 21:41:16 2014 @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.UD import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode; import org.apache.hadoop.hive.ql.exec.vector.expressions.*; +import org.apache.hadoop.hive.ql.exec.vector.AggregateDefinition; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount; @@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.udf.SettableUDF; import org.apache.hadoop.hive.ql.udf.UDFConv; import org.apache.hadoop.hive.ql.udf.UDFHex; @@ -198,7 +200,8 @@ public class VectorizationContext { protected int getInputColumnIndex(String name) { if (!columnMap.containsKey(name)) { - LOG.error(String.format("The column %s is not in the vectorization context column map.", name)); + LOG.error(String.format("The column %s is not in the vectorization context column map %s.", + name, columnMap.toString())); } return columnMap.get(name); } @@ -1880,50 +1883,55 @@ public class VectorizationContext { } } + // TODO: When we support vectorized STRUCTs and can handle more in the reduce-side (MERGEPARTIAL): + // TODO: Write reduce-side versions of AVG. Currently, only map-side (HASH) versions are in table. + // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively + // marked map-side (HASH). 
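The table that follows registers each vectorized aggregate with an optional GroupByDesc.Mode, and getAggregatorExpression() (further below in this file) filters on it: HASH entries are map-side only, MERGEPARTIAL entries are reduce-side only, and null means usable on either side. A hypothetical helper, not part of the patch, stating that selection rule in isolation:

    import org.apache.hadoop.hive.ql.plan.GroupByDesc;

    // Hypothetical helper mirroring the two 'continue' guards that
    // getAggregatorExpression() applies while scanning aggregatesDefinition.
    final class AggregateModeMatch {
      private AggregateModeMatch() { }

      static boolean modeAccepted(GroupByDesc.Mode mode, boolean isReduce) {
        if (mode == GroupByDesc.Mode.HASH && isReduce) {
          return false;   // map-side (hash) implementations are skipped on the reduce side
        }
        if (mode == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) {
          return false;   // reduce-side merge implementations are skipped on the map side
        }
        return true;      // a null mode matches both sides
      }
    }

Under this rule a reduce-side count over ints resolves to VectorUDAFSumLong — merging partial counts is a sum — while the map side keeps VectorUDAFCount.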
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMinLong.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMinDouble.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMinString.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMaxLong.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMaxDouble.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMaxString.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMaxDecimal.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFCount.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFSumLong.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFSumDouble.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFSumDecimal.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFAvgLong.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFAvgDouble.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFAvgDecimal.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarSampLong.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarSampDouble.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarSampDecimal.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, 
VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdSampLong.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdSampDouble.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdSampDecimal.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); + add(new AggregateDefinition("sum", 
VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); + add(new AggregateDefinition("stddev_samp", 
VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); }}; - public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc) + public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce) throws HiveException { ArrayList<ExprNodeDesc> paramDescList = desc.getParameters(); @@ -1948,8 +1956,15 @@ public class VectorizationContext { for (AggregateDefinition aggDef : aggregatesDefinition) { if (aggregateName.equalsIgnoreCase(aggDef.getName()) && ((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE && - inputType == VectorExpressionDescriptor.ArgumentType.NONE) || + inputType == VectorExpressionDescriptor.ArgumentType.NONE) || (aggDef.getType().isSameTypeOrFamily(inputType)))) { + + if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduce) { + continue; + } else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) { + continue; + } + Class<? extends VectorAggregateExpression> aggClass = aggDef.getAggClass(); try { @@ -1967,7 +1982,7 @@ public class VectorizationContext { } throw new HiveException("Vector aggregate not implemented: \"" + aggregateName + - "\" for type: \"" + inputType.name() + ""); + "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")"); } public Map<Integer, String> getOutputColumnTypeMap() { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Sep 10 21:41:16 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; @@ -124,15 +125,20 @@ public class VectorizedRowBatchCtx { * Used by non-tablescan operators when they change the vectorization context * @param hiveConf * @param fileKey - * The key on which to retrieve the extra column mapping from the map scratch + * The key on which to retrieve the extra column mapping from the map/reduce scratch * @param rowOI * Object inspector that shapes the column types */ public void init(Configuration hiveConf, String fileKey, StructObjectInspector rowOI) { - columnTypeMap = Utilities - .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes() - .get(fileKey); + MapredWork mapredWork = Utilities.getMapRedWork(hiveConf); + Map<String, Map<Integer, String>> scratchColumnVectorTypes; + if (mapredWork.getMapWork() != null) { + scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes(); + } else { + scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes(); + } + columnTypeMap = scratchColumnVectorTypes.get(fileKey); this.rowOI= rowOI; this.rawRowOI = rowOI; } Modified: 
hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Sep 10 21:41:16 2014 @@ -2336,6 +2336,12 @@ class RecordReaderImpl implements Record return ((DecimalColumnStatistics) index).getMaximum(); } else if (index instanceof TimestampColumnStatistics) { return ((TimestampColumnStatistics) index).getMaximum(); + } else if (index instanceof BooleanColumnStatistics) { + if (((BooleanColumnStatistics)index).getTrueCount()!=0) { + return "true"; + } else { + return "false"; + } } else { return null; } @@ -2360,6 +2366,12 @@ class RecordReaderImpl implements Record return ((DecimalColumnStatistics) index).getMinimum(); } else if (index instanceof TimestampColumnStatistics) { return ((TimestampColumnStatistics) index).getMinimum(); + } else if (index instanceof BooleanColumnStatistics) { + if (((BooleanColumnStatistics)index).getFalseCount()!=0) { + return "false"; + } else { + return "true"; + } } else { return null; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Wed Sep 10 21:41:16 2014 @@ -331,6 +331,8 @@ final class SearchArgumentImpl implement return PredicateLeaf.Type.TIMESTAMP; case DECIMAL: return PredicateLeaf.Type.DECIMAL; + case BOOLEAN: + return PredicateLeaf.Type.BOOLEAN; default: } } @@ -368,6 +370,7 @@ final class SearchArgumentImpl implement case DATE: case TIMESTAMP: case DECIMAL: + case BOOLEAN: return lit; default: throw new IllegalArgumentException("Unknown literal " + getType(lit)); @@ -963,7 +966,8 @@ final class SearchArgumentImpl implement literal instanceof DateWritable || literal instanceof Timestamp || literal instanceof HiveDecimal || - literal instanceof BigDecimal) { + literal instanceof BigDecimal || + literal instanceof Boolean) { return literal; } else if (literal instanceof HiveChar || literal instanceof HiveVarchar) { @@ -1000,6 +1004,8 @@ final class SearchArgumentImpl implement }else if (literal instanceof HiveDecimal || literal instanceof BigDecimal) { return PredicateLeaf.Type.DECIMAL; + } else if (literal instanceof Boolean) { + return PredicateLeaf.Type.BOOLEAN; } throw new IllegalArgumentException("Unknown type for literal " + literal); } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original) +++ 
hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Sep 10 21:41:16 2014 @@ -364,8 +364,10 @@ abstract public class AbstractSMBJoinPro for (int pos = 0; pos < sortCols.size(); pos++) { Order o = sortCols.get(pos); - if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) { - return false; + if (pos < sortColumnsFirstPartition.size()) { + if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) { + return false; + } } sortColNames.add(o.getCol()); } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1624140&r1=1624139&r2=1624140&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Sep 10 21:41:16 2014 @@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.Ba import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.SM import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -290,23 +294,26 @@ public class Vectorizer implements Physi throws SemanticException { Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd; if (currTask instanceof MapRedTask) { - convertMapWork(((MapRedTask) currTask).getWork().getMapWork()); + convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false); } else if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); for (BaseWork w: work.getAllWork()) { if (w instanceof MapWork) { - convertMapWork((MapWork)w); + convertMapWork((MapWork) w, true); } else if (w instanceof ReduceWork) { // We are only vectorizing Reduce under Tez. 
- convertReduceWork((ReduceWork)w); + if (HiveConf.getBoolVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) { + convertReduceWork((ReduceWork) w); + } } } } return null; } - private void convertMapWork(MapWork mapWork) throws SemanticException { - boolean ret = validateMapWork(mapWork); + private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + boolean ret = validateMapWork(mapWork, isTez); if (ret) { vectorizeMapWork(mapWork); } @@ -319,7 +326,8 @@ public class Vectorizer implements Physi + ReduceSinkOperator.getOperatorName()), np); } - private boolean validateMapWork(MapWork mapWork) throws SemanticException { + private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + LOG.info("Validating MapWork..."); // Validate the input format for (String path : mapWork.getPathToPartitionInfo().keySet()) { @@ -333,7 +341,7 @@ public class Vectorizer implements Physi } } Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(); + MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -417,9 +425,12 @@ public class Vectorizer implements Physi private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) { opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np); opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np); + opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np); } private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException { + LOG.info("Validating ReduceWork..."); + // Validate input to ReduceWork. if (!getOnlyStructObjectInspectors(reduceWork)) { return false; @@ -487,16 +498,21 @@ public class Vectorizer implements Physi class MapWorkValidationNodeProcessor implements NodeProcessor { + private boolean isTez; + + public MapWorkValidationNodeProcessor(boolean isTez) { + this.isTez = isTez; + } + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n; - if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) && - op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) { + if (nonVectorizableChildOfGroupBy(op)) { return new Boolean(true); } - boolean ret = validateMapWorkOperator(op); + boolean ret = validateMapWorkOperator(op, isTez); if (!ret) { LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); @@ -513,6 +529,9 @@ public class Vectorizer implements Physi Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n; + if (nonVectorizableChildOfGroupBy(op)) { + return new Boolean(true); + } boolean ret = validateReduceWorkOperator(op); if (!ret) { LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); @@ -579,21 +598,6 @@ public class Vectorizer implements Physi return vContext; } - public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) { - Operator<? 
extends OperatorDesc> currentOp = op; - while (currentOp.getParentOperators().size() > 0) { - currentOp = currentOp.getParentOperators().get(0); - if (currentOp.getType().equals(OperatorType.GROUPBY)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return true; - } - } - return false; - } - public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext) throws SemanticException { Operator<? extends OperatorDesc> vectorOp = op; @@ -665,9 +669,13 @@ public class Vectorizer implements Physi assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } @@ -719,13 +727,22 @@ public class Vectorizer implements Physi assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext); + if (vectorOp instanceof VectorGroupByOperator) { + VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp; + VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc(); + vectorDesc.setVectorGroupBatches(true); + } if (saveRootVectorOp && op != vectorOp) { rootVectorOp = vectorOp; } @@ -772,7 +789,7 @@ public class Vectorizer implements Physi return pctx; } - boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) { + boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, boolean isTez) { boolean ret = false; switch (op.getType()) { case MAPJOIN: @@ -783,7 +800,7 @@ public class Vectorizer implements Physi } break; case GROUPBY: - ret = validateGroupByOperator((GroupByOperator) op); + ret = validateGroupByOperator((GroupByOperator) op, false, isTez); break; case FILTER: ret = validateFilterOperator((FilterOperator) op); break; @@ -814,6 +831,17 @@ public class Vectorizer implements Physi case EXTRACT: ret = validateExtractOperator((ExtractOperator) op); break; + case MAPJOIN: + // Does MAPJOIN actually get planned in Reduce? + if (op instanceof MapJoinOperator) { + ret = validateMapJoinOperator((MapJoinOperator) op); + } else if (op instanceof SMBMapJoinOperator) { + ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); + } + break; + case GROUPBY: + ret = validateGroupByOperator((GroupByOperator) op, true, true); + break; case FILTER: ret = validateFilterOperator((FilterOperator) op); break; @@ -836,6 +864,23 @@ public class Vectorizer implements Physi return ret; } + public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) { + Operator<? 
extends OperatorDesc> currentOp = op; + while (currentOp.getParentOperators().size() > 0) { + currentOp = currentOp.getParentOperators().get(0); + if (currentOp.getType().equals(OperatorType.GROUPBY)) { + GroupByDesc desc = (GroupByDesc)currentOp.getConf(); + boolean isVectorOutput = desc.getVectorDesc().isVectorOutput(); + if (isVectorOutput) { + // This GROUP BY does vectorize its output. + return false; + } + return true; + } + } + return false; + } + private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) { SMBJoinDesc desc = op.getConf(); // Validation is the same as for map join, since the 'small' tables are not vectorized @@ -886,16 +931,57 @@ public class Vectorizer implements Physi return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER); } - private boolean validateGroupByOperator(GroupByOperator op) { - if (op.getConf().isGroupingSetsPresent()) { - LOG.warn("Grouping sets not supported in vector mode"); + private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) { + GroupByDesc desc = op.getConf(); + VectorGroupByDesc vectorDesc = desc.getVectorDesc(); + + if (desc.isGroupingSetsPresent()) { + LOG.info("Grouping sets not supported in vector mode"); return false; } - boolean ret = validateExprNodeDesc(op.getConf().getKeys()); + boolean ret = validateExprNodeDesc(desc.getKeys()); if (!ret) { return false; } - return validateAggregationDesc(op.getConf().getAggregators()); + ret = validateAggregationDesc(desc.getAggregators(), isReduce); + if (!ret) { + return false; + } + boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce); + vectorDesc.setVectorOutput(isVectorOutput); + if (isReduce) { + if (desc.isDistinct()) { + LOG.info("Distinct not supported in reduce vector mode"); + return false; + } + // Sort-based GroupBy? 
+ if (desc.getMode() != GroupByDesc.Mode.COMPLETE && + desc.getMode() != GroupByDesc.Mode.PARTIAL1 && + desc.getMode() != GroupByDesc.Mode.PARTIAL2 && + desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) { + LOG.info("Reduce vector mode not supported when input for GROUP BY not sorted"); + return false; + } + LOG.info("Reduce GROUP BY mode is " + desc.getMode().name()); + if (desc.getGroupKeyNotReductionKey()) { + LOG.info("Reduce vector mode not supported when group key is not reduction key"); + return false; + } + if (!isVectorOutput) { + LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types"); + return false; + } + if (desc.getKeys().size() > 0) { + LOG.info("Reduce-side GROUP BY will process key groups"); + vectorDesc.setVectorGroupBatches(true); + } else { + LOG.info("Reduce-side GROUP BY will do global aggregation"); + } + vectorDesc.setIsReduce(true); + } else { + LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput); + } + return true; } private boolean validateExtractOperator(ExtractOperator op) { @@ -930,9 +1016,9 @@ public class Vectorizer implements Physi return true; } - private boolean validateAggregationDesc(List<AggregationDesc> descs) { + private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduce) { for (AggregationDesc d : descs) { - boolean ret = validateAggregationDesc(d); + boolean ret = validateAggregationDesc(d, isReduce); if (!ret) { return false; } @@ -952,9 +1038,7 @@ public class Vectorizer implements Physi String typeName = desc.getTypeInfo().getTypeName(); boolean ret = validateDataType(typeName); if (!ret) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName); - } + LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName); return false; } if (desc instanceof ExprNodeGenericFuncDesc) { @@ -987,12 +1071,11 @@ public class Vectorizer implements Physi VectorizationContext vc = new ValidatorVectorizationContext(); if (vc.getVectorExpression(desc, mode) == null) { // TODO: this cannot happen - VectorizationContext throws in such cases. + LOG.info("getVectorExpression returned null"); return false; } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to vectorize", e); - } + LOG.info("Failed to vectorize", e); return false; } return true; @@ -1011,16 +1094,56 @@ public class Vectorizer implements Physi } } - private boolean validateAggregationDesc(AggregationDesc aggDesc) { + private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduce) { if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) { return false; } if (aggDesc.getParameters() != null) { return validateExprNodeDesc(aggDesc.getParameters()); } + // See if we can vectorize the aggregation. + try { + VectorizationContext vc = new ValidatorVectorizationContext(); + if (vc.getAggregatorExpression(aggDesc, isReduce) == null) { + // TODO: this cannot happen - VectorizationContext throws in such cases. 
+ LOG.info("getAggregatorExpression returned null"); + return false; + } + } catch (Exception e) { + LOG.info("Failed to vectorize", e); + return false; + } + return true; + } + + private boolean aggregatorsOutputIsPrimitive(List<AggregationDesc> descs, boolean isReduce) { + for (AggregationDesc d : descs) { + boolean ret = aggregatorsOutputIsPrimitive(d, isReduce); + if (!ret) { + return false; + } + } return true; } + private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc, boolean isReduce) { + VectorizationContext vc = new ValidatorVectorizationContext(); + VectorAggregateExpression vectorAggrExpr; + try { + vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduce); + } catch (Exception e) { + // We should have already attempted to vectorize in validateAggregationDesc. + LOG.info("Vectorization of aggregation should have succeeded", e); + return false; + } + + ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector(); + if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) { + return true; + } + return false; + } + private boolean validateDataType(String type) { return supportedDataTypesPattern.matcher(type.toLowerCase()).matches(); }
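
Note on the VectorizationContext change above: with reduce-side vectorization, the same UDAF name can resolve to different vector aggregate classes depending on which side of the shuffle it runs on, which is what the new mode filter in getAggregatorExpression enforces. A minimal standalone sketch of that rule, assuming a parallel set of MERGEPARTIAL-mode definitions is registered for the reduce side; the class and method names below are illustrative stand-ins, not Hive's:

import java.util.ArrayList;
import java.util.List;

public class AggregateLookupSketch {
  enum Mode { HASH, MERGEPARTIAL }

  // Simplified stand-in for Hive's AggregateDefinition entries.
  static class AggDef {
    final String name; final Mode mode; final String implClass;
    AggDef(String name, Mode mode, String implClass) {
      this.name = name; this.mode = mode; this.implClass = implClass;
    }
  }

  static String lookup(List<AggDef> defs, String name, boolean isReduce) {
    for (AggDef def : defs) {
      if (!def.name.equalsIgnoreCase(name)) {
        continue;
      }
      if (def.mode == Mode.HASH && isReduce) {
        continue; // map-side partial aggregate; skipped on the reduce side
      }
      if (def.mode == Mode.MERGEPARTIAL && !isReduce) {
        continue; // reduce-side merge aggregate; skipped on the map side
      }
      return def.implClass;
    }
    return null; // the real code throws HiveException here
  }

  public static void main(String[] args) {
    List<AggDef> defs = new ArrayList<AggDef>();
    defs.add(new AggDef("avg", Mode.HASH, "VectorUDAFAvgLong"));
    System.out.println(lookup(defs, "avg", false)); // VectorUDAFAvgLong
    System.out.println(lookup(defs, "avg", true));  // null: HASH-only definition
  }
}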
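On the RecordReaderImpl change: BooleanColumnStatistics records only true/false counts rather than stored min/max values, so the bounds needed for predicate evaluation are derived from the counts. A self-contained sketch of that derivation (the class and helper names here are hypothetical, only the count-based logic mirrors the diff):

public class BooleanStatsBounds {
  // The max is "true" unless the stats saw no true values; the min is
  // "false" unless the stats saw no false values.
  static String derivedMax(long trueCount) {
    return trueCount != 0 ? "true" : "false";
  }
  static String derivedMin(long falseCount) {
    return falseCount != 0 ? "false" : "true";
  }
  public static void main(String[] args) {
    // Both values present: the bounds span the whole boolean domain.
    System.out.println(derivedMin(3) + " .. " + derivedMax(5)); // false .. true
    // All-true data: min and max both collapse to "true".
    System.out.println(derivedMin(0) + " .. " + derivedMax(5)); // true .. true
  }
}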
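Finally, the reworked nonVectorizableChildOfGroupBy in Vectorizer now consults the GROUP BY's vector-output flag before blocking vectorization of downstream operators: children of a row-mode GROUP BY stay unvectorized, while a GROUP BY validated to emit vectorized batches no longer blocks them. A simplified model of that first-parent walk (the Op type below is a stand-in for Hive's Operator tree, not its API):

import java.util.ArrayList;
import java.util.List;

public class GroupByChildWalk {
  static class Op {
    final String type;          // e.g. "GROUPBY", "SELECT", "FILESINK"
    final boolean vectorOutput; // only meaningful for GROUPBY
    final List<Op> parents = new ArrayList<Op>();
    Op(String type, boolean vectorOutput) {
      this.type = type;
      this.vectorOutput = vectorOutput;
    }
  }

  // True when op sits below a row-mode GROUP BY and must not be vectorized.
  static boolean nonVectorizableChildOfGroupBy(Op op) {
    Op current = op;
    while (!current.parents.isEmpty()) {
      current = current.parents.get(0);
      if ("GROUPBY".equals(current.type)) {
        // A GROUP BY that emits vectorized batches does not block its children.
        return !current.vectorOutput;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Op rowModeGroupBy = new Op("GROUPBY", false);
    Op sink = new Op("FILESINK", false);
    sink.parents.add(rowModeGroupBy);
    System.out.println(nonVectorizableChildOfGroupBy(sink)); // true
  }
}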
