Repository: hive Updated Branches: refs/heads/branch-3 7948def19 -> 013403392
HIVE-19154: Poll notification events to invalidate the results cache (Jason Dere, reviewed by GopalV) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/01340339 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/01340339 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/01340339 Branch: refs/heads/branch-3 Commit: 0134033929e156588b7bf459dfac719096c468af Parents: 7948def Author: Jason Dere <jd...@hortonworks.com> Authored: Mon Apr 16 16:57:49 2018 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Mon Apr 16 16:57:49 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 10 + .../test/resources/testconfiguration.properties | 1 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 6 + .../ql/cache/results/QueryResultsCache.java | 166 +++++++- .../hive/ql/metadata/events/EventConsumer.java | 28 ++ .../metadata/events/NotificationEventPoll.java | 162 ++++++++ .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +- .../results_cache_invalidation2.q | 63 +++ .../llap/results_cache_invalidation2.q.out | 380 +++++++++++++++++++ .../results_cache_invalidation2.q.out | 373 ++++++++++++++++++ .../apache/hive/service/server/HiveServer2.java | 7 + 11 files changed, 1178 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9eb76e7..7dd16e3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4291,6 +4291,16 @@ public class HiveConf extends Configuration { (long) 10 * 1024 * 1024, "Maximum size in bytes that a single query result is allowed to use in the results cache directory"), + HIVE_NOTFICATION_EVENT_POLL_INTERVAL("hive.notification.event.poll.interval", "60s", + new TimeValidator(TimeUnit.SECONDS), + "How often the notification log is polled for new NotificationEvents from the metastore." 
+ + "A nonpositive value means the notification log is never polled."), + + HIVE_NOTFICATION_EVENT_CONSUMERS("hive.notification.event.consumers", + "org.apache.hadoop.hive.ql.cache.results.QueryResultsCache$InvalidationEventConsumer", + "Comma-separated list of class names extending EventConsumer," + + "to handle the NotificationEvents retreived by the notification event poll."), + /* BLOBSTORE section */ HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n", http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 27e5feb..183dc4c 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -246,6 +246,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ results_cache_empty_result.q,\ results_cache_invalidation.q,\ results_cache_transactional.q,\ + results_cache_invalidation2.q,\ sample1.q,\ selectDistinctStar.q,\ select_dummy_source.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index c33851f..2eb4889 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ParseDriver; @@ -1055,6 +1056,7 @@ public class QTestUtil { } // Remove any cached results from the previous test. 
+ NotificationEventPoll.shutdown(); QueryResultsCache.cleanupInstance(); // allocate and initialize a new conf since a test can @@ -2230,4 +2232,8 @@ public class QTestUtil { public QOutProcessor getQOutProcessor() { return qOutProcessor; } + + public static void initEventNotificationPoll() throws Exception { + NotificationEventPoll.initialize(SessionState.get().getConf()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 90c8ec3..56a9faa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -43,8 +44,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -57,12 +61,17 @@ import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.Entity.Type; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.events.EventConsumer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.TableAccessInfo; import org.apache.hadoop.hive.ql.plan.FetchWork; @@ -95,6 +104,7 @@ public final class QueryResultsCache { } public static class QueryInfo { + private long queryTime; private LookupInfo lookupInfo; private HiveOperation hiveOperation; private List<FieldSchema> resultSchema; @@ -103,12 +113,14 @@ public final class QueryResultsCache { private Set<ReadEntity> inputs; public QueryInfo( + long queryTime, LookupInfo lookupInfo, HiveOperation hiveOperation, List<FieldSchema> resultSchema, TableAccessInfo tableAccessInfo, ColumnAccessInfo columnAccessInfo, Set<ReadEntity> inputs) { + this.queryTime = queryTime; this.lookupInfo = lookupInfo; this.hiveOperation = hiveOperation; this.resultSchema = resultSchema; @@ -164,6 +176,14 @@ public final class QueryResultsCache { public void setInputs(Set<ReadEntity> inputs) { this.inputs = inputs; } + + public long getQueryTime() { + return 
queryTime; + } + + public void setQueryTime(long queryTime) { + this.queryTime = queryTime; + } } public enum CacheEntryStatus { @@ -176,7 +196,6 @@ public final class QueryResultsCache { private Path cachedResultsPath; // Cache administration - private long createTime; private long size; private AtomicInteger readers = new AtomicInteger(0); private ScheduledFuture<?> invalidationFuture = null; @@ -311,6 +330,12 @@ public final class QueryResultsCache { } } } + + public Stream<String> getTableNames() { + return queryInfo.getInputs().stream() + .filter(readEntity -> readEntity.getType() == Type.TABLE) + .map(readEntity -> readEntity.getTable().getFullyQualifiedName()); + } } // Allow lookup by query string @@ -321,6 +346,9 @@ public final class QueryResultsCache { private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap( new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true)); + // Lookup of cache entries by table used in the query, for cache invalidation. + private final Map<String, Set<CacheEntry>> tableToEntryMap = new HashMap<>(); + private final HiveConf conf; private Path cacheDirPath; private Path zeroRowsPath; @@ -329,6 +357,7 @@ private long maxEntrySize; private long maxEntryLifetime; private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private ScheduledFuture<?> invalidationPollFuture; private QueryResultsCache(HiveConf configuration) throws IOException { this.conf = configuration; @@ -472,13 +501,11 @@ LOG.info("Adding placeholder cache entry for query '{}'", queryText); // Add the entry to the cache structures while under write lock. - Set<CacheEntry> entriesForQuery = queryMap.get(queryText); - if (entriesForQuery == null) { - entriesForQuery = new HashSet<CacheEntry>(); - queryMap.put(queryText, entriesForQuery); - } - entriesForQuery.add(addedEntry); + addToEntryMap(queryMap, queryText, addedEntry); lru.put(addedEntry, addedEntry); + // Index of entries by table usage. + addedEntry.getTableNames() + .forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry)); } finally { writeLock.unlock(); } @@ -544,7 +571,6 @@ cacheEntry.cachedResultsPath = cachedResultsPath; cacheEntry.size = resultSize; this.cacheSize += resultSize; - cacheEntry.createTime = System.currentTimeMillis(); cacheEntry.txnWriteIdList = txnWriteIdList; cacheEntry.setStatus(CacheEntryStatus.VALID); @@ -616,6 +642,32 @@ } } + public void notifyTableChanged(String dbName, String tableName, long updateTime) { + LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime); + // Invalidate all cache entries using this table. + List<CacheEntry> entriesToInvalidate = null; + rwLock.writeLock().lock(); + try { + String key = (dbName.toLowerCase() + "." + tableName.toLowerCase()); + Set<CacheEntry> entriesForTable = tableToEntryMap.get(key); + if (entriesForTable != null) { + // Possible concurrent modification issues if we try to remove cache entries while + // traversing the cache structures. Save the entries to remove in a separate list. + entriesToInvalidate = new ArrayList<>(entriesForTable); + } + if (entriesToInvalidate != null) { + for (CacheEntry entry : entriesToInvalidate) { + // Ignore updates that occurred before this cached query was created.
+ if (entry.getQueryInfo().getQueryTime() <= updateTime) { + removeEntry(entry); + } + } + } + } finally { + rwLock.writeLock().unlock(); + } + } + private static final int INITIAL_LRU_SIZE = 16; private static final float LRU_LOAD_FACTOR = 0.75f; private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {}; @@ -690,18 +742,13 @@ public final class QueryResultsCache { private void removeFromLookup(CacheEntry entry) { String queryString = entry.getQueryText(); - Set<CacheEntry> entries = queryMap.get(queryString); - if (entries == null) { - LOG.warn("ResultsCache: no entry for {}", queryString); - return; - } - boolean deleted = entries.remove(entry); - if (!deleted) { - LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry); - } - if (entries.isEmpty()) { - queryMap.remove(queryString); + if (!removeFromEntryMap(queryMap, queryString, entry)) { + LOG.warn("Attempted to remove entry but it was not in the cache: {}", entry); } + + // Remove this entry from the table usage mappings. + entry.getTableNames() + .forEach(tableName -> removeFromEntryMap(tableToEntryMap, tableName, entry)); } private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException { @@ -783,13 +830,42 @@ public final class QueryResultsCache { return false; } + private static void addToEntryMap(Map<String, Set<CacheEntry>> entryMap, + String key, CacheEntry entry) { + Set<CacheEntry> entriesForKey = entryMap.get(key); + if (entriesForKey == null) { + entriesForKey = new HashSet<CacheEntry>(); + entryMap.put(key, entriesForKey); + } + entriesForKey.add(entry); + } + + private static boolean removeFromEntryMap(Map<String, Set<CacheEntry>> entryMap, + String key, CacheEntry entry) { + Set<CacheEntry> entries = entryMap.get(key); + if (entries == null) { + return false; + } + boolean deleted = entries.remove(entry); + if (!deleted) { + return false; + } + if (entries.isEmpty()) { + entryMap.remove(key); + } + return true; + } @VisibleForTesting public static void cleanupInstance() { // This should only ever be called in testing scenarios. // There should not be any other users of the cache or its entries or this may mess up cleanup. if (inited.get()) { - getInstance().clear(); + if (instance.invalidationPollFuture != null) { + instance.invalidationPollFuture.cancel(true); + instance.invalidationPollFuture = null; + } + instance.clear(); instance = null; inited.set(false); } @@ -880,4 +956,54 @@ public final class QueryResultsCache { metrics.addGauge(MetricsConstant.QC_MAX_SIZE, maxCacheSize); metrics.addGauge(MetricsConstant.QC_CURRENT_SIZE, curCacheSize); } + + // EventConsumer to invalidate cache entries based on metastore notification events (alter table, add partition, etc). 
+ public static class InvalidationEventConsumer implements EventConsumer { + Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void accept(NotificationEvent event) { + String dbName; + String tableName; + + switch (event.getEventType()) { + case MessageFactory.ADD_PARTITION_EVENT: + case MessageFactory.ALTER_PARTITION_EVENT: + case MessageFactory.DROP_PARTITION_EVENT: + case MessageFactory.ALTER_TABLE_EVENT: + case MessageFactory.DROP_TABLE_EVENT: + case MessageFactory.INSERT_EVENT: + dbName = event.getDbName(); + tableName = event.getTableName(); + break; + default: + return; + } + + if (dbName == null || tableName == null) { + LOG.info("Possibly malformed notification event, missing db or table name: {}", event); + return; + } + + LOG.debug("Handling event {} on table {}.{}", event.getEventType(), dbName, tableName); + + QueryResultsCache cache = QueryResultsCache.getInstance(); + if (cache != null) { + long eventTime = event.getEventTime() * 1000L; + cache.notifyTableChanged(dbName, tableName, eventTime); + } else { + LOG.debug("Cache not instantiated, skipping event on {}.{}", dbName, tableName); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java new file mode 100644 index 0000000..32e3308 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventConsumer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.metadata.events; + +import java.util.function.Consumer; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +public interface EventConsumer extends Configurable, Consumer<NotificationEvent> { + +} http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java new file mode 100644 index 0000000..c35ca44 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/NotificationEventPoll.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata.events; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NotificationEventPoll { + private static final Logger LOG = LoggerFactory.getLogger(NotificationEventPoll.class); + private static final AtomicBoolean inited = new AtomicBoolean(false); + private static NotificationEventPoll instance; + + Configuration conf; + ScheduledExecutorService executorService; + List<EventConsumer> eventConsumers = new ArrayList<>(); + ScheduledFuture<?> pollFuture; + long lastCheckedEventId; + + public static void initialize(Configuration conf) throws Exception { + if (!inited.getAndSet(true)) { + try { + instance = new NotificationEventPoll(conf); + } catch (Exception err) { + inited.set(false); + throw err; + } + } + } + + public static void shutdown() { + // Should only be called for testing. 
+ if (inited.get()) { + instance.stop(); + instance = null; + inited.set(false); + } + } + + private NotificationEventPoll(Configuration conf) throws Exception { + this.conf = conf; + + long pollInterval = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, TimeUnit.MILLISECONDS); + if (pollInterval <= 0) { + LOG.debug("Non-positive poll interval configured, notification event polling disabled"); + return; + } + + // Initialize the list of event consumers + String[] consumerClassNames = + conf.getStrings(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_CONSUMERS.varname); + if (consumerClassNames != null && consumerClassNames.length > 0) { + for (String consumerClassName : consumerClassNames) { + Class<?> consumerClass = JavaUtils.loadClass(consumerClassName); + EventConsumer consumer = + (EventConsumer) ReflectionUtils.newInstance(consumerClass, conf); + eventConsumers.add(consumer); + } + } else { + LOG.debug("No event consumers configured, notification event polling disabled"); + return; + } + + EventUtils.MSClientNotificationFetcher evFetcher + = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + lastCheckedEventId = evFetcher.getCurrentNotificationEventId(); + LOG.info("Initializing lastCheckedEventId to {}", lastCheckedEventId); + + // Start the scheduled poll task + ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("NotificationEventPoll %d") + .build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + pollFuture = executorService.scheduleAtFixedRate(new Poller(), + pollInterval, pollInterval, TimeUnit.MILLISECONDS); + } + + private void stop() { + if (pollFuture != null) { + pollFuture.cancel(true); + pollFuture = null; + } + if (executorService != null) { + executorService.shutdown(); + executorService = null; + } + } + + class Poller implements Runnable { + @Override + public void run() { + LOG.debug("Polling for notification events"); + + int eventsProcessed = 0; + try { + // Get any new notification events that have occurred since the last time we checked, + // and pass them on to the event consumers.
+ EventUtils.MSClientNotificationFetcher evFetcher + = new EventUtils.MSClientNotificationFetcher(Hive.get().getMSC()); + EventUtils.NotificationEventIterator evIter = + new EventUtils.NotificationEventIterator(evFetcher, lastCheckedEventId, 0, "*", null); + + while (evIter.hasNext()) { + NotificationEvent event = evIter.next(); + LOG.debug("Event: " + event); + for (EventConsumer eventConsumer : eventConsumers) { + try { + eventConsumer.accept(event); + } catch (Exception err) { + LOG.error("Error processing notification event " + event, err); + } + } + eventsProcessed++; + lastCheckedEventId = event.getEventId(); + } + } catch (Exception err) { + LOG.error("Error polling for notification events", err); + } + + LOG.debug("Processed {} notification events", eventsProcessed); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6df22a2..b6282e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -14582,7 +14582,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(QueryResultsCache.LookupInfo lookupInfo) { - return new QueryResultsCache.QueryInfo(lookupInfo, queryState.getHiveOperation(), + long queryTime = SessionState.get().getQueryCurrentTimestamp().getTime(); + return new QueryResultsCache.QueryInfo(queryTime, lookupInfo, queryState.getHiveOperation(), resultSchema, getTableAccessInfo(), getColumnAccessInfo(), inputs); } http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/test/queries/clientpositive/results_cache_invalidation2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/results_cache_invalidation2.q b/ql/src/test/queries/clientpositive/results_cache_invalidation2.q new file mode 100644 index 0000000..1667ceb --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_invalidation2.q @@ -0,0 +1,63 @@ +set hive.metastore.event.listeners=org.apache.hive.hcatalog.listener.DbNotificationListener; + +set hive.query.results.cache.enabled=true; +-- Enable this setting when HIVE-18609 is in +--set hive.query.results.cache.nontransactional.tables.enabled=true; + +set hive.fetch.task.conversion=more; +-- Start polling the notification events +set hive.notification.event.poll.interval=2s; +select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll'); + +create table tab1 stored as textfile as select * from src; +insert into tab1 select * from src; + +create table tab2 (key string, value string) stored as textfile; +insert into tab2 select * from src; + +-- Run queries which should be saved by the cache. 
+select count(*) from tab1 where key > 0; +select count(*) from tab1 join tab2 on (tab1.key = tab2.key); +select count(*) from tab2 where key > 0; + +set test.comment="Cached entry should be used"; +set test.comment; +explain +select count(*) from tab1 where key > 0; +select count(*) from tab1 where key > 0; + +set test.comment="Cached entry should be used"; +set test.comment; +explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key); +select count(*) from tab1 join tab2 on (tab1.key = tab2.key); + +set test.comment="Cached entry should be used"; +set test.comment; +explain +select count(*) from tab2 where key > 0; +select count(*) from tab2 where key > 0; + +-- Update tab1 +insert into tab1 select * from src; + +-- Run a query long enough that the invalidation check can run. +select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint)); + +set test.comment="Cached entry should be invalidated - query should not use cache"; +set test.comment; +explain +select count(*) from tab1 where key > 0; +select count(*) from tab1 where key > 0; + +set test.comment="Cached entry should be invalidated - query should not use cache"; +set test.comment; +explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key); +select count(*) from tab1 join tab2 on (tab1.key = tab2.key); + +set test.comment="tab2 was not modified, this query should still use cache"; +set test.comment; +explain +select count(*) from tab2 where key > 0; +select count(*) from tab2 where key > 0; http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out b/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out new file mode 100644 index 0000000..5e19cd2 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/results_cache_invalidation2.q.out @@ -0,0 +1,380 @@ +PREHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +null +PREHOOK: query: create table tab1 stored as textfile as select * from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@tab1 +POSTHOOK: query: create table tab1 stored as textfile as select * from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert into tab1 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab1 +POSTHOOK: query: insert into tab1 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] 
+PREHOOK: query: create table tab2 (key string, value string) stored as textfile +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab2 +POSTHOOK: query: create table tab2 (key string, value string) stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab2 +PREHOOK: query: insert into tab2 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab2 +POSTHOOK: query: insert into tab2 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab2 +POSTHOOK: Lineage: tab2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +#### A masked pattern was here #### +994 +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +2056 +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +497 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +994 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +2056 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + 
Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +497 +PREHOOK: query: insert into tab1 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab1 +POSTHOOK: query: insert into tab1 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint)) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint)) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +null +test.comment="Cached entry should be invalidated - query should not use cache" +PREHOOK: query: explain +select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tab1 + Statistics: Num rows: 1500 Data size: 262384 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) > 0.0D) (type: boolean) + Statistics: Num rows: 500 Data size: 87461 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 500 Data size: 87461 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +#### A masked pattern was here #### +1491 +test.comment="Cached entry should be invalidated - query should not use cache" +PREHOOK: query: explain +select count(*) from tab1 join tab2 on 
(tab1.key = tab2.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tab1 + Statistics: Num rows: 1500 Data size: 262384 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1425 Data size: 249264 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: tab2 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 1567 Data size: 274190 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +#### A masked 
pattern was here #### +3084 +test.comment="tab2 was not modified, this query should still use cache" +PREHOOK: query: explain +select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +497 http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out b/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out new file mode 100644 index 0000000..50c8adf --- /dev/null +++ b/ql/src/test/results/clientpositive/results_cache_invalidation2.q.out @@ -0,0 +1,373 @@ +PREHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select reflect('org.apache.hadoop.hive.ql.QTestUtil', 'initEventNotificationPoll') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +null +PREHOOK: query: create table tab1 stored as textfile as select * from src +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +PREHOOK: Output: database:default +PREHOOK: Output: default@tab1 +POSTHOOK: query: create table tab1 stored as textfile as select * from src +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert into tab1 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab1 +POSTHOOK: query: insert into tab1 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: create table tab2 (key string, value string) stored as textfile +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tab2 +POSTHOOK: query: create table tab2 (key string, value string) stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tab2 +PREHOOK: query: insert into tab2 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab2 +POSTHOOK: query: insert into tab2 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab2 +POSTHOOK: Lineage: tab2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tab2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select 
count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +#### A masked pattern was here #### +994 +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +2056 +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +497 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +#### A masked pattern was here #### +994 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +2056 +test.comment="Cached entry should be used" +PREHOOK: query: explain +select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +497 +PREHOOK: query: insert into tab1 select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tab1 +POSTHOOK: query: insert into tab1 select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tab1 +POSTHOOK: Lineage: tab1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, 
comment:default), ] +POSTHOOK: Lineage: tab1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint)) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: select reflect("java.lang.Thread", 'sleep', cast(4000 as bigint)) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +null +test.comment="Cached entry should be invalidated - query should not use cache" +PREHOOK: query: explain +select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: tab1 + Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(key) > 0.0D) (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tab1 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +#### A masked pattern was here #### +1491 +test.comment="Cached entry should be invalidated - query should not use cache" +PREHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: tab1 + Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce 
partition columns: _col0 (type: string) + Statistics: Num rows: 1500 Data size: 15936 Basic stats: COMPLETE Column stats: NONE + TableScan + alias: tab2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 1650 Data size: 17529 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab1 +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab1 join tab2 on (tab1.key = tab2.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab1 +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +3084 +test.comment="tab2 was not modified, this query should still use cache" +PREHOOK: query: explain +select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + Cached Query Result: true + +PREHOOK: query: select count(*) from tab2 where key > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tab2 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from tab2 where key > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab2 +#### A masked pattern was here #### +497 http://git-wip-us.apache.org/repos/asf/hive/blob/01340339/service/src/java/org/apache/hive/service/server/HiveServer2.java 
---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 47f84b5..f52e3e8 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; +import org.apache.hadoop.hive.ql.metadata.events.NotificationEventPoll; import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; @@ -233,6 +234,12 @@ public class HiveServer2 extends CompositeService { } } + try { + NotificationEventPoll.initialize(hiveConf); + } catch (Exception err) { + throw new RuntimeException("Error initializing notification event poll", err); + } + wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim(); this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
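----------------------------------------------------------------------

For anyone extending this mechanism: any class implementing the EventConsumer interface added above can be registered alongside the results cache invalidator by listing it in hive.notification.event.consumers. NotificationEventPoll instantiates each listed class via ReflectionUtils.newInstance() and invokes accept() once per fetched event, catching and logging exceptions so that one failing consumer does not stall the others. Below is a minimal sketch assuming only the APIs introduced in this commit; the package and class name (com.example.AuditEventConsumer) are hypothetical.

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.metadata.events.EventConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical consumer that logs insert events as they are polled.
// It is constructed reflectively, so it needs a no-arg constructor;
// setConf() is invoked by ReflectionUtils.newInstance() at creation time.
public class AuditEventConsumer implements EventConsumer {
  private static final Logger LOG = LoggerFactory.getLogger(AuditEventConsumer.class);
  private Configuration conf;

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void accept(NotificationEvent event) {
    // Filter on event type; events this consumer does not handle are ignored.
    if (MessageFactory.INSERT_EVENT.equals(event.getEventType())) {
      LOG.info("Insert into {}.{} (event id {})",
          event.getDbName(), event.getTableName(), event.getEventId());
    }
  }
}

It would be registered together with the default consumer as a single comma-separated value: hive.notification.event.consumers=org.apache.hadoop.hive.ql.cache.results.QueryResultsCache$InvalidationEventConsumer,com.example.AuditEventConsumer. Note that InvalidationEventConsumer multiplies event.getEventTime() by 1000L because notification event times are in seconds while QueryInfo.getQueryTime() is in milliseconds; a consumer comparing timestamps should apply the same conversion.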