This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 3482c0b2d05 HIVE-28620: Query result is cached in case of IOWD if the subquery is not trivial (#5537) (Laszlo Bodor reviewed by Denys Kuzmenko, Krisztian Kasa) 3482c0b2d05 is described below commit 3482c0b2d056715105e1f2cb947fd11348cb64e5 Author: Bodor Laszlo <bodorlaszlo0...@gmail.com> AuthorDate: Wed Nov 13 11:34:20 2024 +0100 HIVE-28620: Query result is cached in case of IOWD if the subquery is not trivial (#5537) (Laszlo Bodor reviewed by Denys Kuzmenko, Krisztian Kasa) --- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 26 +++-- .../hadoop/hive/ql/parse/TestSemanticAnalyzer.java | 114 +++++++++++++++++++++ 2 files changed, 131 insertions(+), 9 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 0c668b93f6f..5f811571241 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7436,8 +7436,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } - private Path getDestinationFilePath(final String destinationFile, boolean isMmTable) { - if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) { + private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) { + if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) { assert (!isMmTable); QueryResultsCache instance = QueryResultsCache.getInstance(); // QueryResultsCache should have been initialized by now @@ -7775,7 +7775,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { isLocal = true; // fall through case QBMetaData.DEST_DFS_FILE: { - destinationPath = getDestinationFilePath(qbm.getDestFileForAlias(dest), isMmTable); + destinationPath = getDestinationFilePath(qb, qbm.getDestFileForAlias(dest), isMmTable); // CTAS case: the file output format and serde 
are defined by the create // table command rather than taking the default value @@ -13139,7 +13139,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Otherwise we have to wait until after the masking/filtering step. boolean isCacheEnabled = isResultsCacheEnabled(); QueryResultsCache.LookupInfo lookupInfo = null; - if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) { + if (isCacheEnabled && !needsTransform && queryTypeCanUseCache(qb)) { lookupInfo = createLookupInfoForQuery(ast); if (checkResultsCache(lookupInfo, false)) { return; @@ -13200,7 +13200,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Check query results cache // In the case that row or column masking/filtering was required, we do not support caching. // TODO: Enable caching for queries with masking/filtering - if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) { + if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache(qb)) { lookupInfo = createLookupInfoForQuery(ast); if (checkResultsCache(lookupInfo, false)) { return; @@ -15840,25 +15840,33 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { /** * Some initial checks for a query to see if we can look this query up in the results cache. */ - private boolean queryTypeCanUseCache() { - if (this.qb == null || this.qb.getParseInfo() == null) { + private boolean queryTypeCanUseCache(QB qb) { + if (qb == null || qb.getParseInfo() == null) { return false; } if (this instanceof ColumnStatsSemanticAnalyzer) { // Column stats generates "select compute_stats() .." queries. // Disable caching for these. 
+ LOG.debug("Query type cannot use cache (ColumnStatsSemanticAnalyzer)"); return false; } if (queryState.getHiveOperation() != HiveOperation.QUERY) { + LOG.debug("Query type cannot use cache (HiveOperation is not a QUERY)"); return false; } if (Optional.of(qb.getParseInfo()).filter(pi -> pi.isAnalyzeCommand() || pi.hasInsertTables() || pi.isInsertOverwriteDirectory()) .isPresent()) { + LOG.debug("Query type cannot use cache (analyze, insert, or IOWD)"); return false; } // HIVE-19096 - disable for explain and explain analyze - return ctx.getExplainAnalyze() == null; + if (ctx.getExplainAnalyze() != null) { + LOG.debug("Query type cannot use cache (explain analyze command)"); + return false; + } + + return true; } private boolean needsTransform() { @@ -15871,7 +15879,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * can be added to the results cache. */ private boolean queryCanBeCached() { - if (!queryTypeCanUseCache()) { + if (!queryTypeCanUseCache(qb)) { LOG.info("Not eligible for results caching - wrong query type"); return false; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSemanticAnalyzer.java index 0ab1c70e37c..3f1d95e32d3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSemanticAnalyzer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSemanticAnalyzer.java @@ -18,20 +18,68 @@ package org.apache.hadoop.hive.ql.parse; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.Context; import 
org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritableV2; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.stubbing.Answer; public class TestSemanticAnalyzer { + private static Hive db; + private static HiveConf conf; + + @BeforeClass + public static void beforeClass() throws Exception { + conf = new HiveConfForTest(TestSemanticAnalyzer.class); + conf.set("hive.security.authorization.enabled", "false"); + conf.set("hive.security.authorization.manager", + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory"); + db = Hive.get(conf); + + // table1 (col1 string, col2 int) + createKeyValueTable("table1"); + createKeyValueTable("table2"); + createKeyValueTable("table3"); + } + + private static void createKeyValueTable(String tableName) throws Exception { + Table table = new Table("default", tableName); + List<FieldSchema> columns = new ArrayList<>(); + columns.add(new FieldSchema("key", "string", "First column")); + columns.add(new FieldSchema("value", "int", "Second column")); + table.setFields(columns); // Set columns + db.createTable(table); + } + + @AfterClass + public static void afterClass() { + db.close(true); + } + @Test public void testNormalizeColSpec() throws Exception { // Hive normalizes partition spec for dates to yyyy-mm-dd format. 
Some versions of Java will @@ -134,4 +182,70 @@ public class TestSemanticAnalyzer { hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_SERVICE_USERS, "u1,u2,u3"); assertTrue(analyzer.skipAuthorization()); } + + @Test + public void testSelectCacheable() throws Exception { + checkQueryCanUseCache("SELECT key from table1", true); + } + + @Test + public void testInsertCacheable() throws Exception { + checkQueryCanUseCache("INSERT INTO table1 VALUES ('asdf', 2)", false); + } + + @Test + public void testInsertOverwriteDirectoryCacheable() throws Exception { + checkQueryCanUseCache("INSERT OVERWRITE DIRECTORY '/tmp' SELECT key FROM table2", false); + } + + @Test + public void testInsertOverwriteDirectoryWithNonTrivialSubqueryCacheable() throws Exception { + checkQueryCanUseCache("insert overwrite directory '/tmp' " + + "SELECT a.key, MAX(b.value) AS MAX_VALUE, COUNT(DISTINCT b.key) AS UNIQUE_KEYS, AVG(c.value) AS VALS " + + "FROM table1 a " + + "JOIN table2 b ON a.key = b.key " + + "JOIN table3 c ON a.key = c.key " + + "GROUP BY a.key HAVING AVG(LENGTH(a.key) + LENGTH(b.key)) > 5 " + + "ORDER BY MAX_VALUE DESC, UNIQUE_KEYS ASC" + , false); + } + + private void checkQueryCanUseCache(String query, boolean canUseCache) throws Exception { + conf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + QueryResultsCache.initialize(conf); + QueryResultsCache cache = QueryResultsCache.getInstance(); + String cacheDirPath = cache.getCacheDirPath().toUri().getPath(); + + SessionState.start(conf); + Context ctx = new Context(conf); + ASTNode astNode = ParseUtils.parse(query, ctx); + QueryState queryState = new QueryState.Builder().withHiveConf(conf).build(); + SemanticAnalyzer analyzer = spy((SemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, astNode)); + + analyzer.initCtx(ctx); + + List<Operator<?>> capturedValues = new ArrayList<>(); + doAnswer((Answer<Operator<?>>) invocation -> { + 
Operator<?> fileSinkOperator = (Operator<?>) invocation.callRealMethod(); // Call the actual method + capturedValues.add(fileSinkOperator); + return fileSinkOperator; + }).when(analyzer).genFileSinkPlan(anyString(), any(QB.class), any(Operator.class)); + + analyzer.analyze(astNode, ctx); + + // this is a soft assertion, and doesn't reflect the goal of this unit test, + Assert.assertEquals("genFileSinkPlan is supposed to be called once during semantic analysis", + 1, capturedValues.size()); + FileSinkOperator operator = (FileSinkOperator) capturedValues.get(0); + String finalPath = operator.getConf().getDestPath().toUri().toString(); + + if (canUseCache) { + Assert.assertTrue(String.format("Final path %s is not in the cache folder (%s), which is unexpected", + finalPath, cacheDirPath), finalPath.contains(cacheDirPath)); + } else { + assertFalse(String.format("Final path %s is in cache folder (%s), which is unexpected", + finalPath, cacheDirPath), finalPath.contains(cacheDirPath)); + } + } }