HIVE-13566: Auto-gather column stats - phase 1 (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
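This patch adds a second statistics flag, hive.stats.column.autogather, alongside the existing hive.stats.autogather in HiveConf, and wires an extra analyze pipeline into SemanticAnalyzer/TaskCompiler so that column statistics can be persisted as part of an INSERT INTO / INSERT OVERWRITE. As a rough illustration only (the table and column names below are made up for this note and are not taken from the patch or its .q tests; only the two config names come from the HiveConf hunk in this commit), the feature would be exercised from HiveQL roughly like this:

    set hive.stats.autogather=true;
    set hive.stats.column.autogather=true;

    -- with both flags on, the insert itself should also populate column stats,
    -- with no separate "analyze table ... compute statistics for columns" run
    create table dest_tbl (key string, value string);
    insert overwrite table dest_tbl select key, value from src_tbl;

    -- inspect the column stats written by the auto-gather pipeline
    describe formatted dest_tbl key;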
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec4b936e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec4b936e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec4b936e

Branch: refs/heads/master
Commit: ec4b936e66db559cd7226f66d416dad02864530f
Parents: 2ed4783
Author: Pengcheng Xiong <[email protected]>
Authored: Mon May 23 20:22:33 2016 -0700
Committer: Pengcheng Xiong <[email protected]>
Committed: Mon May 23 20:22:33 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hive/beeline/BeeLine.java      |   14 +-
 .../java/org/apache/hive/beeline/Commands.java     |   72 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java      |    4 +-
 .../hive/beeline/TestBeeLineWithArgs.java          |   24 +-
 jdbc/src/java/org/apache/hive/jdbc/Utils.java      |   54 +-
 .../apache/hadoop/hive/ql/exec/Operator.java       |    6 +-
 .../hadoop/hive/ql/exec/TextRecordReader.java      |    4 +-
 .../hadoop/hive/ql/exec/TextRecordWriter.java      |    4 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java      |   24 -
 .../hadoop/hive/ql/exec/tez/TezProcessor.java      |    3 -
 .../apache/hadoop/hive/ql/metadata/Hive.java       |   10 -
 .../physical/GenMRSkewJoinProcessor.java           |   17 +-
 .../physical/GenSparkSkewJoinProcessor.java        |   18 +-
 .../physical/SparkMapJoinResolver.java             |    4 +-
 .../ql/parse/ColumnStatsAutoGatherContext.java     |  291 ++
 .../ql/parse/ColumnStatsSemanticAnalyzer.java      |   71 +-
 .../hadoop/hive/ql/parse/ParseContext.java         |   12 +
 .../hadoop/hive/ql/parse/QBParseInfo.java          |   20 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |   71 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java         |   63 +-
 .../ql/plan/ConditionalResolverSkewJoin.java       |   10 +-
 .../queries/clientpositive/autoColumnStats_1.q     |  192 ++
 .../queries/clientpositive/autoColumnStats_2.q     |  214 ++
 .../queries/clientpositive/autoColumnStats_3.q     |   67 +
 .../queries/clientpositive/autoColumnStats_4.q     |   20 +
 .../queries/clientpositive/autoColumnStats_5.q     |   47 +
 .../queries/clientpositive/autoColumnStats_6.q     |   41 +
 .../queries/clientpositive/autoColumnStats_7.q     |   19 +
 .../queries/clientpositive/autoColumnStats_8.q     |   27 +
 .../queries/clientpositive/autoColumnStats_9.q     |   22 +
 .../clientpositive/autoColumnStats_1.q.out         | 1379 +++++++++
 .../clientpositive/autoColumnStats_2.q.out         | 1500 ++++++++++
 .../clientpositive/autoColumnStats_3.q.out         |  420 +++
 .../clientpositive/autoColumnStats_4.q.out         |  260 ++
 .../clientpositive/autoColumnStats_5.q.out         |  664 +++++
 .../clientpositive/autoColumnStats_6.q.out         |  299 ++
 .../clientpositive/autoColumnStats_7.q.out         |  216 ++
 .../clientpositive/autoColumnStats_8.q.out         | 2624 ++++++++++++++++++
 .../clientpositive/autoColumnStats_9.q.out         |  268 ++
 39 files changed, 8845 insertions(+), 230 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/BeeLine.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
index 9138613..734eeb8 100644
--- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java
+++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
@@ -93,9 +93,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hive.beeline.cli.CliOptionsProcessor;
-import org.apache.hive.jdbc.Utils;
-import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
-
 /**
  * A console SQL shell with
command completion. * <p> @@ -142,6 +139,7 @@ public class BeeLine implements Closeable { private static final Options options = new Options(); public static final String BEELINE_DEFAULT_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver"; + public static final String BEELINE_DEFAULT_JDBC_URL = "jdbc:hive2://"; public static final String DEFAULT_DATABASE_NAME = "default"; private static final String SCRIPT_OUTPUT_PREFIX = ">>>"; @@ -768,14 +766,6 @@ public class BeeLine implements Closeable { */ if (url != null) { - if (user == null) { - user = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER); - } - - if (pass == null) { - pass = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD); - } - String com = constructCmd(url, user, pass, driver, false); String comForDebug = constructCmd(url, user, pass, driver, true); debug("issuing: " + comForDebug); @@ -904,7 +894,7 @@ public class BeeLine implements Closeable { } private int embeddedConnect() { - if (!execCommandWithPrefix("!connect " + Utils.URL_PREFIX + " '' ''")) { + if (!execCommandWithPrefix("!connect " + BEELINE_DEFAULT_JDBC_URL + " '' ''")) { return ERRNO_OTHER; } else { return ERRNO_OK; http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/Commands.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index 3a204c0..80703ff 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -61,8 +61,6 @@ import java.util.TreeSet; import org.apache.hadoop.hive.common.cli.ShellCmdExecutor; import org.apache.hive.jdbc.HiveStatement; -import org.apache.hive.jdbc.Utils; -import org.apache.hive.jdbc.Utils.JdbcConnectionParams; public class Commands { @@ -1316,41 +1314,18 @@ public class Commands { Properties props = new Properties(); if (url != null) { String saveUrl = getUrlToUse(url); - props.setProperty(JdbcConnectionParams.PROPERTY_URL, url); + props.setProperty("url", saveUrl); } - - String value = null; if (driver != null) { - props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, driver); - } else { - value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER); - if (value != null) { - props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, value); - } + props.setProperty("driver", driver); } - if (user != null) { - props.setProperty(JdbcConnectionParams.AUTH_USER, user); - } else { - value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER); - if (value != null) { - props.setProperty(JdbcConnectionParams.AUTH_USER, value); - } + props.setProperty("user", user); } - if (pass != null) { - props.setProperty(JdbcConnectionParams.AUTH_PASSWD, pass); - } else { - value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD); - if (value != null) { - props.setProperty(JdbcConnectionParams.AUTH_PASSWD, value); - } + props.setProperty("password", pass); } - value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_TYPE); - if (value != null) { - props.setProperty(JdbcConnectionParams.AUTH_TYPE, value); - } return connect(props); } @@ -1403,25 +1378,26 @@ public class Commands { public boolean connect(Properties props) throws IOException { String url = getProperty(props, new String[] { - JdbcConnectionParams.PROPERTY_URL, + "url", "javax.jdo.option.ConnectionURL", "ConnectionURL", }); String driver = 
getProperty(props, new String[] { - JdbcConnectionParams.PROPERTY_DRIVER, + "driver", "javax.jdo.option.ConnectionDriverName", "ConnectionDriverName", }); String username = getProperty(props, new String[] { - JdbcConnectionParams.AUTH_USER, + "user", "javax.jdo.option.ConnectionUserName", "ConnectionUserName", }); String password = getProperty(props, new String[] { - JdbcConnectionParams.AUTH_PASSWD, + "password", "javax.jdo.option.ConnectionPassword", "ConnectionPassword", }); + String auth = getProperty(props, new String[] {"auth"}); if (url == null || url.length() == 0) { return beeLine.error("Property \"url\" is required"); @@ -1432,25 +1408,23 @@ public class Commands { } } - String auth = getProperty(props, new String[] {JdbcConnectionParams.AUTH_TYPE}); + beeLine.info("Connecting to " + url); + + if (username == null) { + username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": "); + } + props.setProperty("user", username); + if (password == null) { + password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ", + new Character('*')); + } + props.setProperty("password", password); + if (auth == null) { auth = beeLine.getOpts().getAuthType(); - if (auth != null) { - props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth); - } } - - beeLine.info("Connecting to " + url); - if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) == null) { - if (username == null) { - username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": "); - } - props.setProperty(JdbcConnectionParams.AUTH_USER, username); - if (password == null) { - password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ", - new Character('*')); - } - props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password); + if (auth != null) { + props.setProperty("auth", auth); } try { http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c0843b9..ed20069 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1519,7 +1519,9 @@ public class HiveConf extends Configuration { // Statistics HIVESTATSAUTOGATHER("hive.stats.autogather", true, - "A flag to gather statistics automatically during the INSERT OVERWRITE command."), + "A flag to gather statistics (only basic) automatically during the INSERT OVERWRITE command."), + HIVESTATSCOLAUTOGATHER("hive.stats.column.autogather", false, + "A flag to gather column statistics automatically."), HIVESTATSDBCLASS("hive.stats.dbclass", "fs", new PatternSet("custom", "fs"), "The storage that stores temporary Hive statistics. 
In filesystem based statistics collection ('fs'), \n" + "each task writes statistics it has collected in a file on the filesystem, which will be aggregated \n" + http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java index ecfeddb..f9909ad 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java @@ -39,7 +39,6 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.jdbc.Utils; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.junit.AfterClass; import org.junit.Assert; @@ -176,7 +175,6 @@ public class TestBeeLineWithArgs { // Put the script content in a temp file File scriptFile = File.createTempFile(this.getClass().getSimpleName(), "temp"); - System.out.println("script file is " + scriptFile.getAbsolutePath()); scriptFile.deleteOnExit(); PrintStream os = new PrintStream(new FileOutputStream(scriptFile)); os.print(scriptText); @@ -657,7 +655,7 @@ public class TestBeeLineWithArgs { @Test public void testEmbeddedBeelineConnection() throws Throwable{ - String embeddedJdbcURL = Utils.URL_PREFIX+"/Default"; + String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default"; List<String> argList = getBaseArgs(embeddedJdbcURL); argList.add("--hivevar"); argList.add("DUMMY_TBL=embedded_table"); @@ -772,7 +770,7 @@ public class TestBeeLineWithArgs { @Test public void testEmbeddedBeelineOutputs() throws Throwable{ - String embeddedJdbcURL = Utils.URL_PREFIX+"/Default"; + String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default"; List<String> argList = getBaseArgs(embeddedJdbcURL); // Set to non-zk lock manager to avoid trying to connect to zookeeper final String SCRIPT_TEXT = @@ -845,22 +843,4 @@ public class TestBeeLineWithArgs { } - /** - * Attempt to execute a simple script file with the usage of user & password variables in URL. 
- * Test for presence of an expected pattern - * in the output (stdout or stderr), fail if not found - * Print PASSED or FAILED - */ - @Test - public void testConnectionWithURLParams() throws Throwable { - final String EXPECTED_PATTERN = " hivetest "; - List<String> argList = new ArrayList<String>(); - argList.add("-d"); - argList.add(BeeLine.BEELINE_DEFAULT_JDBC_DRIVER); - argList.add("-u"); - argList.add(miniHS2.getBaseJdbcURL() + ";user=hivetest;password=hive"); - String SCRIPT_TEXT = "select current_user();"; - - testScriptFile( SCRIPT_TEXT, EXPECTED_PATTERN, true, argList); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/jdbc/src/java/org/apache/hive/jdbc/Utils.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 7ea6309..42181d7 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -37,12 +37,12 @@ import org.apache.http.cookie.Cookie; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Utils { +class Utils { static final Logger LOG = LoggerFactory.getLogger(Utils.class.getName()); /** * The required prefix for the connection URL. */ - public static final String URL_PREFIX = "jdbc:hive2://"; + static final String URL_PREFIX = "jdbc:hive2://"; /** * If host is provided, without a port. @@ -63,7 +63,7 @@ public class Utils { static final String HIVE_SERVER2_RETRY_TRUE = "true"; static final String HIVE_SERVER2_RETRY_FALSE = "false"; - public static class JdbcConnectionParams { + static class JdbcConnectionParams { // Note on client side parameter naming convention: // Prefer using a shorter camelCase param name instead of using the same name as the // corresponding @@ -76,33 +76,31 @@ public class Utils { // Retry setting static final String RETRIES = "retries"; - public static final String AUTH_TYPE = "auth"; + static final String AUTH_TYPE = "auth"; // We're deprecating this variable's name. 
- public static final String AUTH_QOP_DEPRECATED = "sasl.qop"; - public static final String AUTH_QOP = "saslQop"; - public static final String AUTH_SIMPLE = "noSasl"; - public static final String AUTH_TOKEN = "delegationToken"; - public static final String AUTH_USER = "user"; - public static final String AUTH_PRINCIPAL = "principal"; - public static final String AUTH_PASSWD = "password"; - public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType"; - public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject"; - public static final String ANONYMOUS_USER = "anonymous"; - public static final String ANONYMOUS_PASSWD = "anonymous"; - public static final String USE_SSL = "ssl"; - public static final String SSL_TRUST_STORE = "sslTrustStore"; - public static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword"; + static final String AUTH_QOP_DEPRECATED = "sasl.qop"; + static final String AUTH_QOP = "saslQop"; + static final String AUTH_SIMPLE = "noSasl"; + static final String AUTH_TOKEN = "delegationToken"; + static final String AUTH_USER = "user"; + static final String AUTH_PRINCIPAL = "principal"; + static final String AUTH_PASSWD = "password"; + static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType"; + static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject"; + static final String ANONYMOUS_USER = "anonymous"; + static final String ANONYMOUS_PASSWD = "anonymous"; + static final String USE_SSL = "ssl"; + static final String SSL_TRUST_STORE = "sslTrustStore"; + static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword"; // We're deprecating the name and placement of this in the parsed map (from hive conf vars to // hive session vars). static final String TRANSPORT_MODE_DEPRECATED = "hive.server2.transport.mode"; - public static final String TRANSPORT_MODE = "transportMode"; + static final String TRANSPORT_MODE = "transportMode"; // We're deprecating the name and placement of this in the parsed map (from hive conf vars to // hive session vars). 
static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path"; - public static final String HTTP_PATH = "httpPath"; - public static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode"; - public static final String PROPERTY_DRIVER = "driver"; - public static final String PROPERTY_URL = "url"; + static final String HTTP_PATH = "httpPath"; + static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode"; // Don't use dynamic service discovery static final String SERVICE_DISCOVERY_MODE_NONE = "none"; // Use ZooKeeper for indirection while using dynamic service discovery @@ -633,14 +631,4 @@ public class Utils { } return true; } - - public static String parsePropertyFromUrl(final String url, final String key) { - String[] tokens = url.split(";"); - for (String token : tokens) { - if (token.trim().startsWith(key.trim() + "=")) { - return token.trim().substring((key.trim() + "=").length()); - } - } - return null; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 00552a8..636f079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -406,11 +406,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C } } if (asyncEx != null) { - if (asyncEx instanceof Exception) { - throw new HiveException("Async initialization failed", asyncEx); - } else { - throw (Error) asyncEx; - } + throw new HiveException("Async initialization failed", asyncEx); } completeInitializationOp(os); } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java index 47ab9c2..8319f11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java @@ -40,14 +40,12 @@ public class TextRecordReader implements RecordReader { private InputStream in; private Text row; private Configuration conf; - private boolean escape; public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException { lineReader = new LineReader(in, conf); this.in = in; this.conf = conf; - escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE); } public Writable createRow() throws IOException { @@ -62,7 +60,7 @@ public class TextRecordReader implements RecordReader { int bytesConsumed = lineReader.readLine((Text) row); - if (escape) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) { return HiveUtils.unescapeText((Text) row); } return bytesConsumed; http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java index f15458d..10b4594 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java @@ -35,20 +35,18 @@ public class TextRecordWriter implements RecordWriter { private OutputStream out; private Configuration conf; - private boolean escape; public void initialize(OutputStream out, Configuration conf) throws IOException { this.out = out; this.conf = conf; - escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE); } public void write(Writable row) throws IOException { Text text = (Text) row; Text escapeText = text; - if (escape) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) { escapeText = HiveUtils.escapeText(text); } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8144c3b..7082931 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec; import java.util.ArrayList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -97,7 +96,6 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -196,7 +194,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; import com.google.common.base.Preconditions; /** @@ -439,7 +436,6 @@ public final class Utilities { throw new RuntimeException("Unknown work type: " + name); } } - gWorkMap.get(conf).put(path, gWork); } else if (LOG.isDebugEnabled()) { LOG.debug("Found plan in cache for name: " + name); @@ -450,16 +446,6 @@ public final class Utilities { LOG.debug("File not found: " + fnf.getMessage()); LOG.info("No plan file found: "+path); return null; - } catch (KryoException ke) { - Throwable cnfThrowable = findClassNotFoundException(ke); - if (LlapProxy.isDaemon() && (cnfThrowable != null)) { - LOG.error("Missing class \"" + cnfThrowable.getMessage() + "\". 
If this is a UDF and you " + - "are running LLAP, you may need to regenerate the llap startup script and restart " + - "llap with jars for your udf.", cnfThrowable); - throw new RuntimeException("Cannot find \"" + cnfThrowable.getMessage() + "\" You may" + - " need to regenerate the LLAP startup script and restart llap daemons.", cnfThrowable); - } - throw new RuntimeException(ke); } catch (Exception e) { String msg = "Failed to load plan: " + path + ": " + e; LOG.error(msg, e); @@ -474,16 +460,6 @@ public final class Utilities { } } - private static Throwable findClassNotFoundException(Throwable ke) { - while (ke != null) { - if (ke instanceof ClassNotFoundException) { - return ke; - } - ke = ke.getCause(); - } - return null; - } - public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { try { Graph stageGraph = plan.getQueryPlan().getStageGraph(); http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index a33b6e2..c560f37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -39,7 +39,6 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.library.api.KeyValueWriter; /** @@ -179,8 +178,6 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } finally { if (originalThrowable != null && originalThrowable instanceof Error) { LOG.error(StringUtils.stringifyException(originalThrowable)); - getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, - "Cannot recover from this error"); throw new RuntimeException(originalThrowable); } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index d9f58f2..3fa1233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2860,16 +2860,6 @@ private void constructOneLBLocationMap(FileStatus fSta, if (destIsSubDir) { FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); - if (inheritPerms) { - try { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, false); - } catch (IOException e) { - String msg = "Error setting permission of file " + destf; - LOG.error(msg); - throw new HiveException(msg, e); - } - } - List<Future<Void>> futures = new LinkedList<>(); final ExecutorService pool = Executors.newFixedThreadPool( conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 9fbbd4c..f41fa4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -117,12 +117,6 @@ public final class GenMRSkewJoinProcessor { } List<Task<? extends Serializable>> children = currTask.getChildTasks(); - if (children != null && children.size() > 1) { - throw new SemanticException("Should not happened"); - } - - Task<? extends Serializable> child = - children != null && children.size() == 1 ? children.get(0) : null; Path baseTmpDir = parseCtx.getContext().getMRTmpPath(); @@ -347,13 +341,14 @@ public final class GenMRSkewJoinProcessor { tsk.addDependentTask(oldChild); } } - } - if (child != null) { - currTask.removeDependentTask(child); - listTasks.add(child); + currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>()); + for (Task<? extends Serializable> oldChild : children) { + oldChild.getParentTasks().remove(currTask); + } + listTasks.addAll(children); } ConditionalResolverSkewJoinCtx context = - new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child); + new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children); ConditionalWork cndWork = new ConditionalWork(listWorks); ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf()); http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 11ec07a..ded9231 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -93,9 +93,6 @@ public class GenSparkSkewJoinProcessor { List<Task<? extends Serializable>> children = currTask.getChildTasks(); - Task<? extends Serializable> child = - children != null && children.size() == 1 ? children.get(0) : null; - Path baseTmpDir = parseCtx.getContext().getMRTmpPath(); JoinDesc joinDescriptor = joinOp.getConf(); @@ -334,14 +331,17 @@ public class GenSparkSkewJoinProcessor { tsk.addDependentTask(oldChild); } } - } - if (child != null) { - currTask.removeDependentTask(child); - listTasks.add(child); - listWorks.add(child.getWork()); + currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>()); + for (Task<? extends Serializable> oldChild : children) { + oldChild.getParentTasks().remove(currTask); + } + listTasks.addAll(children); + for (Task<? 
extends Serializable> oldChild : children) { + listWorks.add(oldChild.getWork()); + } } ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context = - new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child); + new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children); ConditionalWork cndWork = new ConditionalWork(listWorks); ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf()); http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java index 8e56263..a3ec990 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java @@ -411,7 +411,9 @@ public class SparkMapJoinResolver implements PhysicalPlanResolver { context.setDirToTaskMap(newbigKeysDirToTaskMap); // update no skew task if (context.getNoSkewTask() != null && context.getNoSkewTask().equals(originalTask)) { - context.setNoSkewTask(newTask); + List<Task<? extends Serializable>> noSkewTask = new ArrayList<>(); + noSkewTask.add(newTask); + context.setNoSkewTask(noSkewTask); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java new file mode 100644 index 0000000..15a47dc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.parse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +/** + * ColumnStatsAutoGatherContext: This is passed to the compiler when set + * hive.stats.autogather=true during the INSERT OVERWRITE command. + * + **/ + +public class ColumnStatsAutoGatherContext { + + public AnalyzeRewriteContext analyzeRewrite; + private final List<LoadFileDesc> loadFileWork = new ArrayList<>(); + private final SemanticAnalyzer sa; + private final HiveConf conf; + private final Operator<? extends OperatorDesc> op; + private final List<FieldSchema> columns; + private final List<FieldSchema> partitionColumns; + private boolean isInsertInto; + private Table tbl; + private Map<String, String> partSpec; + + public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf, + Operator<? extends OperatorDesc> op, Table tbl, Map<String, String> partSpec, + boolean isInsertInto) throws SemanticException { + super(); + this.sa = sa; + this.conf = conf; + this.op = op; + this.tbl = tbl; + this.partSpec = partSpec; + this.isInsertInto = isInsertInto; + columns = tbl.getCols(); + partitionColumns = tbl.getPartCols(); + } + + public List<LoadFileDesc> getLoadFileWork() { + return loadFileWork; + } + + public AnalyzeRewriteContext getAnalyzeRewrite() { + return analyzeRewrite; + } + + public void setAnalyzeRewrite(AnalyzeRewriteContext analyzeRewrite) { + this.analyzeRewrite = analyzeRewrite; + } + + public void insertAnalyzePipeline() throws SemanticException{ + // 1. 
Generate the statement of analyze table [tablename] compute statistics for columns + // In non-partitioned table case, it will generate TS-SEL-GBY-RS-GBY-SEL-FS operator + // In static-partitioned table case, it will generate TS-FIL(partitionKey)-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator + // In dynamic-partitioned table case, it will generate TS-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator + // However, we do not need to specify the partition-spec because (1) the data is going to be inserted to that specific partition + // (2) we can compose the static/dynamic partition using a select operator in replaceSelectOperatorProcess.. + String analyzeCommand = "analyze table `" + tbl.getDbName() + "`.`" + tbl.getTableName() + "`" + + " compute statistics for columns "; + + // 2. Based on the statement, generate the selectOperator + Operator<?> selOp = null; + try { + selOp = genSelOpForAnalyze(analyzeCommand); + } catch (IOException | ParseException e) { + throw new SemanticException(e); + } + + // 3. attach this SEL to the operator right before FS + op.getChildOperators().add(selOp); + selOp.getParentOperators().clear(); + selOp.getParentOperators().add(op); + + // 4. address the colExp, colList, etc for the SEL + try { + replaceSelectOperatorProcess((SelectOperator)selOp, op); + } catch (HiveException e) { + throw new SemanticException(e); + } + } + + @SuppressWarnings("rawtypes") + private Operator genSelOpForAnalyze(String analyzeCommand) throws IOException, ParseException, SemanticException{ + //0. initialization + Context ctx = new Context(conf); + ParseDriver pd = new ParseDriver(); + ASTNode tree = pd.parse(analyzeCommand, ctx); + tree = ParseUtils.findRootNonNullToken(tree); + + //1. get the ColumnStatsSemanticAnalyzer + BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), tree); + ColumnStatsSemanticAnalyzer colSem = (ColumnStatsSemanticAnalyzer) baseSem; + + //2. get the rewritten AST + ASTNode ast = colSem.rewriteAST(tree, this); + baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), ast); + SemanticAnalyzer sem = (SemanticAnalyzer) baseSem; + QB qb = new QB(null, null, false); + ASTNode child = ast; + ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext(); + subPCtx.setContext(ctx); + ((SemanticAnalyzer) sem).initParseCtx(subPCtx); + sem.doPhase1(child, qb, sem.initPhase1Ctx(), null); + // This will trigger new calls to metastore to collect metadata + // TODO: cache the information from the metastore + sem.getMetaData(qb); + Operator<?> operator = sem.genPlan(qb); + + //3. populate the load file work so that ColumnStatsTask can work + loadFileWork.addAll(sem.getLoadFileWork()); + + //4. because there is only one TS for analyze statement, we can get it. + if (sem.topOps.values().size() != 1) { + throw new SemanticException( + "ColumnStatsAutoGatherContext is expecting exactly one TS, but finds " + + sem.topOps.values().size()); + } + operator = sem.topOps.values().iterator().next(); + + //5. get the first SEL after TS + while(!(operator instanceof SelectOperator)){ + operator = operator.getChildOperators().get(0); + } + return operator; + } + + /** + * @param operator : the select operator in the analyze statement + * @param input : the operator right before FS in the insert overwrite statement + * @throws HiveException + */ + private void replaceSelectOperatorProcess(SelectOperator operator, Operator<? 
extends OperatorDesc> input) + throws HiveException { + RowSchema selRS = operator.getSchema(); + ArrayList<ColumnInfo> signature = new ArrayList<>(); + OpParseContext inputCtx = sa.opParseCtx.get(input); + RowResolver inputRR = inputCtx.getRowResolver(); + ArrayList<ColumnInfo> columns = inputRR.getColumnInfos(); + ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>(); + ArrayList<String> columnNames = new ArrayList<String>(); + Map<String, ExprNodeDesc> columnExprMap = + new HashMap<String, ExprNodeDesc>(); + // the column positions in the operator should be like this + // <----non-partition columns---->|<--static partition columns-->|<--dynamic partition columns--> + // ExprNodeColumnDesc | ExprNodeConstantDesc | ExprNodeColumnDesc + // from input | generate itself | from input + // | + + // 1. deal with non-partition columns + for (int i = 0; i < this.columns.size(); i++) { + ColumnInfo col = columns.get(i); + ExprNodeDesc exprNodeDesc = new ExprNodeColumnDesc(col); + colList.add(exprNodeDesc); + String internalName = selRS.getColumnNames().get(i); + columnNames.add(internalName); + columnExprMap.put(internalName, exprNodeDesc); + signature.add(selRS.getSignature().get(i)); + } + // if there is any partition column (in static partition or dynamic + // partition or mixed case) + int dynamicPartBegin = -1; + for (int i = 0; i < partitionColumns.size(); i++) { + ExprNodeDesc exprNodeDesc = null; + String partColName = partitionColumns.get(i).getName(); + // 2. deal with static partition columns + if (partSpec != null && partSpec.containsKey(partColName) + && partSpec.get(partColName) != null) { + if (dynamicPartBegin > 0) { + throw new SemanticException( + "Dynamic partition columns should not come before static partition columns."); + } + exprNodeDesc = new ExprNodeConstantDesc(partSpec.get(partColName)); + TypeInfo srcType = exprNodeDesc.getTypeInfo(); + TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType(); + if (!srcType.equals(destType)) { + // This may be possible when srcType is string but destType is integer + exprNodeDesc = ParseUtils + .createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType); + } + } + // 3. dynamic partition columns + else { + dynamicPartBegin++; + ColumnInfo col = columns.get(this.columns.size() + dynamicPartBegin); + TypeInfo srcType = col.getType(); + TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType(); + exprNodeDesc = new ExprNodeColumnDesc(col); + if (!srcType.equals(destType)) { + exprNodeDesc = ParseUtils + .createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType); + } + } + colList.add(exprNodeDesc); + String internalName = selRS.getColumnNames().get(this.columns.size() + i); + columnNames.add(internalName); + columnExprMap.put(internalName, exprNodeDesc); + signature.add(selRS.getSignature().get(this.columns.size() + i)); + } + operator.setConf(new SelectDesc(colList, columnNames)); + operator.setColumnExprMap(columnExprMap); + selRS.setSignature(signature); + operator.setSchema(selRS); + } + + public String getCompleteName() { + return tbl.getDbName() + "." 
+ tbl.getTableName(); + } + + public boolean isInsertInto() { + return isInsertInto; + } + + public static boolean canRunAutogatherStats(Operator curr) { + // check the ObjectInspector + for (ColumnInfo cinfo : curr.getSchema().getSignature()) { + if (cinfo.getIsVirtualCol()) { + return false; + } else if (cinfo.getObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) { + return false; + } else { + switch (((PrimitiveTypeInfo) cinfo.getType()).getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case TIMESTAMP: + case FLOAT: + case DOUBLE: + case STRING: + case CHAR: + case VARCHAR: + case BINARY: + case DECIMAL: + // TODO: Support case DATE: + break; + default: + return false; + } + } + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java index 3b6cbce..d3aef41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java @@ -110,11 +110,18 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer { partValsSpecified += partSpec.get(partKey) == null ? 0 : 1; } try { - if ((partValsSpecified == tbl.getPartitionKeys().size()) && (db.getPartition(tbl, partSpec, false, null, false) == null)) { - throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec); + // for static partition, it may not exist when HIVESTATSCOLAUTOGATHER is + // set to true + if (!conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)) { + if ((partValsSpecified == tbl.getPartitionKeys().size()) + && (db.getPartition(tbl, partSpec, false, null, false) == null)) { + throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + + " : " + partSpec); + } } } catch (HiveException he) { - throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec); + throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + + partSpec); } // User might have only specified partial list of partition keys, in which case add other partition keys in partSpec @@ -157,7 +164,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer { } else { groupByClause.append(","); } - groupByClause.append(fs.getName()); + groupByClause.append("`" + fs.getName() + "`"); } // attach the predicate and group by to the return clause @@ -235,12 +242,12 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer { if (isPartitionStats) { for (FieldSchema fs : tbl.getPartCols()) { - rewrittenQueryBuilder.append(" , " + fs.getName()); + rewrittenQueryBuilder.append(" , `" + fs.getName() + "`"); } } - rewrittenQueryBuilder.append(" from "); + rewrittenQueryBuilder.append(" from `"); rewrittenQueryBuilder.append(tbl.getDbName()); - rewrittenQueryBuilder.append("."); + rewrittenQueryBuilder.append("`."); rewrittenQueryBuilder.append("`" + tbl.getTableName() + "`"); isRewritten = true; @@ -378,4 +385,54 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer { analyzeInternal(originalTree); } } + + /** + * @param ast + * is the original analyze ast + * @param qb + * is the qb 
that calls this function + * @param sem + * is the semantic analyzer that calls this function + * @return + * @throws SemanticException + */ + public ASTNode rewriteAST(ASTNode ast, ColumnStatsAutoGatherContext context) + throws SemanticException { + tbl = AnalyzeCommandUtils.getTable(ast, this); + colNames = getColumnName(ast); + // Save away the original AST + originalTree = ast; + boolean isPartitionStats = AnalyzeCommandUtils.isPartitionLevelStats(ast); + Map<String, String> partSpec = null; + checkForPartitionColumns(colNames, + Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys())); + validateSpecifiedColumnNames(colNames); + if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) { + isPartitionStats = true; + } + + if (isPartitionStats) { + isTableLevel = false; + partSpec = AnalyzeCommandUtils.getPartKeyValuePairsFromAST(tbl, ast, conf); + handlePartialPartitionSpec(partSpec); + } else { + isTableLevel = true; + } + colType = getColumnTypes(colNames); + int numBitVectors = 0; + try { + numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf); + } catch (Exception e) { + throw new SemanticException(e.getMessage()); + } + rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats); + rewrittenTree = genRewrittenTree(rewrittenQuery); + + context.analyzeRewrite = new AnalyzeRewriteContext(); + context.analyzeRewrite.setTableName(tbl.getDbName() + "." + tbl.getTableName()); + context.analyzeRewrite.setTblLvl(isTableLevel); + context.analyzeRewrite.setColName(colNames); + context.analyzeRewrite.setColType(colType); + return rewrittenTree; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 96ef20d..b2125ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -80,6 +80,7 @@ public class ParseContext { private HashMap<String, SplitSample> nameToSplitSample; private List<LoadTableDesc> loadTableWork; private List<LoadFileDesc> loadFileWork; + private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts; private Context ctx; private QueryState queryState; private HiveConf conf; @@ -166,6 +167,7 @@ public class ParseContext { Set<JoinOperator> joinOps, Set<SMBMapJoinOperator> smbMapJoinOps, List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork, + List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts, Context ctx, HashMap<String, String> idToTableNameMap, int destTableId, UnionProcContext uCtx, List<AbstractMapJoinOperator<? 
extends MapJoinDesc>> listMapJoinOpsNoReducer, Map<String, PrunedPartitionList> prunedPartitions, @@ -188,6 +190,7 @@ public class ParseContext { this.smbMapJoinOps = smbMapJoinOps; this.loadFileWork = loadFileWork; this.loadTableWork = loadTableWork; + this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts; this.topOps = topOps; this.ctx = ctx; this.idToTableNameMap = idToTableNameMap; @@ -608,4 +611,13 @@ public class ParseContext { public Map<String, Table> getTabNameToTabObject() { return tabNameToTabObject; } + + public List<ColumnStatsAutoGatherContext> getColumnStatsAutoGatherContexts() { + return columnStatsAutoGatherContexts; + } + + public void setColumnStatsAutoGatherContexts( + List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts) { + this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java index 3a226e7..3a0402e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java @@ -63,7 +63,9 @@ public class QBParseInfo { private final Set<String> destCubes; private final Set<String> destGroupingSets; private final Map<String, ASTNode> destToHaving; - private final HashSet<String> insertIntoTables; + // insertIntoTables/insertOverwriteTables map a table's fullName to its ast; + private final Map<String, ASTNode> insertIntoTables; + private final Map<String, ASTNode> insertOverwriteTables; private boolean isAnalyzeCommand; // used for the analyze command (statistics) private boolean isNoScanAnalyzeCommand; // used for the analyze command (statistics) (noscan) @@ -133,7 +135,8 @@ public class QBParseInfo { destToSortby = new HashMap<String, ASTNode>(); destToOrderby = new HashMap<String, ASTNode>(); destToLimit = new HashMap<String, SimpleEntry<Integer, Integer>>(); - insertIntoTables = new HashSet<String>(); + insertIntoTables = new HashMap<String, ASTNode>(); + insertOverwriteTables = new HashMap<String, ASTNode>(); destRollups = new HashSet<String>(); destCubes = new HashSet<String>(); destGroupingSets = new HashSet<String>(); @@ -174,13 +177,13 @@ public class QBParseInfo { } } - public void addInsertIntoTable(String fullName) { - insertIntoTables.add(fullName.toLowerCase()); + public void addInsertIntoTable(String fullName, ASTNode ast) { + insertIntoTables.put(fullName.toLowerCase(), ast); } public boolean isInsertIntoTable(String dbName, String table) { String fullName = dbName + "." 
+ table; - return insertIntoTables.contains(fullName.toLowerCase()); + return insertIntoTables.containsKey(fullName.toLowerCase()); } /** @@ -189,7 +192,7 @@ public class QBParseInfo { * @return */ public boolean isInsertIntoTable(String fullTableName) { - return insertIntoTables.contains(fullTableName.toLowerCase()); + return insertIntoTables.containsKey(fullTableName.toLowerCase()); } public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) { @@ -636,6 +639,11 @@ public class QBParseInfo { public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) { this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand; } + + public Map<String, ASTNode> getInsertOverwriteTables() { + return insertOverwriteTables; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7162c08..6937308 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.FetchOperator; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; @@ -129,6 +130,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverterPostProc; import org.apache.hadoop.hive.ql.optimizer.lineage.Generator; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType; import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher; @@ -182,6 +184,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; @@ -259,6 +262,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { protected LinkedHashMap<Operator<? 
extends OperatorDesc>, OpParseContext> opParseCtx; private List<LoadTableDesc> loadTableWork; private List<LoadFileDesc> loadFileWork; + private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts; private final Map<JoinOperator, QBJoinTree> joinContext; private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext; private final HashMap<TableScanOperator, Table> topToTable; @@ -353,6 +357,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { topOps = new LinkedHashMap<String, TableScanOperator>(); loadTableWork = new ArrayList<LoadTableDesc>(); loadFileWork = new ArrayList<LoadFileDesc>(); + columnStatsAutoGatherContexts = new ArrayList<ColumnStatsAutoGatherContext>(); opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>(); joinContext = new HashMap<JoinOperator, QBJoinTree>(); smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>(); @@ -390,6 +395,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { tabNameToTabObject.clear(); loadTableWork.clear(); loadFileWork.clear(); + columnStatsAutoGatherContexts.clear(); topOps.clear(); destTableId = 1; idToTableNameMap.clear(); @@ -448,7 +454,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return new ParseContext(queryState, opToPartPruner, opToPartList, topOps, new HashSet<JoinOperator>(joinContext.keySet()), new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()), - loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, @@ -1401,18 +1407,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case HiveParser.TOK_INSERT_INTO: String currentDatabase = SessionState.get().getCurrentDatabase(); String tab_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), currentDatabase); - qbp.addInsertIntoTable(tab_name); + qbp.addInsertIntoTable(tab_name, ast); case HiveParser.TOK_DESTINATION: ctx_1.dest = "insclause-" + ctx_1.nextNum; ctx_1.nextNum++; boolean isTmpFileDest = false; if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) { - ASTNode ch = (ASTNode)ast.getChild(0); - if (ch.getToken().getType() == HiveParser.TOK_DIR - && ch.getChildCount() > 0 && ch.getChild(0) instanceof ASTNode) { - ch = (ASTNode)ch.getChild(0); + ASTNode ch = (ASTNode) ast.getChild(0); + if (ch.getToken().getType() == HiveParser.TOK_DIR && ch.getChildCount() > 0 + && ch.getChild(0) instanceof ASTNode) { + ch = (ASTNode) ch.getChild(0); isTmpFileDest = ch.getToken().getType() == HiveParser.TOK_TMP_FILE; + } else { + if (ast.getToken().getType() == HiveParser.TOK_DESTINATION + && ast.getChild(0).getType() == HiveParser.TOK_TAB) { + String fullTableName = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), + SessionState.get().getCurrentDatabase()); + qbp.getInsertOverwriteTables().put(fullTableName, ast); + } } } @@ -6516,6 +6529,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { DynamicPartitionCtx dpCtx = null; LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; + Map<String, String> partSpec = null; switch (dest_type.intValue()) { case QBMetaData.DEST_TABLE: { @@ -6531,7 +6545,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { 
ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); } - Map<String, String> partSpec = qbm.getPartSpecForAlias(dest); + partSpec = qbm.getPartSpecForAlias(dest); dest_path = dest_tab.getPath(); // If the query here is an INSERT_INTO and the target is an immutable table, @@ -6875,6 +6889,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx); + inputRR = opParseCtx.get(input).getRowResolver(); ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>(); @@ -7004,9 +7019,41 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FileSinkOperator fso = (FileSinkOperator) output; fso.getConf().setTable(dest_tab); fsopToTable.put(fso, dest_tab); + // the following code is used to collect column stats when + // hive.stats.autogather=true + // and it is an insert overwrite or insert into table + if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) + && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) + && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { + if (dest_type.intValue() == QBMetaData.DEST_TABLE) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() + .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb + .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + + } + } return output; } + private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc, + Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException { + String tableName = table_desc.getTableName(); + Table table = null; + try { + table = db.getTable(tableName); + } catch (HiveException e) { + throw new SemanticException(e.getMessage()); + } + LOG.info("Generate an operator pipleline to autogather column stats for table " + tableName + + " in query " + ctx.getCmd()); + ColumnStatsAutoGatherContext columnStatsAutoGatherContext = null; + columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto); + columnStatsAutoGatherContext.insertAnalyzePipeline(); + columnStatsAutoGatherContexts.add(columnStatsAutoGatherContext); + } + String fixCtasColumnName(String colName) { return colName; } @@ -10689,7 +10736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps, new HashSet<JoinOperator>(joinContext.keySet()), new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()), - loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, @@ -12895,4 +12942,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { String.format("Warning: %s", msg)); } + public List<LoadFileDesc> getLoadFileWork() { + return loadFileWork; + } + + public void setLoadFileWork(List<LoadFileDesc> loadFileWork) { + this.loadFileWork = loadFileWork; + } + } 
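[Editor's note] The SemanticAnalyzer hunk above is the core of the feature: when both hive.stats.autogather and hive.stats.column.autogather are enabled, genAutoColumnStatsGatheringPipeline() builds an extra operator pipeline that gathers column statistics for the destination table or partition of an INSERT INTO / INSERT OVERWRITE. A minimal HiveQL sketch of how this is exercised, mirroring the autoColumnStats_*.q tests added below (the src and src_multi1 tables are taken from those tests):

  set hive.stats.autogather=true;
  set hive.stats.column.autogather=true;

  create table src_multi1 like src;

  -- With both flags on, the plan for this insert also computes column stats
  -- for src_multi1 and persists them through a ColumnStatsTask.
  insert overwrite table src_multi1 select * from src;

  -- Roughly what a user would otherwise have to run by hand:
  -- analyze table src_multi1 compute statistics for columns;

  -- Column-level stats can be inspected afterwards:
  describe formatted src_multi1 key;
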
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 4049f40..4b34ebf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -20,15 +20,19 @@ package org.apache.hadoop.hive.ql.parse; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -69,6 +73,8 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; +import akka.util.Collections; + import com.google.common.collect.Interner; import com.google.common.collect.Interners; @@ -251,15 +257,6 @@ public abstract class TaskCompiler { generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs); - /* - * If the query was the result of analyze table column compute statistics rewrite, create - * a column stats task instead of a fetch task to persist stats to the metastore. - */ - if (isCStats) { - genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadTableWork, loadFileWork, - rootTasks, outerQueryLimit); - } - // For each task, set the key descriptor for the reducer for (Task<? extends Serializable> rootTask : rootTasks) { GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask); @@ -273,6 +270,35 @@ public abstract class TaskCompiler { optimizeTaskPlan(rootTasks, pCtx, ctx); + /* + * If the query was the result of analyze table column compute statistics rewrite, create + * a column stats task instead of a fetch task to persist stats to the metastore. + */ + if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) { + Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? 
extends Serializable>>(); + getLeafTasks(rootTasks, leafTasks); + if (isCStats) { + genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0); + } else { + for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx + .getColumnStatsAutoGatherContexts()) { + if (!columnStatsAutoGatherContext.isInsertInto()) { + genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), + columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0); + } else { + int numBitVector; + try { + numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf); + } catch (Exception e) { + throw new SemanticException(e.getMessage()); + } + genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), + columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector); + } + } + } + } + decideExecMode(rootTasks, ctx, globalLimitCtx); if (pCtx.getQueryProperties().isCTAS() && !pCtx.getCreateTable().isMaterialization()) { @@ -355,8 +381,9 @@ public abstract class TaskCompiler { * @param qb */ @SuppressWarnings("unchecked") - protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, List<LoadTableDesc> loadTableWork, - List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks, int outerQueryLimit) { + protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, + List<LoadFileDesc> loadFileWork, Set<Task<? extends Serializable>> leafTasks, + int outerQueryLimit, int numBitVector) { ColumnStatsTask cStatsTask = null; ColumnStatsWork cStatsWork = null; FetchWork fetch = null; @@ -385,12 +412,12 @@ public abstract class TaskCompiler { fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit); ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, - colName, colType, isTblLevel); + colName, colType, isTblLevel, numBitVector); cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); - // This is a column stats task. According to the semantic, there should be - // only one MR task in the rootTask. - rootTasks.get(0).addDependentTask(cStatsTask); + for (Task<? extends Serializable> tsk : leafTasks) { + tsk.addDependentTask(cStatsTask); + } } @@ -398,7 +425,7 @@ public abstract class TaskCompiler { * Find all leaf tasks of the list of root tasks. */ protected void getLeafTasks(List<Task<? extends Serializable>> rootTasks, - HashSet<Task<? extends Serializable>> leaves) { + Set<Task<? extends Serializable>> leaves) { for (Task<? extends Serializable> root : rootTasks) { getLeafTasks(root, leaves); @@ -406,7 +433,7 @@ public abstract class TaskCompiler { } private void getLeafTasks(Task<? extends Serializable> task, - HashSet<Task<? extends Serializable>> leaves) { + Set<Task<? 
extends Serializable>> leaves) { if (task.getDependentTasks() == null) { if (!leaves.contains(task)) { leaves.add(task); @@ -453,7 +480,7 @@ public abstract class TaskCompiler { ParseContext clone = new ParseContext(queryState, pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(), pCtx.getJoinOps(), pCtx.getSmbMapJoinOps(), - pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(), + pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getColumnStatsAutoGatherContexts(), pCtx.getContext(), pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(), pCtx.getListMapJoinOpsNoReducer(), pCtx.getPrunedPartitions(), pCtx.getTabNameToTabObject(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(), http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java index 9934fdf..778d6f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java @@ -51,7 +51,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali // this map stores mapping from "big key dir" to its corresponding mapjoin // task. private HashMap<Path, Task<? extends Serializable>> dirToTaskMap; - private Task<? extends Serializable> noSkewTask; + private List<Task<? extends Serializable>> noSkewTask; /** * For serialization use only. @@ -61,7 +61,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali public ConditionalResolverSkewJoinCtx( HashMap<Path, Task<? extends Serializable>> dirToTaskMap, - Task<? extends Serializable> noSkewTask) { + List<Task<? extends Serializable>> noSkewTask) { super(); this.dirToTaskMap = dirToTaskMap; this.noSkewTask = noSkewTask; @@ -76,11 +76,11 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali this.dirToTaskMap = dirToTaskMap; } - public Task<? extends Serializable> getNoSkewTask() { + public List<Task<? extends Serializable>> getNoSkewTask() { return noSkewTask; } - public void setNoSkewTask(Task<? extends Serializable> noSkewTask) { + public void setNoSkewTask(List<Task<? 
extends Serializable>> noSkewTask) { this.noSkewTask = noSkewTask; } } @@ -121,7 +121,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali e.printStackTrace(); } if (resTsks.isEmpty() && ctx.getNoSkewTask() != null) { - resTsks.add(ctx.getNoSkewTask()); + resTsks.addAll(ctx.getNoSkewTask()); } return resTsks; } http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_1.q b/ql/src/test/queries/clientpositive/autoColumnStats_1.q new file mode 100644 index 0000000..bb7252a --- /dev/null +++ b/ql/src/test/queries/clientpositive/autoColumnStats_1.q @@ -0,0 +1,192 @@ +set hive.stats.column.autogather=true; +set hive.stats.fetch.column.stats=true; +set hive.exec.dynamic.partition=true; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.auto.convert.join=true; +set hive.join.emit.interval=2; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; +set hive.optimize.bucketingsorting=false; + +drop table src_multi1; + +create table src_multi1 like src; + +insert overwrite table src_multi1 select * from src; + +explain extended select * from src_multi1; + +describe formatted src_multi1; + +drop table a; +drop table b; +create table a like src; +create table b like src; + +from src +insert overwrite table a select * +insert overwrite table b select *; + +describe formatted a; +describe formatted b; + +drop table a; +drop table b; +create table a like src; +create table b like src; + +from src +insert overwrite table a select * +insert into table b select *; + +describe formatted a; +describe formatted b; + + +drop table src_multi2; + +create table src_multi2 like src; + +insert overwrite table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key; + +describe formatted src_multi2; + + +drop table nzhang_part14; + +create table if not exists nzhang_part14 (key string) + partitioned by (value string); + +insert overwrite table nzhang_part14 partition(value) +select key, value from ( + select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a + union all + select * from (select 'k2' as key, '' as value from src limit 2)b + union all + select * from (select 'k3' as key, ' ' as value from src limit 2)c +) T; + +explain select key from nzhang_part14; + + +drop table src5; + +create table src5 as select key, value from src limit 5; + +insert overwrite table nzhang_part14 partition(value) +select key, value from src5; + +explain select key from nzhang_part14; + + +create table alter5 ( col1 string ) partitioned by (dt string); + +alter table alter5 add partition (dt='a') location 'parta'; + +describe formatted alter5 partition (dt='a'); + +insert overwrite table alter5 partition (dt='a') select key from src ; + +describe formatted alter5 partition (dt='a'); + +explain select * from alter5 where dt='a'; + + +drop table src_stat_part; +create table src_stat_part(key string, value string) partitioned by (partitionId int); + +insert overwrite table src_stat_part partition (partitionId=1) +select * from src1 limit 5; + +describe formatted src_stat_part PARTITION(partitionId=1); + +insert overwrite 
table src_stat_part partition (partitionId=2) +select * from src1; + +describe formatted src_stat_part PARTITION(partitionId=2); + +drop table srcbucket_mapjoin; +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +drop table tab_part; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +drop table srcbucket_mapjoin_part; +CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); + +insert overwrite table tab_part partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; + +describe formatted tab_part partition (ds='2008-04-08'); + +CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin; + +describe formatted tab partition (ds='2008-04-08'); + +drop table nzhang_part14; + +create table if not exists nzhang_part14 (key string, value string) + partitioned by (ds string, hr string); + +describe formatted nzhang_part14; + +insert overwrite table nzhang_part14 partition(ds, hr) +select key, value, ds, hr from ( + select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a + union all + select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b + union all + select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c +) T; + +desc formatted nzhang_part14 partition(ds='1', hr='3'); + + +INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr) +SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10; + +desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12'); + + +drop table nzhang_part14; +create table if not exists nzhang_part14 (key string, value string) +partitioned by (ds string, hr string); + +INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr) +SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10; + +desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12'); + +drop table a; +create table a (key string, value string) +partitioned by (ds string, hr string); + +drop table b; +create table b (key string, value string) +partitioned by (ds string, hr string); + +drop table c; +create table c (key string, value string) +partitioned by (ds string, hr string); + + +FROM srcpart +INSERT OVERWRITE TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10 +INSERT OVERWRITE TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is 
not null and hr>11 +INSERT OVERWRITE TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0; + +explain select key from a; +explain select value from b; +explain select key from b; +explain select value from c; +explain select key from c; + http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_2.q b/ql/src/test/queries/clientpositive/autoColumnStats_2.q new file mode 100644 index 0000000..c1abcb1 --- /dev/null +++ b/ql/src/test/queries/clientpositive/autoColumnStats_2.q @@ -0,0 +1,214 @@ +set hive.stats.column.autogather=true; +set hive.stats.fetch.column.stats=true; +set hive.exec.dynamic.partition=true; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.auto.convert.join=true; +set hive.join.emit.interval=2; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ; +set hive.optimize.bucketingsorting=false; + +drop table src_multi1; + +create table src_multi1 like src; + +insert into table src_multi1 select * from src; + +explain extended select * from src_multi1; + +describe formatted src_multi1; + +drop table a; +drop table b; +create table a like src; +create table b like src; + +from src +insert into table a select * +insert into table b select *; + +describe formatted a key; +describe formatted b key; + +from src +insert overwrite table a select * +insert into table b select *; + +describe formatted a; +describe formatted b; + +describe formatted b key; +describe formatted b value; + +insert into table b select NULL, NULL from src limit 10; + +describe formatted b key; +describe formatted b value; + +insert into table b(value) select key+100000 from src limit 10; + +describe formatted b key; +describe formatted b value; + +drop table src_multi2; + +create table src_multi2 like src; + +insert into table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key; + +describe formatted src_multi2; + + +drop table nzhang_part14; + +create table if not exists nzhang_part14 (key string) + partitioned by (value string); + +insert into table nzhang_part14 partition(value) +select key, value from ( + select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a + union all + select * from (select 'k2' as key, '' as value from src limit 2)b + union all + select * from (select 'k3' as key, ' ' as value from src limit 2)c +) T; + +explain select key from nzhang_part14; + + +drop table src5; + +create table src5 as select key, value from src limit 5; + +insert into table nzhang_part14 partition(value) +select key, value from src5; + +explain select key from nzhang_part14; + +drop table alter5; + +create table alter5 ( col1 string ) partitioned by (dt string); + +alter table alter5 add partition (dt='a'); + +describe formatted alter5 partition (dt='a'); + +insert into table alter5 partition (dt='a') select key from src ; + +describe formatted alter5 partition (dt='a'); + +explain select * from alter5 where dt='a'; + +drop table alter5; + +create table alter5 ( col1 string ) partitioned by (dt string); + +alter table alter5 add partition (dt='a') location 'parta'; + +describe formatted alter5 partition 
(dt='a'); + +insert into table alter5 partition (dt='a') select key from src ; + +describe formatted alter5 partition (dt='a'); + +explain select * from alter5 where dt='a'; + + +drop table src_stat_part; +create table src_stat_part(key string, value string) partitioned by (partitionId int); + +insert into table src_stat_part partition (partitionId=1) +select * from src1 limit 5; + +describe formatted src_stat_part PARTITION(partitionId=1); + +insert into table src_stat_part partition (partitionId=2) +select * from src1; + +describe formatted src_stat_part PARTITION(partitionId=2); + +drop table srcbucket_mapjoin; +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +drop table tab_part; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +drop table srcbucket_mapjoin_part; +CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); + +insert into table tab_part partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; + +describe formatted tab_part partition (ds='2008-04-08'); + +CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert into table tab partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin; + +describe formatted tab partition (ds='2008-04-08'); + +drop table nzhang_part14; + +create table if not exists nzhang_part14 (key string, value string) + partitioned by (ds string, hr string); + +describe formatted nzhang_part14; + +insert into table nzhang_part14 partition(ds, hr) +select key, value, ds, hr from ( + select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a + union all + select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b + union all + select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c +) T; + +desc formatted nzhang_part14 partition(ds='1', hr='3'); + + +INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr) +SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10; + +desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12'); + + +drop table nzhang_part14; +create table if not exists nzhang_part14 (key string, value string) +partitioned by (ds string, hr string); + +INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr) +SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10; + +desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12'); + +drop table a; +create table a (key string, value string) +partitioned by (ds string, 
hr string); + +drop table b; +create table b (key string, value string) +partitioned by (ds string, hr string); + +drop table c; +create table c (key string, value string) +partitioned by (ds string, hr string); + + +FROM srcpart +INSERT into TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10 +INSERT into TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is not null and hr>11 +INSERT into TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0; + +explain select key from a; +explain select value from b; +explain select key from b; +explain select value from c; +explain select key from c; +
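[Editor's note] Because TaskCompiler now attaches the generated ColumnStatsTask as a dependent of every leaf task (rather than of rootTasks.get(0) only), multi-insert queries also get column stats for each destination. A short sketch drawn from the statements in autoColumnStats_2.q above, assuming the standard src qtest table:

  set hive.stats.autogather=true;
  set hive.stats.column.autogather=true;

  create table a like src;
  create table b like src;

  -- Each insert branch ends in its own FileSink/leaf task; the generated
  -- column-stats tasks are hung off the leaf tasks, so both a and b
  -- end up with column statistics.
  from src
  insert overwrite table a select *
  insert into table b select *;

  -- Per-column stats for either destination:
  describe formatted a key;
  describe formatted b value;
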
